aboutsummaryrefslogtreecommitdiff
path: root/vendor/rayon-core/src/broadcast/test.rs
diff options
context:
space:
mode:
authorValentin Popov <valentin@popov.link>2024-01-08 00:21:28 +0300
committerValentin Popov <valentin@popov.link>2024-01-08 00:21:28 +0300
commit1b6a04ca5504955c571d1c97504fb45ea0befee4 (patch)
tree7579f518b23313e8a9748a88ab6173d5e030b227 /vendor/rayon-core/src/broadcast/test.rs
parent5ecd8cf2cba827454317368b68571df0d13d7842 (diff)
downloadfparkan-1b6a04ca5504955c571d1c97504fb45ea0befee4.tar.xz
fparkan-1b6a04ca5504955c571d1c97504fb45ea0befee4.zip
Initial vendor packages
Signed-off-by: Valentin Popov <valentin@popov.link>
Diffstat (limited to 'vendor/rayon-core/src/broadcast/test.rs')
-rw-r--r--vendor/rayon-core/src/broadcast/test.rs263
1 files changed, 263 insertions, 0 deletions
diff --git a/vendor/rayon-core/src/broadcast/test.rs b/vendor/rayon-core/src/broadcast/test.rs
new file mode 100644
index 0000000..00ab4ad
--- /dev/null
+++ b/vendor/rayon-core/src/broadcast/test.rs
@@ -0,0 +1,263 @@
+#![cfg(test)]
+
+use crate::ThreadPoolBuilder;
+use std::sync::atomic::{AtomicUsize, Ordering};
+use std::sync::mpsc::channel;
+use std::sync::Arc;
+use std::{thread, time};
+
+#[test]
+fn broadcast_global() {
+ let v = crate::broadcast(|ctx| ctx.index());
+ assert!(v.into_iter().eq(0..crate::current_num_threads()));
+}
+
+#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
+fn spawn_broadcast_global() {
+ let (tx, rx) = channel();
+ crate::spawn_broadcast(move |ctx| tx.send(ctx.index()).unwrap());
+
+ let mut v: Vec<_> = rx.into_iter().collect();
+ v.sort_unstable();
+ assert!(v.into_iter().eq(0..crate::current_num_threads()));
+}
+
+#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
+fn broadcast_pool() {
+ let pool = ThreadPoolBuilder::new().num_threads(7).build().unwrap();
+ let v = pool.broadcast(|ctx| ctx.index());
+ assert!(v.into_iter().eq(0..7));
+}
+
+#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
+fn spawn_broadcast_pool() {
+ let (tx, rx) = channel();
+ let pool = ThreadPoolBuilder::new().num_threads(7).build().unwrap();
+ pool.spawn_broadcast(move |ctx| tx.send(ctx.index()).unwrap());
+
+ let mut v: Vec<_> = rx.into_iter().collect();
+ v.sort_unstable();
+ assert!(v.into_iter().eq(0..7));
+}
+
+#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
+fn broadcast_self() {
+ let pool = ThreadPoolBuilder::new().num_threads(7).build().unwrap();
+ let v = pool.install(|| crate::broadcast(|ctx| ctx.index()));
+ assert!(v.into_iter().eq(0..7));
+}
+
+#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
+fn spawn_broadcast_self() {
+ let (tx, rx) = channel();
+ let pool = ThreadPoolBuilder::new().num_threads(7).build().unwrap();
+ pool.spawn(|| crate::spawn_broadcast(move |ctx| tx.send(ctx.index()).unwrap()));
+
+ let mut v: Vec<_> = rx.into_iter().collect();
+ v.sort_unstable();
+ assert!(v.into_iter().eq(0..7));
+}
+
+#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
+fn broadcast_mutual() {
+ let count = AtomicUsize::new(0);
+ let pool1 = ThreadPoolBuilder::new().num_threads(3).build().unwrap();
+ let pool2 = ThreadPoolBuilder::new().num_threads(7).build().unwrap();
+ pool1.install(|| {
+ pool2.broadcast(|_| {
+ pool1.broadcast(|_| {
+ count.fetch_add(1, Ordering::Relaxed);
+ })
+ })
+ });
+ assert_eq!(count.into_inner(), 3 * 7);
+}
+
+#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
+fn spawn_broadcast_mutual() {
+ let (tx, rx) = channel();
+ let pool1 = Arc::new(ThreadPoolBuilder::new().num_threads(3).build().unwrap());
+ let pool2 = ThreadPoolBuilder::new().num_threads(7).build().unwrap();
+ pool1.spawn({
+ let pool1 = Arc::clone(&pool1);
+ move || {
+ pool2.spawn_broadcast(move |_| {
+ let tx = tx.clone();
+ pool1.spawn_broadcast(move |_| tx.send(()).unwrap())
+ })
+ }
+ });
+ assert_eq!(rx.into_iter().count(), 3 * 7);
+}
+
+#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
+fn broadcast_mutual_sleepy() {
+ let count = AtomicUsize::new(0);
+ let pool1 = ThreadPoolBuilder::new().num_threads(3).build().unwrap();
+ let pool2 = ThreadPoolBuilder::new().num_threads(7).build().unwrap();
+ pool1.install(|| {
+ thread::sleep(time::Duration::from_secs(1));
+ pool2.broadcast(|_| {
+ thread::sleep(time::Duration::from_secs(1));
+ pool1.broadcast(|_| {
+ thread::sleep(time::Duration::from_millis(100));
+ count.fetch_add(1, Ordering::Relaxed);
+ })
+ })
+ });
+ assert_eq!(count.into_inner(), 3 * 7);
+}
+
+#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
+fn spawn_broadcast_mutual_sleepy() {
+ let (tx, rx) = channel();
+ let pool1 = Arc::new(ThreadPoolBuilder::new().num_threads(3).build().unwrap());
+ let pool2 = ThreadPoolBuilder::new().num_threads(7).build().unwrap();
+ pool1.spawn({
+ let pool1 = Arc::clone(&pool1);
+ move || {
+ thread::sleep(time::Duration::from_secs(1));
+ pool2.spawn_broadcast(move |_| {
+ let tx = tx.clone();
+ thread::sleep(time::Duration::from_secs(1));
+ pool1.spawn_broadcast(move |_| {
+ thread::sleep(time::Duration::from_millis(100));
+ tx.send(()).unwrap();
+ })
+ })
+ }
+ });
+ assert_eq!(rx.into_iter().count(), 3 * 7);
+}
+
+#[test]
+#[cfg_attr(not(panic = "unwind"), ignore)]
+fn broadcast_panic_one() {
+ let count = AtomicUsize::new(0);
+ let pool = ThreadPoolBuilder::new().num_threads(7).build().unwrap();
+ let result = crate::unwind::halt_unwinding(|| {
+ pool.broadcast(|ctx| {
+ count.fetch_add(1, Ordering::Relaxed);
+ if ctx.index() == 3 {
+ panic!("Hello, world!");
+ }
+ })
+ });
+ assert_eq!(count.into_inner(), 7);
+ assert!(result.is_err(), "broadcast panic should propagate!");
+}
+
+#[test]
+#[cfg_attr(not(panic = "unwind"), ignore)]
+fn spawn_broadcast_panic_one() {
+ let (tx, rx) = channel();
+ let (panic_tx, panic_rx) = channel();
+ let pool = ThreadPoolBuilder::new()
+ .num_threads(7)
+ .panic_handler(move |e| panic_tx.send(e).unwrap())
+ .build()
+ .unwrap();
+ pool.spawn_broadcast(move |ctx| {
+ tx.send(()).unwrap();
+ if ctx.index() == 3 {
+ panic!("Hello, world!");
+ }
+ });
+ drop(pool); // including panic_tx
+ assert_eq!(rx.into_iter().count(), 7);
+ assert_eq!(panic_rx.into_iter().count(), 1);
+}
+
+#[test]
+#[cfg_attr(not(panic = "unwind"), ignore)]
+fn broadcast_panic_many() {
+ let count = AtomicUsize::new(0);
+ let pool = ThreadPoolBuilder::new().num_threads(7).build().unwrap();
+ let result = crate::unwind::halt_unwinding(|| {
+ pool.broadcast(|ctx| {
+ count.fetch_add(1, Ordering::Relaxed);
+ if ctx.index() % 2 == 0 {
+ panic!("Hello, world!");
+ }
+ })
+ });
+ assert_eq!(count.into_inner(), 7);
+ assert!(result.is_err(), "broadcast panic should propagate!");
+}
+
+#[test]
+#[cfg_attr(not(panic = "unwind"), ignore)]
+fn spawn_broadcast_panic_many() {
+ let (tx, rx) = channel();
+ let (panic_tx, panic_rx) = channel();
+ let pool = ThreadPoolBuilder::new()
+ .num_threads(7)
+ .panic_handler(move |e| panic_tx.send(e).unwrap())
+ .build()
+ .unwrap();
+ pool.spawn_broadcast(move |ctx| {
+ tx.send(()).unwrap();
+ if ctx.index() % 2 == 0 {
+ panic!("Hello, world!");
+ }
+ });
+ drop(pool); // including panic_tx
+ assert_eq!(rx.into_iter().count(), 7);
+ assert_eq!(panic_rx.into_iter().count(), 4);
+}
+
+#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
+fn broadcast_sleep_race() {
+ let test_duration = time::Duration::from_secs(1);
+ let pool = ThreadPoolBuilder::new().num_threads(7).build().unwrap();
+ let start = time::Instant::now();
+ while start.elapsed() < test_duration {
+ pool.broadcast(|ctx| {
+ // A slight spread of sleep duration increases the chance that one
+ // of the threads will race in the pool's idle sleep afterward.
+ thread::sleep(time::Duration::from_micros(ctx.index() as u64));
+ });
+ }
+}
+
+#[test]
+fn broadcast_after_spawn_broadcast() {
+ let (tx, rx) = channel();
+
+ // Queue a non-blocking spawn_broadcast.
+ crate::spawn_broadcast(move |ctx| tx.send(ctx.index()).unwrap());
+
+ // This blocking broadcast runs after all prior broadcasts.
+ crate::broadcast(|_| {});
+
+ // The spawn_broadcast **must** have run by now on all threads.
+ let mut v: Vec<_> = rx.try_iter().collect();
+ v.sort_unstable();
+ assert!(v.into_iter().eq(0..crate::current_num_threads()));
+}
+
+#[test]
+fn broadcast_after_spawn() {
+ let (tx, rx) = channel();
+
+ // Queue a regular spawn on a thread-local deque.
+ crate::registry::in_worker(move |_, _| {
+ crate::spawn(move || tx.send(22).unwrap());
+ });
+
+ // Broadcast runs after the local deque is empty.
+ crate::broadcast(|_| {});
+
+ // The spawn **must** have run by now.
+ assert_eq!(22, rx.try_recv().unwrap());
+}