diff options
author | Valentin Popov <valentin@popov.link> | 2024-01-08 00:21:28 +0300 |
---|---|---|
committer | Valentin Popov <valentin@popov.link> | 2024-01-08 00:21:28 +0300 |
commit | 1b6a04ca5504955c571d1c97504fb45ea0befee4 (patch) | |
tree | 7579f518b23313e8a9748a88ab6173d5e030b227 /vendor/flume/tests/ready.rs | |
parent | 5ecd8cf2cba827454317368b68571df0d13d7842 (diff) | |
download | fparkan-1b6a04ca5504955c571d1c97504fb45ea0befee4.tar.xz fparkan-1b6a04ca5504955c571d1c97504fb45ea0befee4.zip |
Initial vendor packages
Signed-off-by: Valentin Popov <valentin@popov.link>
Diffstat (limited to 'vendor/flume/tests/ready.rs')
-rw-r--r-- | vendor/flume/tests/ready.rs | 837 |
1 files changed, 837 insertions, 0 deletions
diff --git a/vendor/flume/tests/ready.rs b/vendor/flume/tests/ready.rs new file mode 100644 index 0000000..2f42d9a --- /dev/null +++ b/vendor/flume/tests/ready.rs @@ -0,0 +1,837 @@ +// //! Tests for channel readiness using the `Select` struct. + +// extern crate crossbeam_channel; +// extern crate crossbeam_utils; + +// use std::any::Any; +// use std::cell::Cell; +// use std::thread; +// use std::time::{Duration, Instant}; + +// use crossbeam_channel::{after, bounded, tick, unbounded}; +// use crossbeam_channel::{Receiver, Select, TryRecvError, TrySendError}; +// use crossbeam_utils::thread::scope; + +// fn ms(ms: u64) -> Duration { +// Duration::from_millis(ms) +// } + +// #[test] +// fn smoke1() { +// let (s1, r1) = unbounded::<usize>(); +// let (s2, r2) = unbounded::<usize>(); + +// s1.send(1).unwrap(); + +// let mut sel = Select::new(); +// sel.recv(&r1); +// sel.recv(&r2); +// assert_eq!(sel.ready(), 0); +// assert_eq!(r1.try_recv(), Ok(1)); + +// s2.send(2).unwrap(); + +// let mut sel = Select::new(); +// sel.recv(&r1); +// sel.recv(&r2); +// assert_eq!(sel.ready(), 1); +// assert_eq!(r2.try_recv(), Ok(2)); +// } + +// #[test] +// fn smoke2() { +// let (_s1, r1) = unbounded::<i32>(); +// let (_s2, r2) = unbounded::<i32>(); +// let (_s3, r3) = unbounded::<i32>(); +// let (_s4, r4) = unbounded::<i32>(); +// let (s5, r5) = unbounded::<i32>(); + +// s5.send(5).unwrap(); + +// let mut sel = Select::new(); +// sel.recv(&r1); +// sel.recv(&r2); +// sel.recv(&r3); +// sel.recv(&r4); +// sel.recv(&r5); +// assert_eq!(sel.ready(), 4); +// assert_eq!(r5.try_recv(), Ok(5)); +// } + +// #[test] +// fn disconnected() { +// let (s1, r1) = unbounded::<i32>(); +// let (s2, r2) = unbounded::<i32>(); + +// scope(|scope| { +// scope.spawn(|_| { +// drop(s1); +// thread::sleep(ms(500)); +// s2.send(5).unwrap(); +// }); + +// let mut sel = Select::new(); +// sel.recv(&r1); +// sel.recv(&r2); +// match sel.ready_timeout(ms(1000)) { +// Ok(0) => assert_eq!(r1.try_recv(), Err(TryRecvError::Disconnected)), +// _ => panic!(), +// } + +// r2.recv().unwrap(); +// }) +// .unwrap(); + +// let mut sel = Select::new(); +// sel.recv(&r1); +// sel.recv(&r2); +// match sel.ready_timeout(ms(1000)) { +// Ok(0) => assert_eq!(r1.try_recv(), Err(TryRecvError::Disconnected)), +// _ => panic!(), +// } + +// scope(|scope| { +// scope.spawn(|_| { +// thread::sleep(ms(500)); +// drop(s2); +// }); + +// let mut sel = Select::new(); +// sel.recv(&r2); +// match sel.ready_timeout(ms(1000)) { +// Ok(0) => assert_eq!(r2.try_recv(), Err(TryRecvError::Disconnected)), +// _ => panic!(), +// } +// }) +// .unwrap(); +// } + +// #[test] +// fn default() { +// let (s1, r1) = unbounded::<i32>(); +// let (s2, r2) = unbounded::<i32>(); + +// let mut sel = Select::new(); +// sel.recv(&r1); +// sel.recv(&r2); +// assert!(sel.try_ready().is_err()); + +// drop(s1); + +// let mut sel = Select::new(); +// sel.recv(&r1); +// sel.recv(&r2); +// match sel.try_ready() { +// Ok(0) => assert!(r1.try_recv().is_err()), +// _ => panic!(), +// } + +// s2.send(2).unwrap(); + +// let mut sel = Select::new(); +// sel.recv(&r2); +// match sel.try_ready() { +// Ok(0) => assert_eq!(r2.try_recv(), Ok(2)), +// _ => panic!(), +// } + +// let mut sel = Select::new(); +// sel.recv(&r2); +// assert!(sel.try_ready().is_err()); + +// let mut sel = Select::new(); +// assert!(sel.try_ready().is_err()); +// } + +// #[test] +// fn timeout() { +// let (_s1, r1) = unbounded::<i32>(); +// let (s2, r2) = unbounded::<i32>(); + +// scope(|scope| { +// scope.spawn(|_| { +// thread::sleep(ms(1500)); +// s2.send(2).unwrap(); +// }); + +// let mut sel = Select::new(); +// sel.recv(&r1); +// sel.recv(&r2); +// assert!(sel.ready_timeout(ms(1000)).is_err()); + +// let mut sel = Select::new(); +// sel.recv(&r1); +// sel.recv(&r2); +// match sel.ready_timeout(ms(1000)) { +// Ok(1) => assert_eq!(r2.try_recv(), Ok(2)), +// _ => panic!(), +// } +// }) +// .unwrap(); + +// scope(|scope| { +// let (s, r) = unbounded::<i32>(); + +// scope.spawn(move |_| { +// thread::sleep(ms(500)); +// drop(s); +// }); + +// let mut sel = Select::new(); +// assert!(sel.ready_timeout(ms(1000)).is_err()); + +// let mut sel = Select::new(); +// sel.recv(&r); +// match sel.try_ready() { +// Ok(0) => assert_eq!(r.try_recv(), Err(TryRecvError::Disconnected)), +// _ => panic!(), +// } +// }) +// .unwrap(); +// } + +// #[test] +// fn default_when_disconnected() { +// let (_, r) = unbounded::<i32>(); + +// let mut sel = Select::new(); +// sel.recv(&r); +// match sel.try_ready() { +// Ok(0) => assert_eq!(r.try_recv(), Err(TryRecvError::Disconnected)), +// _ => panic!(), +// } + +// let (_, r) = unbounded::<i32>(); + +// let mut sel = Select::new(); +// sel.recv(&r); +// match sel.ready_timeout(ms(1000)) { +// Ok(0) => assert_eq!(r.try_recv(), Err(TryRecvError::Disconnected)), +// _ => panic!(), +// } + +// let (s, _) = bounded::<i32>(0); + +// let mut sel = Select::new(); +// sel.send(&s); +// match sel.try_ready() { +// Ok(0) => assert_eq!(s.try_send(0), Err(TrySendError::Disconnected(0))), +// _ => panic!(), +// } + +// let (s, _) = bounded::<i32>(0); + +// let mut sel = Select::new(); +// sel.send(&s); +// match sel.ready_timeout(ms(1000)) { +// Ok(0) => assert_eq!(s.try_send(0), Err(TrySendError::Disconnected(0))), +// _ => panic!(), +// } +// } + +// #[test] +// fn default_only() { +// let start = Instant::now(); + +// let mut sel = Select::new(); +// assert!(sel.try_ready().is_err()); +// let now = Instant::now(); +// assert!(now - start <= ms(50)); + +// let start = Instant::now(); +// let mut sel = Select::new(); +// assert!(sel.ready_timeout(ms(500)).is_err()); +// let now = Instant::now(); +// assert!(now - start >= ms(450)); +// assert!(now - start <= ms(550)); +// } + +// #[test] +// fn unblocks() { +// let (s1, r1) = bounded::<i32>(0); +// let (s2, r2) = bounded::<i32>(0); + +// scope(|scope| { +// scope.spawn(|_| { +// thread::sleep(ms(500)); +// s2.send(2).unwrap(); +// }); + +// let mut sel = Select::new(); +// sel.recv(&r1); +// sel.recv(&r2); +// match sel.ready_timeout(ms(1000)) { +// Ok(1) => assert_eq!(r2.try_recv(), Ok(2)), +// _ => panic!(), +// } +// }) +// .unwrap(); + +// scope(|scope| { +// scope.spawn(|_| { +// thread::sleep(ms(500)); +// assert_eq!(r1.recv().unwrap(), 1); +// }); + +// let mut sel = Select::new(); +// let oper1 = sel.send(&s1); +// let oper2 = sel.send(&s2); +// let oper = sel.select_timeout(ms(1000)); +// match oper { +// Err(_) => panic!(), +// Ok(oper) => match oper.index() { +// i if i == oper1 => oper.send(&s1, 1).unwrap(), +// i if i == oper2 => panic!(), +// _ => unreachable!(), +// }, +// } +// }) +// .unwrap(); +// } + +// #[test] +// fn both_ready() { +// let (s1, r1) = bounded(0); +// let (s2, r2) = bounded(0); + +// scope(|scope| { +// scope.spawn(|_| { +// thread::sleep(ms(500)); +// s1.send(1).unwrap(); +// assert_eq!(r2.recv().unwrap(), 2); +// }); + +// for _ in 0..2 { +// let mut sel = Select::new(); +// sel.recv(&r1); +// sel.send(&s2); +// match sel.ready() { +// 0 => assert_eq!(r1.try_recv(), Ok(1)), +// 1 => s2.try_send(2).unwrap(), +// _ => panic!(), +// } +// } +// }) +// .unwrap(); +// } + +// #[test] +// fn cloning1() { +// scope(|scope| { +// let (s1, r1) = unbounded::<i32>(); +// let (_s2, r2) = unbounded::<i32>(); +// let (s3, r3) = unbounded::<()>(); + +// scope.spawn(move |_| { +// r3.recv().unwrap(); +// drop(s1.clone()); +// assert!(r3.try_recv().is_err()); +// s1.send(1).unwrap(); +// r3.recv().unwrap(); +// }); + +// s3.send(()).unwrap(); + +// let mut sel = Select::new(); +// sel.recv(&r1); +// sel.recv(&r2); +// match sel.ready() { +// 0 => drop(r1.try_recv()), +// 1 => drop(r2.try_recv()), +// _ => panic!(), +// } + +// s3.send(()).unwrap(); +// }) +// .unwrap(); +// } + +// #[test] +// fn cloning2() { +// let (s1, r1) = unbounded::<()>(); +// let (s2, r2) = unbounded::<()>(); +// let (_s3, _r3) = unbounded::<()>(); + +// scope(|scope| { +// scope.spawn(move |_| { +// let mut sel = Select::new(); +// sel.recv(&r1); +// sel.recv(&r2); +// match sel.ready() { +// 0 => panic!(), +// 1 => drop(r2.try_recv()), +// _ => panic!(), +// } +// }); + +// thread::sleep(ms(500)); +// drop(s1.clone()); +// s2.send(()).unwrap(); +// }) +// .unwrap(); +// } + +// #[test] +// fn preflight1() { +// let (s, r) = unbounded(); +// s.send(()).unwrap(); + +// let mut sel = Select::new(); +// sel.recv(&r); +// match sel.ready() { +// 0 => drop(r.try_recv()), +// _ => panic!(), +// } +// } + +// #[test] +// fn preflight2() { +// let (s, r) = unbounded(); +// drop(s.clone()); +// s.send(()).unwrap(); +// drop(s); + +// let mut sel = Select::new(); +// sel.recv(&r); +// match sel.ready() { +// 0 => assert_eq!(r.try_recv(), Ok(())), +// _ => panic!(), +// } + +// assert_eq!(r.try_recv(), Err(TryRecvError::Disconnected)); +// } + +// #[test] +// fn preflight3() { +// let (s, r) = unbounded(); +// drop(s.clone()); +// s.send(()).unwrap(); +// drop(s); +// r.recv().unwrap(); + +// let mut sel = Select::new(); +// sel.recv(&r); +// match sel.ready() { +// 0 => assert_eq!(r.try_recv(), Err(TryRecvError::Disconnected)), +// _ => panic!(), +// } +// } + +// #[test] +// fn duplicate_operations() { +// let (s, r) = unbounded::<i32>(); +// let hit = vec![Cell::new(false); 4]; + +// while hit.iter().map(|h| h.get()).any(|hit| !hit) { +// let mut sel = Select::new(); +// sel.recv(&r); +// sel.recv(&r); +// sel.send(&s); +// sel.send(&s); +// match sel.ready() { +// 0 => { +// assert!(r.try_recv().is_ok()); +// hit[0].set(true); +// } +// 1 => { +// assert!(r.try_recv().is_ok()); +// hit[1].set(true); +// } +// 2 => { +// assert!(s.try_send(0).is_ok()); +// hit[2].set(true); +// } +// 3 => { +// assert!(s.try_send(0).is_ok()); +// hit[3].set(true); +// } +// _ => panic!(), +// } +// } +// } + +// #[test] +// fn nesting() { +// let (s, r) = unbounded::<i32>(); + +// let mut sel = Select::new(); +// sel.send(&s); +// match sel.ready() { +// 0 => { +// assert!(s.try_send(0).is_ok()); + +// let mut sel = Select::new(); +// sel.recv(&r); +// match sel.ready() { +// 0 => { +// assert_eq!(r.try_recv(), Ok(0)); + +// let mut sel = Select::new(); +// sel.send(&s); +// match sel.ready() { +// 0 => { +// assert!(s.try_send(1).is_ok()); + +// let mut sel = Select::new(); +// sel.recv(&r); +// match sel.ready() { +// 0 => { +// assert_eq!(r.try_recv(), Ok(1)); +// } +// _ => panic!(), +// } +// } +// _ => panic!(), +// } +// } +// _ => panic!(), +// } +// } +// _ => panic!(), +// } +// } + +// #[test] +// fn stress_recv() { +// const COUNT: usize = 10_000; + +// let (s1, r1) = unbounded(); +// let (s2, r2) = bounded(5); +// let (s3, r3) = bounded(0); + +// scope(|scope| { +// scope.spawn(|_| { +// for i in 0..COUNT { +// s1.send(i).unwrap(); +// r3.recv().unwrap(); + +// s2.send(i).unwrap(); +// r3.recv().unwrap(); +// } +// }); + +// for i in 0..COUNT { +// for _ in 0..2 { +// let mut sel = Select::new(); +// sel.recv(&r1); +// sel.recv(&r2); +// match sel.ready() { +// 0 => assert_eq!(r1.try_recv(), Ok(i)), +// 1 => assert_eq!(r2.try_recv(), Ok(i)), +// _ => panic!(), +// } + +// s3.send(()).unwrap(); +// } +// } +// }) +// .unwrap(); +// } + +// #[test] +// fn stress_send() { +// const COUNT: usize = 10_000; + +// let (s1, r1) = bounded(0); +// let (s2, r2) = bounded(0); +// let (s3, r3) = bounded(100); + +// scope(|scope| { +// scope.spawn(|_| { +// for i in 0..COUNT { +// assert_eq!(r1.recv().unwrap(), i); +// assert_eq!(r2.recv().unwrap(), i); +// r3.recv().unwrap(); +// } +// }); + +// for i in 0..COUNT { +// for _ in 0..2 { +// let mut sel = Select::new(); +// sel.send(&s1); +// sel.send(&s2); +// match sel.ready() { +// 0 => assert!(s1.try_send(i).is_ok()), +// 1 => assert!(s2.try_send(i).is_ok()), +// _ => panic!(), +// } +// } +// s3.send(()).unwrap(); +// } +// }) +// .unwrap(); +// } + +// #[test] +// fn stress_mixed() { +// const COUNT: usize = 10_000; + +// let (s1, r1) = bounded(0); +// let (s2, r2) = bounded(0); +// let (s3, r3) = bounded(100); + +// scope(|scope| { +// scope.spawn(|_| { +// for i in 0..COUNT { +// s1.send(i).unwrap(); +// assert_eq!(r2.recv().unwrap(), i); +// r3.recv().unwrap(); +// } +// }); + +// for i in 0..COUNT { +// for _ in 0..2 { +// let mut sel = Select::new(); +// sel.recv(&r1); +// sel.send(&s2); +// match sel.ready() { +// 0 => assert_eq!(r1.try_recv(), Ok(i)), +// 1 => assert!(s2.try_send(i).is_ok()), +// _ => panic!(), +// } +// } +// s3.send(()).unwrap(); +// } +// }) +// .unwrap(); +// } + +// #[test] +// fn stress_timeout_two_threads() { +// const COUNT: usize = 20; + +// let (s, r) = bounded(2); + +// scope(|scope| { +// scope.spawn(|_| { +// for i in 0..COUNT { +// if i % 2 == 0 { +// thread::sleep(ms(500)); +// } + +// let done = false; +// while !done { +// let mut sel = Select::new(); +// sel.send(&s); +// match sel.ready_timeout(ms(100)) { +// Err(_) => {} +// Ok(0) => { +// assert!(s.try_send(i).is_ok()); +// break; +// } +// Ok(_) => panic!(), +// } +// } +// } +// }); + +// scope.spawn(|_| { +// for i in 0..COUNT { +// if i % 2 == 0 { +// thread::sleep(ms(500)); +// } + +// let mut done = false; +// while !done { +// let mut sel = Select::new(); +// sel.recv(&r); +// match sel.ready_timeout(ms(100)) { +// Err(_) => {} +// Ok(0) => { +// assert_eq!(r.try_recv(), Ok(i)); +// done = true; +// } +// Ok(_) => panic!(), +// } +// } +// } +// }); +// }) +// .unwrap(); +// } + +// #[test] +// fn send_recv_same_channel() { +// let (s, r) = bounded::<i32>(0); +// let mut sel = Select::new(); +// sel.send(&s); +// sel.recv(&r); +// assert!(sel.ready_timeout(ms(100)).is_err()); + +// let (s, r) = unbounded::<i32>(); +// let mut sel = Select::new(); +// sel.send(&s); +// sel.recv(&r); +// match sel.ready_timeout(ms(100)) { +// Err(_) => panic!(), +// Ok(0) => assert!(s.try_send(0).is_ok()), +// Ok(_) => panic!(), +// } +// } + +// #[test] +// fn channel_through_channel() { +// const COUNT: usize = 1000; + +// type T = Box<dyn Any + Send>; + +// for cap in 1..4 { +// let (s, r) = bounded::<T>(cap); + +// scope(|scope| { +// scope.spawn(move |_| { +// let mut s = s; + +// for _ in 0..COUNT { +// let (new_s, new_r) = bounded(cap); +// let new_r: T = Box::new(Some(new_r)); + +// { +// let mut sel = Select::new(); +// sel.send(&s); +// match sel.ready() { +// 0 => assert!(s.try_send(new_r).is_ok()), +// _ => panic!(), +// } +// } + +// s = new_s; +// } +// }); + +// scope.spawn(move |_| { +// let mut r = r; + +// for _ in 0..COUNT { +// let new = { +// let mut sel = Select::new(); +// sel.recv(&r); +// match sel.ready() { +// 0 => r +// .try_recv() +// .unwrap() +// .downcast_mut::<Option<Receiver<T>>>() +// .unwrap() +// .take() +// .unwrap(), +// _ => panic!(), +// } +// }; +// r = new; +// } +// }); +// }) +// .unwrap(); +// } +// } + +// #[test] +// fn fairness1() { +// const COUNT: usize = 10_000; + +// let (s1, r1) = bounded::<()>(COUNT); +// let (s2, r2) = unbounded::<()>(); + +// for _ in 0..COUNT { +// s1.send(()).unwrap(); +// s2.send(()).unwrap(); +// } + +// let hits = vec![Cell::new(0usize); 4]; +// for _ in 0..COUNT { +// let after = after(ms(0)); +// let tick = tick(ms(0)); + +// let mut sel = Select::new(); +// sel.recv(&r1); +// sel.recv(&r2); +// sel.recv(&after); +// sel.recv(&tick); +// match sel.ready() { +// 0 => { +// r1.try_recv().unwrap(); +// hits[0].set(hits[0].get() + 1); +// } +// 1 => { +// r2.try_recv().unwrap(); +// hits[1].set(hits[1].get() + 1); +// } +// 2 => { +// after.try_recv().unwrap(); +// hits[2].set(hits[2].get() + 1); +// } +// 3 => { +// tick.try_recv().unwrap(); +// hits[3].set(hits[3].get() + 1); +// } +// _ => panic!(), +// } +// } +// assert!(hits.iter().all(|x| x.get() >= COUNT / hits.len() / 2)); +// } + +// #[test] +// fn fairness2() { +// const COUNT: usize = 10_000; + +// let (s1, r1) = unbounded::<()>(); +// let (s2, r2) = bounded::<()>(1); +// let (s3, r3) = bounded::<()>(0); + +// scope(|scope| { +// scope.spawn(|_| { +// for _ in 0..COUNT { +// let mut sel = Select::new(); +// let mut oper1 = None; +// let mut oper2 = None; +// if s1.is_empty() { +// oper1 = Some(sel.send(&s1)); +// } +// if s2.is_empty() { +// oper2 = Some(sel.send(&s2)); +// } +// let oper3 = sel.send(&s3); +// let oper = sel.select(); +// match oper.index() { +// i if Some(i) == oper1 => assert!(oper.send(&s1, ()).is_ok()), +// i if Some(i) == oper2 => assert!(oper.send(&s2, ()).is_ok()), +// i if i == oper3 => assert!(oper.send(&s3, ()).is_ok()), +// _ => unreachable!(), +// } +// } +// }); + +// let hits = vec![Cell::new(0usize); 3]; +// for _ in 0..COUNT { +// let mut sel = Select::new(); +// sel.recv(&r1); +// sel.recv(&r2); +// sel.recv(&r3); +// loop { +// match sel.ready() { +// 0 => { +// if r1.try_recv().is_ok() { +// hits[0].set(hits[0].get() + 1); +// break; +// } +// } +// 1 => { +// if r2.try_recv().is_ok() { +// hits[1].set(hits[1].get() + 1); +// break; +// } +// } +// 2 => { +// if r3.try_recv().is_ok() { +// hits[2].set(hits[2].get() + 1); +// break; +// } +// } +// _ => unreachable!(), +// } +// } +// } +// assert!(hits.iter().all(|x| x.get() >= COUNT / hits.len() / 10)); +// }) +// .unwrap(); +// } |