aboutsummaryrefslogtreecommitdiff
path: root/vendor/crossbeam-deque/tests/injector.rs
diff options
context:
space:
mode:
authorValentin Popov <valentin@popov.link>2024-01-08 00:21:28 +0300
committerValentin Popov <valentin@popov.link>2024-01-08 00:21:28 +0300
commit1b6a04ca5504955c571d1c97504fb45ea0befee4 (patch)
tree7579f518b23313e8a9748a88ab6173d5e030b227 /vendor/crossbeam-deque/tests/injector.rs
parent5ecd8cf2cba827454317368b68571df0d13d7842 (diff)
downloadfparkan-1b6a04ca5504955c571d1c97504fb45ea0befee4.tar.xz
fparkan-1b6a04ca5504955c571d1c97504fb45ea0befee4.zip
Initial vendor packages
Signed-off-by: Valentin Popov <valentin@popov.link>
Diffstat (limited to 'vendor/crossbeam-deque/tests/injector.rs')
-rw-r--r--vendor/crossbeam-deque/tests/injector.rs375
1 files changed, 375 insertions, 0 deletions
diff --git a/vendor/crossbeam-deque/tests/injector.rs b/vendor/crossbeam-deque/tests/injector.rs
new file mode 100644
index 0000000..f706a8d
--- /dev/null
+++ b/vendor/crossbeam-deque/tests/injector.rs
@@ -0,0 +1,375 @@
+use std::sync::atomic::Ordering::SeqCst;
+use std::sync::atomic::{AtomicBool, AtomicUsize};
+use std::sync::{Arc, Mutex};
+
+use crossbeam_deque::Steal::{Empty, Success};
+use crossbeam_deque::{Injector, Worker};
+use crossbeam_utils::thread::scope;
+use rand::Rng;
+
+#[test]
+fn smoke() {
+ let q = Injector::new();
+ assert_eq!(q.steal(), Empty);
+
+ q.push(1);
+ q.push(2);
+ assert_eq!(q.steal(), Success(1));
+ assert_eq!(q.steal(), Success(2));
+ assert_eq!(q.steal(), Empty);
+
+ q.push(3);
+ assert_eq!(q.steal(), Success(3));
+ assert_eq!(q.steal(), Empty);
+}
+
+#[test]
+fn is_empty() {
+ let q = Injector::new();
+ assert!(q.is_empty());
+
+ q.push(1);
+ assert!(!q.is_empty());
+ q.push(2);
+ assert!(!q.is_empty());
+
+ let _ = q.steal();
+ assert!(!q.is_empty());
+ let _ = q.steal();
+ assert!(q.is_empty());
+
+ q.push(3);
+ assert!(!q.is_empty());
+ let _ = q.steal();
+ assert!(q.is_empty());
+}
+
+#[test]
+fn spsc() {
+ #[cfg(miri)]
+ const COUNT: usize = 500;
+ #[cfg(not(miri))]
+ const COUNT: usize = 100_000;
+
+ let q = Injector::new();
+
+ scope(|scope| {
+ scope.spawn(|_| {
+ for i in 0..COUNT {
+ loop {
+ if let Success(v) = q.steal() {
+ assert_eq!(i, v);
+ break;
+ }
+ #[cfg(miri)]
+ std::hint::spin_loop();
+ }
+ }
+
+ assert_eq!(q.steal(), Empty);
+ });
+
+ for i in 0..COUNT {
+ q.push(i);
+ }
+ })
+ .unwrap();
+}
+
+#[test]
+fn mpmc() {
+ #[cfg(miri)]
+ const COUNT: usize = 500;
+ #[cfg(not(miri))]
+ const COUNT: usize = 25_000;
+ const THREADS: usize = 4;
+
+ let q = Injector::new();
+ let v = (0..COUNT).map(|_| AtomicUsize::new(0)).collect::<Vec<_>>();
+
+ scope(|scope| {
+ for _ in 0..THREADS {
+ scope.spawn(|_| {
+ for i in 0..COUNT {
+ q.push(i);
+ }
+ });
+ }
+
+ for _ in 0..THREADS {
+ scope.spawn(|_| {
+ for _ in 0..COUNT {
+ loop {
+ if let Success(n) = q.steal() {
+ v[n].fetch_add(1, SeqCst);
+ break;
+ }
+ #[cfg(miri)]
+ std::hint::spin_loop();
+ }
+ }
+ });
+ }
+ })
+ .unwrap();
+
+ for c in v {
+ assert_eq!(c.load(SeqCst), THREADS);
+ }
+}
+
+#[test]
+fn stampede() {
+ const THREADS: usize = 8;
+ #[cfg(miri)]
+ const COUNT: usize = 500;
+ #[cfg(not(miri))]
+ const COUNT: usize = 50_000;
+
+ let q = Injector::new();
+
+ for i in 0..COUNT {
+ q.push(Box::new(i + 1));
+ }
+ let remaining = Arc::new(AtomicUsize::new(COUNT));
+
+ scope(|scope| {
+ for _ in 0..THREADS {
+ let remaining = remaining.clone();
+ let q = &q;
+
+ scope.spawn(move |_| {
+ let mut last = 0;
+ while remaining.load(SeqCst) > 0 {
+ if let Success(x) = q.steal() {
+ assert!(last < *x);
+ last = *x;
+ remaining.fetch_sub(1, SeqCst);
+ }
+ }
+ });
+ }
+
+ let mut last = 0;
+ while remaining.load(SeqCst) > 0 {
+ if let Success(x) = q.steal() {
+ assert!(last < *x);
+ last = *x;
+ remaining.fetch_sub(1, SeqCst);
+ }
+ }
+ })
+ .unwrap();
+}
+
+#[test]
+fn stress() {
+ const THREADS: usize = 8;
+ #[cfg(miri)]
+ const COUNT: usize = 500;
+ #[cfg(not(miri))]
+ const COUNT: usize = 50_000;
+
+ let q = Injector::new();
+ let done = Arc::new(AtomicBool::new(false));
+ let hits = Arc::new(AtomicUsize::new(0));
+
+ scope(|scope| {
+ for _ in 0..THREADS {
+ let done = done.clone();
+ let hits = hits.clone();
+ let q = &q;
+
+ scope.spawn(move |_| {
+ let w2 = Worker::new_fifo();
+
+ while !done.load(SeqCst) {
+ if let Success(_) = q.steal() {
+ hits.fetch_add(1, SeqCst);
+ }
+
+ let _ = q.steal_batch(&w2);
+
+ if let Success(_) = q.steal_batch_and_pop(&w2) {
+ hits.fetch_add(1, SeqCst);
+ }
+
+ while w2.pop().is_some() {
+ hits.fetch_add(1, SeqCst);
+ }
+ }
+ });
+ }
+
+ let mut rng = rand::thread_rng();
+ let mut expected = 0;
+ while expected < COUNT {
+ if rng.gen_range(0..3) == 0 {
+ while let Success(_) = q.steal() {
+ hits.fetch_add(1, SeqCst);
+ }
+ } else {
+ q.push(expected);
+ expected += 1;
+ }
+ }
+
+ while hits.load(SeqCst) < COUNT {
+ while let Success(_) = q.steal() {
+ hits.fetch_add(1, SeqCst);
+ }
+ }
+ done.store(true, SeqCst);
+ })
+ .unwrap();
+}
+
+#[cfg_attr(miri, ignore)] // Miri is too slow
+#[test]
+fn no_starvation() {
+ const THREADS: usize = 8;
+ const COUNT: usize = 50_000;
+
+ let q = Injector::new();
+ let done = Arc::new(AtomicBool::new(false));
+ let mut all_hits = Vec::new();
+
+ scope(|scope| {
+ for _ in 0..THREADS {
+ let done = done.clone();
+ let hits = Arc::new(AtomicUsize::new(0));
+ all_hits.push(hits.clone());
+ let q = &q;
+
+ scope.spawn(move |_| {
+ let w2 = Worker::new_fifo();
+
+ while !done.load(SeqCst) {
+ if let Success(_) = q.steal() {
+ hits.fetch_add(1, SeqCst);
+ }
+
+ let _ = q.steal_batch(&w2);
+
+ if let Success(_) = q.steal_batch_and_pop(&w2) {
+ hits.fetch_add(1, SeqCst);
+ }
+
+ while w2.pop().is_some() {
+ hits.fetch_add(1, SeqCst);
+ }
+ }
+ });
+ }
+
+ let mut rng = rand::thread_rng();
+ let mut my_hits = 0;
+ loop {
+ for i in 0..rng.gen_range(0..COUNT) {
+ if rng.gen_range(0..3) == 0 && my_hits == 0 {
+ while let Success(_) = q.steal() {
+ my_hits += 1;
+ }
+ } else {
+ q.push(i);
+ }
+ }
+
+ if my_hits > 0 && all_hits.iter().all(|h| h.load(SeqCst) > 0) {
+ break;
+ }
+ }
+ done.store(true, SeqCst);
+ })
+ .unwrap();
+}
+
+#[test]
+fn destructors() {
+ #[cfg(miri)]
+ const THREADS: usize = 2;
+ #[cfg(not(miri))]
+ const THREADS: usize = 8;
+ #[cfg(miri)]
+ const COUNT: usize = 500;
+ #[cfg(not(miri))]
+ const COUNT: usize = 50_000;
+ #[cfg(miri)]
+ const STEPS: usize = 100;
+ #[cfg(not(miri))]
+ const STEPS: usize = 1000;
+
+ struct Elem(usize, Arc<Mutex<Vec<usize>>>);
+
+ impl Drop for Elem {
+ fn drop(&mut self) {
+ self.1.lock().unwrap().push(self.0);
+ }
+ }
+
+ let q = Injector::new();
+ let dropped = Arc::new(Mutex::new(Vec::new()));
+ let remaining = Arc::new(AtomicUsize::new(COUNT));
+
+ for i in 0..COUNT {
+ q.push(Elem(i, dropped.clone()));
+ }
+
+ scope(|scope| {
+ for _ in 0..THREADS {
+ let remaining = remaining.clone();
+ let q = &q;
+
+ scope.spawn(move |_| {
+ let w2 = Worker::new_fifo();
+ let mut cnt = 0;
+
+ while cnt < STEPS {
+ if let Success(_) = q.steal() {
+ cnt += 1;
+ remaining.fetch_sub(1, SeqCst);
+ }
+
+ let _ = q.steal_batch(&w2);
+
+ if let Success(_) = q.steal_batch_and_pop(&w2) {
+ cnt += 1;
+ remaining.fetch_sub(1, SeqCst);
+ }
+
+ while w2.pop().is_some() {
+ cnt += 1;
+ remaining.fetch_sub(1, SeqCst);
+ }
+ }
+ });
+ }
+
+ for _ in 0..STEPS {
+ if let Success(_) = q.steal() {
+ remaining.fetch_sub(1, SeqCst);
+ }
+ }
+ })
+ .unwrap();
+
+ let rem = remaining.load(SeqCst);
+ assert!(rem > 0);
+
+ {
+ let mut v = dropped.lock().unwrap();
+ assert_eq!(v.len(), COUNT - rem);
+ v.clear();
+ }
+
+ drop(q);
+
+ {
+ let mut v = dropped.lock().unwrap();
+ assert_eq!(v.len(), rem);
+ v.sort_unstable();
+ for pair in v.windows(2) {
+ assert_eq!(pair[0] + 1, pair[1]);
+ }
+ }
+}