diff options
Diffstat (limited to 'vendor/rayon-core/src/join')
-rw-r--r-- | vendor/rayon-core/src/join/mod.rs | 188 | ||||
-rw-r--r-- | vendor/rayon-core/src/join/test.rs | 151 |
2 files changed, 0 insertions, 339 deletions
diff --git a/vendor/rayon-core/src/join/mod.rs b/vendor/rayon-core/src/join/mod.rs deleted file mode 100644 index 5ab9f6b..0000000 --- a/vendor/rayon-core/src/join/mod.rs +++ /dev/null @@ -1,188 +0,0 @@ -use crate::job::StackJob; -use crate::latch::SpinLatch; -use crate::registry::{self, WorkerThread}; -use crate::unwind; -use std::any::Any; - -use crate::FnContext; - -#[cfg(test)] -mod test; - -/// Takes two closures and *potentially* runs them in parallel. It -/// returns a pair of the results from those closures. -/// -/// Conceptually, calling `join()` is similar to spawning two threads, -/// one executing each of the two closures. However, the -/// implementation is quite different and incurs very low -/// overhead. The underlying technique is called "work stealing": the -/// Rayon runtime uses a fixed pool of worker threads and attempts to -/// only execute code in parallel when there are idle CPUs to handle -/// it. -/// -/// When `join` is called from outside the thread pool, the calling -/// thread will block while the closures execute in the pool. When -/// `join` is called within the pool, the calling thread still actively -/// participates in the thread pool. It will begin by executing closure -/// A (on the current thread). While it is doing that, it will advertise -/// closure B as being available for other threads to execute. Once closure A -/// has completed, the current thread will try to execute closure B; -/// if however closure B has been stolen, then it will look for other work -/// while waiting for the thief to fully execute closure B. (This is the -/// typical work-stealing strategy). -/// -/// # Examples -/// -/// This example uses join to perform a quick-sort (note this is not a -/// particularly optimized implementation: if you **actually** want to -/// sort for real, you should prefer [the `par_sort` method] offered -/// by Rayon). -/// -/// [the `par_sort` method]: ../rayon/slice/trait.ParallelSliceMut.html#method.par_sort -/// -/// ```rust -/// # use rayon_core as rayon; -/// let mut v = vec![5, 1, 8, 22, 0, 44]; -/// quick_sort(&mut v); -/// assert_eq!(v, vec![0, 1, 5, 8, 22, 44]); -/// -/// fn quick_sort<T:PartialOrd+Send>(v: &mut [T]) { -/// if v.len() > 1 { -/// let mid = partition(v); -/// let (lo, hi) = v.split_at_mut(mid); -/// rayon::join(|| quick_sort(lo), -/// || quick_sort(hi)); -/// } -/// } -/// -/// // Partition rearranges all items `<=` to the pivot -/// // item (arbitrary selected to be the last item in the slice) -/// // to the first half of the slice. It then returns the -/// // "dividing point" where the pivot is placed. -/// fn partition<T:PartialOrd+Send>(v: &mut [T]) -> usize { -/// let pivot = v.len() - 1; -/// let mut i = 0; -/// for j in 0..pivot { -/// if v[j] <= v[pivot] { -/// v.swap(i, j); -/// i += 1; -/// } -/// } -/// v.swap(i, pivot); -/// i -/// } -/// ``` -/// -/// # Warning about blocking I/O -/// -/// The assumption is that the closures given to `join()` are -/// CPU-bound tasks that do not perform I/O or other blocking -/// operations. If you do perform I/O, and that I/O should block -/// (e.g., waiting for a network request), the overall performance may -/// be poor. Moreover, if you cause one closure to be blocked waiting -/// on another (for example, using a channel), that could lead to a -/// deadlock. -/// -/// # Panics -/// -/// No matter what happens, both closures will always be executed. If -/// a single closure panics, whether it be the first or second -/// closure, that panic will be propagated and hence `join()` will -/// panic with the same panic value. If both closures panic, `join()` -/// will panic with the panic value from the first closure. -pub fn join<A, B, RA, RB>(oper_a: A, oper_b: B) -> (RA, RB) -where - A: FnOnce() -> RA + Send, - B: FnOnce() -> RB + Send, - RA: Send, - RB: Send, -{ - #[inline] - fn call<R>(f: impl FnOnce() -> R) -> impl FnOnce(FnContext) -> R { - move |_| f() - } - - join_context(call(oper_a), call(oper_b)) -} - -/// Identical to `join`, except that the closures have a parameter -/// that provides context for the way the closure has been called, -/// especially indicating whether they're executing on a different -/// thread than where `join_context` was called. This will occur if -/// the second job is stolen by a different thread, or if -/// `join_context` was called from outside the thread pool to begin -/// with. -pub fn join_context<A, B, RA, RB>(oper_a: A, oper_b: B) -> (RA, RB) -where - A: FnOnce(FnContext) -> RA + Send, - B: FnOnce(FnContext) -> RB + Send, - RA: Send, - RB: Send, -{ - #[inline] - fn call_a<R>(f: impl FnOnce(FnContext) -> R, injected: bool) -> impl FnOnce() -> R { - move || f(FnContext::new(injected)) - } - - #[inline] - fn call_b<R>(f: impl FnOnce(FnContext) -> R) -> impl FnOnce(bool) -> R { - move |migrated| f(FnContext::new(migrated)) - } - - registry::in_worker(|worker_thread, injected| unsafe { - // Create virtual wrapper for task b; this all has to be - // done here so that the stack frame can keep it all live - // long enough. - let job_b = StackJob::new(call_b(oper_b), SpinLatch::new(worker_thread)); - let job_b_ref = job_b.as_job_ref(); - let job_b_id = job_b_ref.id(); - worker_thread.push(job_b_ref); - - // Execute task a; hopefully b gets stolen in the meantime. - let status_a = unwind::halt_unwinding(call_a(oper_a, injected)); - let result_a = match status_a { - Ok(v) => v, - Err(err) => join_recover_from_panic(worker_thread, &job_b.latch, err), - }; - - // Now that task A has finished, try to pop job B from the - // local stack. It may already have been popped by job A; it - // may also have been stolen. There may also be some tasks - // pushed on top of it in the stack, and we will have to pop - // those off to get to it. - while !job_b.latch.probe() { - if let Some(job) = worker_thread.take_local_job() { - if job_b_id == job.id() { - // Found it! Let's run it. - // - // Note that this could panic, but it's ok if we unwind here. - let result_b = job_b.run_inline(injected); - return (result_a, result_b); - } else { - worker_thread.execute(job); - } - } else { - // Local deque is empty. Time to steal from other - // threads. - worker_thread.wait_until(&job_b.latch); - debug_assert!(job_b.latch.probe()); - break; - } - } - - (result_a, job_b.into_result()) - }) -} - -/// If job A panics, we still cannot return until we are sure that job -/// B is complete. This is because it may contain references into the -/// enclosing stack frame(s). -#[cold] // cold path -unsafe fn join_recover_from_panic( - worker_thread: &WorkerThread, - job_b_latch: &SpinLatch<'_>, - err: Box<dyn Any + Send>, -) -> ! { - worker_thread.wait_until(job_b_latch); - unwind::resume_unwinding(err) -} diff --git a/vendor/rayon-core/src/join/test.rs b/vendor/rayon-core/src/join/test.rs deleted file mode 100644 index b303dbc..0000000 --- a/vendor/rayon-core/src/join/test.rs +++ /dev/null @@ -1,151 +0,0 @@ -//! Tests for the join code. - -use crate::join::*; -use crate::unwind; -use crate::ThreadPoolBuilder; -use rand::distributions::Standard; -use rand::{Rng, SeedableRng}; -use rand_xorshift::XorShiftRng; - -fn quick_sort<T: PartialOrd + Send>(v: &mut [T]) { - if v.len() <= 1 { - return; - } - - let mid = partition(v); - let (lo, hi) = v.split_at_mut(mid); - join(|| quick_sort(lo), || quick_sort(hi)); -} - -fn partition<T: PartialOrd + Send>(v: &mut [T]) -> usize { - let pivot = v.len() - 1; - let mut i = 0; - for j in 0..pivot { - if v[j] <= v[pivot] { - v.swap(i, j); - i += 1; - } - } - v.swap(i, pivot); - i -} - -fn seeded_rng() -> XorShiftRng { - let mut seed = <XorShiftRng as SeedableRng>::Seed::default(); - (0..).zip(seed.as_mut()).for_each(|(i, x)| *x = i); - XorShiftRng::from_seed(seed) -} - -#[test] -fn sort() { - let rng = seeded_rng(); - let mut data: Vec<u32> = rng.sample_iter(&Standard).take(6 * 1024).collect(); - let mut sorted_data = data.clone(); - sorted_data.sort(); - quick_sort(&mut data); - assert_eq!(data, sorted_data); -} - -#[test] -#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] -fn sort_in_pool() { - let rng = seeded_rng(); - let mut data: Vec<u32> = rng.sample_iter(&Standard).take(12 * 1024).collect(); - - let pool = ThreadPoolBuilder::new().build().unwrap(); - let mut sorted_data = data.clone(); - sorted_data.sort(); - pool.install(|| quick_sort(&mut data)); - assert_eq!(data, sorted_data); -} - -#[test] -#[should_panic(expected = "Hello, world!")] -fn panic_propagate_a() { - join(|| panic!("Hello, world!"), || ()); -} - -#[test] -#[should_panic(expected = "Hello, world!")] -fn panic_propagate_b() { - join(|| (), || panic!("Hello, world!")); -} - -#[test] -#[should_panic(expected = "Hello, world!")] -fn panic_propagate_both() { - join(|| panic!("Hello, world!"), || panic!("Goodbye, world!")); -} - -#[test] -#[cfg_attr(not(panic = "unwind"), ignore)] -fn panic_b_still_executes() { - let mut x = false; - match unwind::halt_unwinding(|| join(|| panic!("Hello, world!"), || x = true)) { - Ok(_) => panic!("failed to propagate panic from closure A,"), - Err(_) => assert!(x, "closure b failed to execute"), - } -} - -#[test] -#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] -fn join_context_both() { - // If we're not in a pool, both should be marked stolen as they're injected. - let (a_migrated, b_migrated) = join_context(|a| a.migrated(), |b| b.migrated()); - assert!(a_migrated); - assert!(b_migrated); -} - -#[test] -#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] -fn join_context_neither() { - // If we're already in a 1-thread pool, neither job should be stolen. - let pool = ThreadPoolBuilder::new().num_threads(1).build().unwrap(); - let (a_migrated, b_migrated) = - pool.install(|| join_context(|a| a.migrated(), |b| b.migrated())); - assert!(!a_migrated); - assert!(!b_migrated); -} - -#[test] -#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] -fn join_context_second() { - use std::sync::Barrier; - - // If we're already in a 2-thread pool, the second job should be stolen. - let barrier = Barrier::new(2); - let pool = ThreadPoolBuilder::new().num_threads(2).build().unwrap(); - let (a_migrated, b_migrated) = pool.install(|| { - join_context( - |a| { - barrier.wait(); - a.migrated() - }, - |b| { - barrier.wait(); - b.migrated() - }, - ) - }); - assert!(!a_migrated); - assert!(b_migrated); -} - -#[test] -#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] -fn join_counter_overflow() { - const MAX: u32 = 500_000; - - let mut i = 0; - let mut j = 0; - let pool = ThreadPoolBuilder::new().num_threads(2).build().unwrap(); - - // Hammer on join a bunch of times -- used to hit overflow debug-assertions - // in JEC on 32-bit targets: https://github.com/rayon-rs/rayon/issues/797 - for _ in 0..MAX { - pool.join(|| i += 1, || j += 1); - } - - assert_eq!(i, MAX); - assert_eq!(j, MAX); -} |