diff options
Diffstat (limited to 'vendor/flume/tests')
-rw-r--r-- | vendor/flume/tests/after.rs | 339 | ||||
-rw-r--r-- | vendor/flume/tests/array.rs | 657 | ||||
-rw-r--r-- | vendor/flume/tests/async.rs | 276 | ||||
-rw-r--r-- | vendor/flume/tests/basic.rs | 428 | ||||
-rw-r--r-- | vendor/flume/tests/check_same_channel.rs | 57 | ||||
-rw-r--r-- | vendor/flume/tests/golang.rs | 1445 | ||||
-rw-r--r-- | vendor/flume/tests/iter.rs | 112 | ||||
-rw-r--r-- | vendor/flume/tests/list.rs | 536 | ||||
-rw-r--r-- | vendor/flume/tests/method_sharing.rs | 39 | ||||
-rw-r--r-- | vendor/flume/tests/mpsc.rs | 2095 | ||||
-rw-r--r-- | vendor/flume/tests/never.rs | 99 | ||||
-rw-r--r-- | vendor/flume/tests/ready.rs | 837 | ||||
-rw-r--r-- | vendor/flume/tests/same_channel.rs | 114 | ||||
-rw-r--r-- | vendor/flume/tests/select.rs | 1304 | ||||
-rw-r--r-- | vendor/flume/tests/select_macro.rs | 1440 | ||||
-rw-r--r-- | vendor/flume/tests/stream.rs | 255 | ||||
-rw-r--r-- | vendor/flume/tests/thread_locals.rs | 53 | ||||
-rw-r--r-- | vendor/flume/tests/tick.rs | 353 | ||||
-rw-r--r-- | vendor/flume/tests/zero.rs | 557 |
19 files changed, 0 insertions, 10996 deletions
diff --git a/vendor/flume/tests/after.rs b/vendor/flume/tests/after.rs deleted file mode 100644 index 6d25108..0000000 --- a/vendor/flume/tests/after.rs +++ /dev/null @@ -1,339 +0,0 @@ -// //! Tests for the after channel flavor. - -// #[macro_use] -// extern crate crossbeam_channel; -// extern crate crossbeam_utils; -// extern crate rand; - -// use std::sync::atomic::AtomicUsize; -// use std::sync::atomic::Ordering; -// use std::thread; -// use std::time::{Duration, Instant}; - -// use crossbeam_channel::{after, Select, TryRecvError}; -// use crossbeam_utils::thread::scope; - -// fn ms(ms: u64) -> Duration { -// Duration::from_millis(ms) -// } - -// #[test] -// fn fire() { -// let start = Instant::now(); -// let r = after(ms(50)); - -// assert_eq!(r.try_recv(), Err(TryRecvError::Empty)); -// thread::sleep(ms(100)); - -// let fired = r.try_recv().unwrap(); -// assert!(start < fired); -// assert!(fired - start >= ms(50)); - -// let now = Instant::now(); -// assert!(fired < now); -// assert!(now - fired >= ms(50)); - -// assert_eq!(r.try_recv(), Err(TryRecvError::Empty)); - -// select! { -// recv(r) -> _ => panic!(), -// default => {} -// } - -// select! { -// recv(r) -> _ => panic!(), -// recv(after(ms(200))) -> _ => {} -// } -// } - -// #[test] -// fn capacity() { -// const COUNT: usize = 10; - -// for i in 0..COUNT { -// let r = after(ms(i as u64)); -// assert_eq!(r.capacity(), Some(1)); -// } -// } - -// #[test] -// fn len_empty_full() { -// let r = after(ms(50)); - -// assert_eq!(r.len(), 0); -// assert_eq!(r.is_empty(), true); -// assert_eq!(r.is_full(), false); - -// thread::sleep(ms(100)); - -// assert_eq!(r.len(), 1); -// assert_eq!(r.is_empty(), false); -// assert_eq!(r.is_full(), true); - -// r.try_recv().unwrap(); - -// assert_eq!(r.len(), 0); -// assert_eq!(r.is_empty(), true); -// assert_eq!(r.is_full(), false); -// } - -// #[test] -// fn try_recv() { -// let r = after(ms(200)); -// assert!(r.try_recv().is_err()); - -// thread::sleep(ms(100)); -// assert!(r.try_recv().is_err()); - -// thread::sleep(ms(200)); -// assert!(r.try_recv().is_ok()); -// assert!(r.try_recv().is_err()); - -// thread::sleep(ms(200)); -// assert!(r.try_recv().is_err()); -// } - -// #[test] -// fn recv() { -// let start = Instant::now(); -// let r = after(ms(50)); - -// let fired = r.recv().unwrap(); -// assert!(start < fired); -// assert!(fired - start >= ms(50)); - -// let now = Instant::now(); -// assert!(fired < now); -// assert!(now - fired < fired - start); - -// assert_eq!(r.try_recv(), Err(TryRecvError::Empty)); -// } - -// #[test] -// fn recv_timeout() { -// let start = Instant::now(); -// let r = after(ms(200)); - -// assert!(r.recv_timeout(ms(100)).is_err()); -// let now = Instant::now(); -// assert!(now - start >= ms(100)); -// assert!(now - start <= ms(150)); - -// let fired = r.recv_timeout(ms(200)).unwrap(); -// assert!(fired - start >= ms(200)); -// assert!(fired - start <= ms(250)); - -// assert!(r.recv_timeout(ms(200)).is_err()); -// let now = Instant::now(); -// assert!(now - start >= ms(400)); -// assert!(now - start <= ms(450)); - -// assert_eq!(r.try_recv(), Err(TryRecvError::Empty)); -// } - -// #[test] -// fn recv_two() { -// let r1 = after(ms(50)); -// let r2 = after(ms(50)); - -// scope(|scope| { -// scope.spawn(|_| { -// select! { -// recv(r1) -> _ => {} -// recv(r2) -> _ => {} -// } -// }); -// scope.spawn(|_| { -// select! { -// recv(r1) -> _ => {} -// recv(r2) -> _ => {} -// } -// }); -// }) -// .unwrap(); -// } - -// #[test] -// fn recv_race() { -// select! { -// recv(after(ms(50))) -> _ => {} -// recv(after(ms(100))) -> _ => panic!(), -// } - -// select! { -// recv(after(ms(100))) -> _ => panic!(), -// recv(after(ms(50))) -> _ => {} -// } -// } - -// #[test] -// fn stress_default() { -// const COUNT: usize = 10; - -// for _ in 0..COUNT { -// select! { -// recv(after(ms(0))) -> _ => {} -// default => panic!(), -// } -// } - -// for _ in 0..COUNT { -// select! { -// recv(after(ms(100))) -> _ => panic!(), -// default => {} -// } -// } -// } - -// #[test] -// fn select() { -// const THREADS: usize = 4; -// const COUNT: usize = 1000; -// const TIMEOUT_MS: u64 = 100; - -// let v = (0..COUNT) -// .map(|i| after(ms(i as u64 / TIMEOUT_MS / 2))) -// .collect::<Vec<_>>(); -// let hits = AtomicUsize::new(0); - -// scope(|scope| { -// for _ in 0..THREADS { -// scope.spawn(|_| { -// let v: Vec<&_> = v.iter().collect(); - -// loop { -// let timeout = after(ms(TIMEOUT_MS)); -// let mut sel = Select::new(); -// for r in &v { -// sel.recv(r); -// } -// let oper_timeout = sel.recv(&timeout); - -// let oper = sel.select(); -// match oper.index() { -// i if i == oper_timeout => { -// oper.recv(&timeout).unwrap(); -// break; -// } -// i => { -// oper.recv(&v[i]).unwrap(); -// hits.fetch_add(1, Ordering::SeqCst); -// } -// } -// } -// }); -// } -// }) -// .unwrap(); - -// assert_eq!(hits.load(Ordering::SeqCst), COUNT); -// } - -// #[test] -// fn ready() { -// const THREADS: usize = 4; -// const COUNT: usize = 1000; -// const TIMEOUT_MS: u64 = 100; - -// let v = (0..COUNT) -// .map(|i| after(ms(i as u64 / TIMEOUT_MS / 2))) -// .collect::<Vec<_>>(); -// let hits = AtomicUsize::new(0); - -// scope(|scope| { -// for _ in 0..THREADS { -// scope.spawn(|_| { -// let v: Vec<&_> = v.iter().collect(); - -// loop { -// let timeout = after(ms(TIMEOUT_MS)); -// let mut sel = Select::new(); -// for r in &v { -// sel.recv(r); -// } -// let oper_timeout = sel.recv(&timeout); - -// loop { -// let i = sel.ready(); -// if i == oper_timeout { -// timeout.try_recv().unwrap(); -// return; -// } else if v[i].try_recv().is_ok() { -// hits.fetch_add(1, Ordering::SeqCst); -// break; -// } -// } -// } -// }); -// } -// }) -// .unwrap(); - -// assert_eq!(hits.load(Ordering::SeqCst), COUNT); -// } - -// #[test] -// fn stress_clone() { -// const RUNS: usize = 1000; -// const THREADS: usize = 10; -// const COUNT: usize = 50; - -// for i in 0..RUNS { -// let r = after(ms(i as u64)); - -// scope(|scope| { -// for _ in 0..THREADS { -// scope.spawn(|_| { -// let r = r.clone(); -// let _ = r.try_recv(); - -// for _ in 0..COUNT { -// drop(r.clone()); -// thread::yield_now(); -// } -// }); -// } -// }) -// .unwrap(); -// } -// } - -// #[test] -// fn fairness() { -// const COUNT: usize = 1000; - -// for &dur in &[0, 1] { -// let mut hits = [0usize; 2]; - -// for _ in 0..COUNT { -// select! { -// recv(after(ms(dur))) -> _ => hits[0] += 1, -// recv(after(ms(dur))) -> _ => hits[1] += 1, -// } -// } - -// assert!(hits.iter().all(|x| *x >= COUNT / hits.len() / 2)); -// } -// } - -// #[test] -// fn fairness_duplicates() { -// const COUNT: usize = 1000; - -// for &dur in &[0, 1] { -// let mut hits = [0usize; 5]; - -// for _ in 0..COUNT { -// let r = after(ms(dur)); -// select! { -// recv(r) -> _ => hits[0] += 1, -// recv(r) -> _ => hits[1] += 1, -// recv(r) -> _ => hits[2] += 1, -// recv(r) -> _ => hits[3] += 1, -// recv(r) -> _ => hits[4] += 1, -// } -// } - -// assert!(hits.iter().all(|x| *x >= COUNT / hits.len() / 2)); -// } -// } diff --git a/vendor/flume/tests/array.rs b/vendor/flume/tests/array.rs deleted file mode 100644 index a72bbe3..0000000 --- a/vendor/flume/tests/array.rs +++ /dev/null @@ -1,657 +0,0 @@ -//! Tests for the array channel flavor. - -extern crate crossbeam_utils; -extern crate rand; - -use std::any::Any; -use std::sync::atomic::AtomicUsize; -use std::sync::atomic::Ordering; -use std::thread; -use std::time::Duration; - -use flume::{bounded, Receiver}; -use flume::{RecvError, RecvTimeoutError, TryRecvError}; -use flume::{SendError, SendTimeoutError, TrySendError}; -use crossbeam_utils::thread::scope; -use rand::{thread_rng, Rng}; - -fn ms(ms: u64) -> Duration { - Duration::from_millis(ms) -} - -#[test] -fn smoke() { - let (s, r) = bounded(1); - s.send(7).unwrap(); - assert_eq!(r.try_recv(), Ok(7)); - - s.send(8).unwrap(); - assert_eq!(r.recv(), Ok(8)); - - assert_eq!(r.try_recv(), Err(TryRecvError::Empty)); - assert_eq!(r.recv_timeout(ms(1000)), Err(RecvTimeoutError::Timeout)); -} - -#[test] -fn capacity() { - for i in 1..10 { - let (s, r) = bounded::<()>(i); - assert_eq!(s.capacity(), Some(i)); - assert_eq!(r.capacity(), Some(i)); - } -} - -#[test] -fn len_empty_full() { - let (s, r) = bounded(2); - - assert_eq!(s.len(), 0); - assert_eq!(s.is_empty(), true); - assert_eq!(s.is_full(), false); - assert_eq!(r.len(), 0); - assert_eq!(r.is_empty(), true); - assert_eq!(r.is_full(), false); - - s.send(()).unwrap(); - - assert_eq!(s.len(), 1); - assert_eq!(s.is_empty(), false); - assert_eq!(s.is_full(), false); - assert_eq!(r.len(), 1); - assert_eq!(r.is_empty(), false); - assert_eq!(r.is_full(), false); - - s.send(()).unwrap(); - - assert_eq!(s.len(), 2); - assert_eq!(s.is_empty(), false); - assert_eq!(s.is_full(), true); - assert_eq!(r.len(), 2); - assert_eq!(r.is_empty(), false); - assert_eq!(r.is_full(), true); - - r.recv().unwrap(); - - assert_eq!(s.len(), 1); - assert_eq!(s.is_empty(), false); - assert_eq!(s.is_full(), false); - assert_eq!(r.len(), 1); - assert_eq!(r.is_empty(), false); - assert_eq!(r.is_full(), false); -} - -#[test] -fn try_recv() { - let (s, r) = bounded(100); - - scope(|scope| { - scope.spawn(move |_| { - assert_eq!(r.try_recv(), Err(TryRecvError::Empty)); - thread::sleep(ms(1500)); - assert_eq!(r.try_recv(), Ok(7)); - thread::sleep(ms(500)); - assert_eq!(r.try_recv(), Err(TryRecvError::Disconnected)); - }); - scope.spawn(move |_| { - thread::sleep(ms(1000)); - s.send(7).unwrap(); - }); - }) - .unwrap(); -} - -#[test] -fn recv() { - let (s, r) = bounded(100); - - scope(|scope| { - scope.spawn(move |_| { - assert_eq!(r.recv(), Ok(7)); - thread::sleep(ms(1000)); - assert_eq!(r.recv(), Ok(8)); - thread::sleep(ms(1000)); - assert_eq!(r.recv(), Ok(9)); - assert!(r.recv().is_err()); - }); - scope.spawn(move |_| { - thread::sleep(ms(1500)); - s.send(7).unwrap(); - s.send(8).unwrap(); - s.send(9).unwrap(); - }); - }) - .unwrap(); -} - -#[test] -fn recv_timeout() { - let (s, r) = bounded::<i32>(100); - - scope(|scope| { - scope.spawn(move |_| { - assert_eq!(r.recv_timeout(ms(1000)), Err(RecvTimeoutError::Timeout)); - assert_eq!(r.recv_timeout(ms(1000)), Ok(7)); - assert_eq!( - r.recv_timeout(ms(1000)), - Err(RecvTimeoutError::Disconnected) - ); - }); - scope.spawn(move |_| { - thread::sleep(ms(1500)); - s.send(7).unwrap(); - }); - }) - .unwrap(); -} - -#[test] -fn try_send() { - let (s, r) = bounded(1); - - scope(|scope| { - scope.spawn(move |_| { - assert_eq!(s.try_send(1), Ok(())); - assert_eq!(s.try_send(2), Err(TrySendError::Full(2))); - thread::sleep(ms(1500)); - assert_eq!(s.try_send(3), Ok(())); - thread::sleep(ms(500)); - assert_eq!(s.try_send(4), Err(TrySendError::Disconnected(4))); - }); - scope.spawn(move |_| { - thread::sleep(ms(1000)); - assert_eq!(r.try_recv(), Ok(1)); - assert_eq!(r.try_recv(), Err(TryRecvError::Empty)); - assert_eq!(r.recv(), Ok(3)); - }); - }) - .unwrap(); -} - -#[test] -fn send() { - let (s, r) = bounded(1); - - scope(|scope| { - scope.spawn(|_| { - s.send(7).unwrap(); - thread::sleep(ms(1000)); - s.send(8).unwrap(); - thread::sleep(ms(1000)); - s.send(9).unwrap(); - thread::sleep(ms(1000)); - s.send(10).unwrap(); - }); - scope.spawn(|_| { - thread::sleep(ms(1500)); - assert_eq!(r.recv(), Ok(7)); - assert_eq!(r.recv(), Ok(8)); - assert_eq!(r.recv(), Ok(9)); - }); - }) - .unwrap(); -} - -#[test] -fn send_timeout() { - let (s, r) = bounded(2); - - scope(|scope| { - scope.spawn(move |_| { - assert_eq!(s.send_timeout(1, ms(1000)), Ok(())); - assert_eq!(s.send_timeout(2, ms(1000)), Ok(())); - assert_eq!( - s.send_timeout(3, ms(500)), - Err(SendTimeoutError::Timeout(3)) - ); - thread::sleep(ms(1000)); - assert_eq!(s.send_timeout(4, ms(1000)), Ok(())); - thread::sleep(ms(1000)); - assert_eq!(s.send(5), Err(SendError(5))); - }); - scope.spawn(move |_| { - thread::sleep(ms(1000)); - assert_eq!(r.recv(), Ok(1)); - thread::sleep(ms(1000)); - assert_eq!(r.recv(), Ok(2)); - assert_eq!(r.recv(), Ok(4)); - }); - }) - .unwrap(); -} - -#[test] -fn send_after_disconnect() { - let (s, r) = bounded(100); - - s.send(1).unwrap(); - s.send(2).unwrap(); - s.send(3).unwrap(); - - drop(r); - - assert_eq!(s.send(4), Err(SendError(4))); - assert_eq!(s.try_send(5), Err(TrySendError::Disconnected(5))); - assert_eq!( - s.send_timeout(6, ms(500)), - Err(SendTimeoutError::Disconnected(6)) - ); -} - -#[test] -fn recv_after_disconnect() { - let (s, r) = bounded(100); - - s.send(1).unwrap(); - s.send(2).unwrap(); - s.send(3).unwrap(); - - drop(s); - - assert_eq!(r.recv(), Ok(1)); - assert_eq!(r.recv(), Ok(2)); - assert_eq!(r.recv(), Ok(3)); - assert!(r.recv().is_err()); -} - -#[test] -fn len() { - const COUNT: usize = 25_000; - const CAP: usize = 1000; - - let (s, r) = bounded(CAP); - - assert_eq!(s.len(), 0); - assert_eq!(r.len(), 0); - - for _ in 0..CAP / 10 { - for i in 0..50 { - s.send(i).unwrap(); - assert_eq!(s.len(), i + 1); - } - - for i in 0..50 { - r.recv().unwrap(); - assert_eq!(r.len(), 50 - i - 1); - } - } - - assert_eq!(s.len(), 0); - assert_eq!(r.len(), 0); - - for i in 0..CAP { - s.send(i).unwrap(); - assert_eq!(s.len(), i + 1); - } - - for _ in 0..CAP { - r.recv().unwrap(); - } - - assert_eq!(s.len(), 0); - assert_eq!(r.len(), 0); - - scope(|scope| { - scope.spawn(|_| { - for i in 0..COUNT { - assert_eq!(r.recv(), Ok(i)); - let len = r.len(); - assert!(len <= CAP); - } - }); - - scope.spawn(|_| { - for i in 0..COUNT { - s.send(i).unwrap(); - let len = s.len(); - assert!(len <= CAP); - } - }); - }) - .unwrap(); - - assert_eq!(s.len(), 0); - assert_eq!(r.len(), 0); -} - -#[test] -fn disconnect_wakes_sender() { - let (s, r) = bounded(1); - - scope(|scope| { - scope.spawn(move |_| { - assert_eq!(s.send(()), Ok(())); - assert_eq!(s.send(()), Err(SendError(()))); - }); - scope.spawn(move |_| { - thread::sleep(ms(1000)); - drop(r); - }); - }) - .unwrap(); -} - -#[test] -fn disconnect_wakes_receiver() { - let (s, r) = bounded::<()>(1); - - scope(|scope| { - scope.spawn(move |_| { - assert!(r.recv().is_err()); - }); - scope.spawn(move |_| { - thread::sleep(ms(1000)); - drop(s); - }); - }) - .unwrap(); -} - -#[test] -fn spsc() { - const COUNT: usize = 100_000; - - let (s, r) = bounded(3); - - scope(|scope| { - scope.spawn(move |_| { - for i in 0..COUNT { - assert_eq!(r.recv(), Ok(i)); - } - assert!(r.recv().is_err()); - }); - scope.spawn(move |_| { - for i in 0..COUNT { - s.send(i).unwrap(); - } - }); - }) - .unwrap(); -} - -#[test] -fn mpmc() { - const COUNT: usize = 25_000; - const THREADS: usize = 4; - - let (s, r) = bounded::<usize>(3); - let v = (0..COUNT).map(|_| AtomicUsize::new(0)).collect::<Vec<_>>(); - - scope(|scope| { - for _ in 0..THREADS { - scope.spawn(|_| { - for _ in 0..COUNT { - let n = r.recv().unwrap(); - v[n].fetch_add(1, Ordering::SeqCst); - } - }); - } - for _ in 0..THREADS { - scope.spawn(|_| { - for i in 0..COUNT { - s.send(i).unwrap(); - } - }); - } - }) - .unwrap(); - - for c in v { - assert_eq!(c.load(Ordering::SeqCst), THREADS); - } -} - -#[test] -fn stress_oneshot() { - const COUNT: usize = 10_000; - - for _ in 0..COUNT { - let (s, r) = bounded(1); - - scope(|scope| { - scope.spawn(|_| r.recv().unwrap()); - scope.spawn(|_| s.send(0).unwrap()); - }) - .unwrap(); - } -} - -#[test] -fn stress_iter() { - const COUNT: usize = 100_000; - - let (request_s, request_r) = bounded(1); - let (response_s, response_r) = bounded(1); - - scope(|scope| { - scope.spawn(move |_| { - let mut count = 0; - loop { - for x in response_r.try_iter() { - count += x; - if count == COUNT { - return; - } - } - request_s.send(()).unwrap(); - } - }); - - for _ in request_r.iter() { - if response_s.send(1).is_err() { - break; - } - } - }) - .unwrap(); -} - -#[test] -fn stress_timeout_two_threads() { - const COUNT: usize = 100; - - let (s, r) = bounded(2); - - scope(|scope| { - scope.spawn(|_| { - for i in 0..COUNT { - if i % 2 == 0 { - thread::sleep(ms(50)); - } - loop { - if let Ok(()) = s.send_timeout(i, ms(10)) { - break; - } - } - } - }); - - scope.spawn(|_| { - for i in 0..COUNT { - if i % 2 == 0 { - thread::sleep(ms(50)); - } - loop { - if let Ok(x) = r.recv_timeout(ms(10)) { - assert_eq!(x, i); - break; - } - } - } - }); - }) - .unwrap(); -} - -#[test] -fn drops() { - const RUNS: usize = 100; - - static DROPS: AtomicUsize = AtomicUsize::new(0); - - #[derive(Debug, PartialEq)] - struct DropCounter; - - impl Drop for DropCounter { - fn drop(&mut self) { - DROPS.fetch_add(1, Ordering::SeqCst); - } - } - - let mut rng = thread_rng(); - - for _ in 0..RUNS { - let steps = rng.gen_range(0..10_000); - let additional = rng.gen_range(0..50); - - DROPS.store(0, Ordering::SeqCst); - let (s, r) = bounded::<DropCounter>(50); - - scope(|scope| { - scope.spawn(|_| { - for _ in 0..steps { - r.recv().unwrap(); - } - }); - - scope.spawn(|_| { - for _ in 0..steps { - s.send(DropCounter).unwrap(); - } - }); - }) - .unwrap(); - - for _ in 0..additional { - s.send(DropCounter).unwrap(); - } - - assert_eq!(DROPS.load(Ordering::SeqCst), steps); - drop(s); - drop(r); - assert_eq!(DROPS.load(Ordering::SeqCst), steps + additional); - } -} - -#[test] -fn linearizable() { - const COUNT: usize = 25_000; - const THREADS: usize = 4; - - let (s, r) = bounded(THREADS); - - scope(|scope| { - for _ in 0..THREADS { - scope.spawn(|_| { - for _ in 0..COUNT { - s.send(0).unwrap(); - r.try_recv().unwrap(); - } - }); - } - }) - .unwrap(); -} - -// #[test] -// fn fairness() { -// const COUNT: usize = 10_000; - -// let (s1, r1) = bounded::<()>(COUNT); -// let (s2, r2) = bounded::<()>(COUNT); - -// for _ in 0..COUNT { -// s1.send(()).unwrap(); -// s2.send(()).unwrap(); -// } - -// let mut hits = [0usize; 2]; -// for _ in 0..COUNT { -// select! { -// recv(r1) -> _ => hits[0] += 1, -// recv(r2) -> _ => hits[1] += 1, -// } -// } -// assert!(hits.iter().all(|x| *x >= COUNT / hits.len() / 2)); -// } - -// #[test] -// fn fairness_duplicates() { -// const COUNT: usize = 10_000; - -// let (s, r) = bounded::<()>(COUNT); - -// for _ in 0..COUNT { -// s.send(()).unwrap(); -// } - -// let mut hits = [0usize; 5]; -// for _ in 0..COUNT { -// select! { -// recv(r) -> _ => hits[0] += 1, -// recv(r) -> _ => hits[1] += 1, -// recv(r) -> _ => hits[2] += 1, -// recv(r) -> _ => hits[3] += 1, -// recv(r) -> _ => hits[4] += 1, -// } -// } -// assert!(hits.iter().all(|x| *x >= COUNT / hits.len() / 2)); -// } - -// #[test] -// fn recv_in_send() { -// let (s, _r) = bounded(1); -// s.send(()).unwrap(); - -// #[allow(unreachable_code)] -// { -// select! { -// send(s, panic!()) -> _ => panic!(), -// default => {} -// } -// } - -// let (s, r) = bounded(2); -// s.send(()).unwrap(); - -// select! { -// send(s, assert_eq!(r.recv(), Ok(()))) -> _ => {} -// } -// } - -#[test] -fn channel_through_channel() { - const COUNT: usize = 1000; - - type T = Box<dyn Any + Send>; - - let (s, r) = bounded::<T>(1); - - scope(|scope| { - scope.spawn(move |_| { - let mut s = s; - - for _ in 0..COUNT { - let (new_s, new_r) = bounded(1); - let new_r: T = Box::new(Some(new_r)); - - s.send(new_r).unwrap(); - s = new_s; - } - }); - - scope.spawn(move |_| { - let mut r = r; - - for _ in 0..COUNT { - r = r - .recv() - .unwrap() - .downcast_mut::<Option<Receiver<T>>>() - .unwrap() - .take() - .unwrap() - } - }); - }) - .unwrap(); -} diff --git a/vendor/flume/tests/async.rs b/vendor/flume/tests/async.rs deleted file mode 100644 index 6c2c7f2..0000000 --- a/vendor/flume/tests/async.rs +++ /dev/null @@ -1,276 +0,0 @@ -#[cfg(feature = "async")] -use { - flume::*, - futures::{stream::FuturesUnordered, StreamExt, TryFutureExt, Future}, - futures::task::{Context, Waker, Poll}, - async_std::prelude::FutureExt, - std::{time::Duration, sync::{atomic::{AtomicUsize, Ordering}, Arc}}, -}; - -#[cfg(feature = "async")] -#[test] -fn r#async_recv() { - let (tx, rx) = unbounded(); - - let t = std::thread::spawn(move || { - std::thread::sleep(std::time::Duration::from_millis(250)); - tx.send(42u32).unwrap(); - }); - - async_std::task::block_on(async { - assert_eq!(rx.recv_async().await.unwrap(), 42); - }); - - t.join().unwrap(); -} - -#[cfg(feature = "async")] -#[test] -fn r#async_send() { - let (tx, rx) = bounded(1); - - let t = std::thread::spawn(move || { - std::thread::sleep(std::time::Duration::from_millis(250)); - assert_eq!(rx.recv(), Ok(42)); - }); - - async_std::task::block_on(async { - tx.send_async(42u32).await.unwrap(); - }); - - t.join().unwrap(); -} - -#[cfg(feature = "async")] -#[test] -fn r#async_recv_disconnect() { - let (tx, rx) = bounded::<i32>(0); - - let t = std::thread::spawn(move || { - std::thread::sleep(std::time::Duration::from_millis(250)); - drop(tx) - }); - - async_std::task::block_on(async { - assert_eq!(rx.recv_async().await, Err(RecvError::Disconnected)); - }); - - t.join().unwrap(); -} - -#[cfg(feature = "async")] -#[test] -fn r#async_send_disconnect() { - let (tx, rx) = bounded(0); - - let t = std::thread::spawn(move || { - std::thread::sleep(std::time::Duration::from_millis(250)); - drop(rx) - }); - - async_std::task::block_on(async { - assert_eq!(tx.send_async(42u32).await, Err(SendError(42))); - }); - - t.join().unwrap(); -} - -#[cfg(feature = "async")] -#[test] -fn r#async_recv_drop_recv() { - let (tx, rx) = bounded::<i32>(10); - - let recv_fut = rx.recv_async(); - - async_std::task::block_on(async { - let res = async_std::future::timeout(std::time::Duration::from_millis(500), rx.recv_async()).await; - assert!(res.is_err()); - }); - - let rx2 = rx.clone(); - let t = std::thread::spawn(move || { - async_std::task::block_on(async { - rx2.recv_async().await - }) - }); - - std::thread::sleep(std::time::Duration::from_millis(500)); - - tx.send(42).unwrap(); - - drop(recv_fut); - - assert_eq!(t.join().unwrap(), Ok(42)) -} - -#[cfg(feature = "async")] -#[async_std::test] -async fn r#async_send_1_million_no_drop_or_reorder() { - #[derive(Debug)] - enum Message { - Increment { - old: u64, - }, - ReturnCount, - } - - let (tx, rx) = unbounded(); - - let t = async_std::task::spawn(async move { - let mut count = 0u64; - - while let Ok(Message::Increment { old }) = rx.recv_async().await { - assert_eq!(old, count); - count += 1; - } - - count - }); - - for next in 0..1_000_000 { - tx.send(Message::Increment { old: next }).unwrap(); - } - - tx.send(Message::ReturnCount).unwrap(); - - let count = t.await; - assert_eq!(count, 1_000_000) -} - -#[cfg(feature = "async")] -#[async_std::test] -async fn parallel_async_receivers() { - let (tx, rx) = flume::unbounded(); - let send_fut = async move { - let n_sends: usize = 100000; - for _ in 0..n_sends { - tx.send_async(()).await.unwrap(); - } - }; - - async_std::task::spawn( - send_fut - .timeout(Duration::from_secs(5)) - .map_err(|_| panic!("Send timed out!")) - ); - - let mut futures_unordered = (0..250) - .map(|_| async { - while let Ok(()) = rx.recv_async().await - /* rx.recv() is OK */ - {} - }) - .collect::<FuturesUnordered<_>>(); - - let recv_fut = async { - while futures_unordered.next().await.is_some() {} - }; - - recv_fut - .timeout(Duration::from_secs(5)) - .map_err(|_| panic!("Receive timed out!")) - .await - .unwrap(); - - println!("recv end"); -} - -#[cfg(feature = "async")] -#[test] -fn change_waker() { - let (tx, rx) = flume::bounded(1); - tx.send(()).unwrap(); - - struct DebugWaker(Arc<AtomicUsize>, Waker); - - impl DebugWaker { - fn new() -> Self { - let woken = Arc::new(AtomicUsize::new(0)); - let woken_cloned = woken.clone(); - let waker = waker_fn::waker_fn(move || { - woken.fetch_add(1, Ordering::SeqCst); - }); - DebugWaker(woken_cloned, waker) - } - - fn woken(&self) -> usize { - self.0.load(Ordering::SeqCst) - } - - fn ctx(&self) -> Context { - Context::from_waker(&self.1) - } - } - - // Check that the waker is correctly updated when sending tasks change their wakers - { - let send_fut = tx.send_async(()); - futures::pin_mut!(send_fut); - - let (waker1, waker2) = (DebugWaker::new(), DebugWaker::new()); - - // Set the waker to waker1 - assert_eq!(send_fut.as_mut().poll(&mut waker1.ctx()), Poll::Pending); - - // Change the waker to waker2 - assert_eq!(send_fut.poll(&mut waker2.ctx()), Poll::Pending); - - // Wake the future - rx.recv().unwrap(); - - // Check that waker2 was woken and waker1 was not - assert_eq!(waker1.woken(), 0); - assert_eq!(waker2.woken(), 1); - } - - // Check that the waker is correctly updated when receiving tasks change their wakers - { - rx.recv().unwrap(); - let recv_fut = rx.recv_async(); - futures::pin_mut!(recv_fut); - - let (waker1, waker2) = (DebugWaker::new(), DebugWaker::new()); - - // Set the waker to waker1 - assert_eq!(recv_fut.as_mut().poll(&mut waker1.ctx()), Poll::Pending); - - // Change the waker to waker2 - assert_eq!(recv_fut.poll(&mut waker2.ctx()), Poll::Pending); - - // Wake the future - tx.send(()).unwrap(); - - // Check that waker2 was woken and waker1 was not - assert_eq!(waker1.woken(), 0); - assert_eq!(waker2.woken(), 1); - } -} - -#[cfg(feature = "async")] -#[test] -fn spsc_single_threaded_value_ordering() { - async fn test() { - let (tx, rx) = flume::bounded(4); - tokio::select! { - _ = producer(tx) => {}, - _ = consumer(rx) => {}, - } - } - - async fn producer(tx: flume::Sender<usize>) { - for i in 0..100 { - tx.send_async(i).await.unwrap(); - } - } - - async fn consumer(rx: flume::Receiver<usize>) { - let mut expected = 0; - while let Ok(value) = rx.recv_async().await { - assert_eq!(value, expected); - expected += 1; - } - } - - let rt = tokio::runtime::Builder::new_current_thread().build().unwrap(); - rt.block_on(test()); -} diff --git a/vendor/flume/tests/basic.rs b/vendor/flume/tests/basic.rs deleted file mode 100644 index b937436..0000000 --- a/vendor/flume/tests/basic.rs +++ /dev/null @@ -1,428 +0,0 @@ -use std::time::{Instant, Duration}; -use flume::*; - -#[test] -fn send_recv() { - let (tx, rx) = unbounded(); - for i in 0..1000 { tx.send(i).unwrap(); } - for i in 0..1000 { assert_eq!(rx.try_recv().unwrap(), i); } - assert!(rx.try_recv().is_err()); -} - -#[test] -fn iter() { - let (tx, rx) = unbounded(); - for i in 0..1000 { tx.send(i).unwrap(); } - drop(tx); - assert_eq!(rx.iter().sum::<u32>(), (0..1000).sum()); -} - -#[test] -fn try_iter() { - let (tx, rx) = unbounded(); - for i in 0..1000 { tx.send(i).unwrap(); } - assert_eq!(rx.try_iter().sum::<u32>(), (0..1000).sum()); -} - -#[test] -fn iter_threaded() { - let (tx, rx) = unbounded(); - for i in 0..1000 { - let tx = tx.clone(); - std::thread::spawn(move || tx.send(i).unwrap()); - } - drop(tx); - assert_eq!(rx.iter().sum::<u32>(), (0..1000).sum()); -} - -#[cfg_attr(any(target_os = "macos", windows), ignore)] // FIXME #41 -#[test] -fn send_timeout() { - let dur = Duration::from_millis(350); - let max_error = Duration::from_millis(5); - let dur_min = dur.checked_sub(max_error).unwrap(); - let dur_max = dur.checked_add(max_error).unwrap(); - - let (tx, rx) = bounded(1); - - assert!(tx.send_timeout(42, dur).is_ok()); - - let then = Instant::now(); - assert!(tx.send_timeout(43, dur).is_err()); - let now = Instant::now(); - - let this = now.duration_since(then); - if !(dur_min < this && this < dur_max) { - panic!("timeout exceeded: {:?}", this); - } - - assert_eq!(rx.drain().count(), 1); - - drop(rx); - - assert!(tx.send_timeout(42, Duration::from_millis(350)).is_err()); -} - -#[cfg_attr(any(target_os = "macos", windows), ignore)] // FIXME #41 -#[test] -fn recv_timeout() { - let dur = Duration::from_millis(350); - let max_error = Duration::from_millis(5); - let dur_min = dur.checked_sub(max_error).unwrap(); - let dur_max = dur.checked_add(max_error).unwrap(); - - let (tx, rx) = unbounded(); - let then = Instant::now(); - assert!(rx.recv_timeout(dur).is_err()); - let now = Instant::now(); - - let this = now.duration_since(then); - if !(dur_min < this && this < dur_max) { - panic!("timeout exceeded: {:?}", this); - } - - tx.send(42).unwrap(); - assert_eq!(rx.recv_timeout(dur), Ok(42)); - assert!(Instant::now().duration_since(now) < max_error); -} - -#[cfg_attr(any(target_os = "macos", windows), ignore)] // FIXME #41 -#[test] -fn recv_deadline() { - let dur = Duration::from_millis(350); - let max_error = Duration::from_millis(5); - let dur_min = dur.checked_sub(max_error).unwrap(); - let dur_max = dur.checked_add(max_error).unwrap(); - - let (tx, rx) = unbounded(); - let then = Instant::now(); - assert!(rx.recv_deadline(then.checked_add(dur).unwrap()).is_err()); - let now = Instant::now(); - - let this = now.duration_since(then); - if !(dur_min < this && this < dur_max) { - panic!("timeout exceeded: {:?}", this); - } - - tx.send(42).unwrap(); - assert_eq!(rx.recv_deadline(now.checked_add(dur).unwrap()), Ok(42)); - assert!(Instant::now().duration_since(now) < max_error); -} - -#[test] -fn recv_timeout_missed_send() { - let (tx, rx) = bounded(10); - - assert!(rx.recv_timeout(Duration::from_millis(100)).is_err()); - - tx.send(42).unwrap(); - - assert_eq!(rx.recv(), Ok(42)); -} - -#[test] -fn disconnect_tx() { - let (tx, rx) = unbounded::<()>(); - drop(tx); - assert!(rx.recv().is_err()); -} - -#[test] -fn disconnect_rx() { - let (tx, rx) = unbounded(); - drop(rx); - assert!(tx.send(0).is_err()); -} - -#[test] -fn drain() { - let (tx, rx) = unbounded(); - - for i in 0..100 { - tx.send(i).unwrap(); - } - - assert_eq!(rx.drain().sum::<u32>(), (0..100).sum()); - - for i in 0..100 { - tx.send(i).unwrap(); - } - - for i in 0..100 { - tx.send(i).unwrap(); - } - - rx.recv().unwrap(); - - (1u32..100).chain(0..100).zip(rx).for_each(|(l, r)| assert_eq!(l, r)); -} - -#[test] -fn try_send() { - let (tx, rx) = bounded(5); - - for i in 0..5 { - tx.try_send(i).unwrap(); - } - - assert!(tx.try_send(42).is_err()); - - assert_eq!(rx.recv(), Ok(0)); - - assert_eq!(tx.try_send(42), Ok(())); - - assert_eq!(rx.recv(), Ok(1)); - drop(rx); - - assert!(tx.try_send(42).is_err()); -} - -#[test] -fn send_bounded() { - let (tx, rx) = bounded(5); - - for _ in 0..5 { - tx.send(42).unwrap(); - } - - let _ = rx.recv().unwrap(); - - tx.send(42).unwrap(); - - assert!(tx.try_send(42).is_err()); - - rx.drain(); - - let mut ts = Vec::new(); - for _ in 0..100 { - let tx = tx.clone(); - ts.push(std::thread::spawn(move || { - for i in 0..10000 { - tx.send(i).unwrap(); - } - })); - } - - drop(tx); - - assert_eq!(rx.iter().sum::<u64>(), (0..10000).sum::<u64>() * 100); - - for t in ts { - t.join().unwrap(); - } - - assert!(rx.recv().is_err()); -} - -#[test] -fn rendezvous() { - let (tx, rx) = bounded(0); - - for i in 0..5 { - let tx = tx.clone(); - let t = std::thread::spawn(move || { - assert!(tx.try_send(()).is_err()); - - let then = Instant::now(); - tx.send(()).unwrap(); - let now = Instant::now(); - - assert!(now.duration_since(then) > Duration::from_millis(100), "iter = {}", i); - }); - - std::thread::sleep(Duration::from_millis(1000)); - rx.recv().unwrap(); - - t.join().unwrap(); - } -} - -#[test] -fn hydra() { - let thread_num = 32; - let msg_num = 1000; - - let (main_tx, main_rx) = unbounded::<()>(); - - let mut txs = Vec::new(); - for _ in 0..thread_num { - let main_tx = main_tx.clone(); - let (tx, rx) = unbounded(); - txs.push(tx); - - std::thread::spawn(move || { - for msg in rx.iter() { - main_tx.send(msg).unwrap(); - } - }); - } - - drop(main_tx); - - for _ in 0..10 { - for tx in &txs { - for _ in 0..msg_num { - tx.send(Default::default()).unwrap(); - } - } - - for _ in 0..thread_num { - for _ in 0..msg_num { - main_rx.recv().unwrap(); - } - } - } - - drop(txs); - assert!(main_rx.recv().is_err()); -} - -#[test] -fn robin() { - let thread_num = 32; - let msg_num = 10; - - let (mut main_tx, main_rx) = bounded::<()>(1); - - for _ in 0..thread_num { - let (mut tx, rx) = bounded(100); - std::mem::swap(&mut tx, &mut main_tx); - - std::thread::spawn(move || { - for msg in rx.iter() { - tx.send(msg).unwrap(); - } - }); - } - - for _ in 0..10 { - let main_tx = main_tx.clone(); - std::thread::spawn(move || { - for _ in 0..msg_num { - main_tx.send(Default::default()).unwrap(); - } - }); - - for _ in 0..msg_num { - main_rx.recv().unwrap(); - } - } -} - -#[cfg(feature = "select")] -#[test] -fn select_general() { - #[derive(Debug, PartialEq)] - struct Foo(usize); - - let (tx0, rx0) = bounded(1); - let (tx1, rx1) = unbounded(); - - for (i, t) in vec![tx0.clone(), tx1].into_iter().enumerate() { - std::thread::spawn(move || { - std::thread::sleep(std::time::Duration::from_millis(250)); - let _ = t.send(Foo(i)); - }); - } - - let x = Selector::new() - .recv(&rx0, |x| x) - .recv(&rx1, |x| x) - .wait() - .unwrap(); - - if x == Foo(0) { - assert!(rx1.recv().unwrap() == Foo(1)); - } else { - assert!(rx0.recv().unwrap() == Foo(0)); - } - - tx0.send(Foo(42)).unwrap(); - - let t = std::thread::spawn(move || { - std::thread::sleep(std::time::Duration::from_millis(100)); - assert_eq!(rx0.recv().unwrap(), Foo(42)); - assert_eq!(rx0.recv().unwrap(), Foo(43)); - - }); - - Selector::new() - .send(&tx0, Foo(43), |x| x) - .wait() - .unwrap(); - - t.join().unwrap(); -} - -struct MessageWithoutDebug(u32); - -#[test] -// This is a 'does it build' test, to make sure that the error types can turn -// into a std::error::Error without requiring the payload (which is not used -// there) to impl Debug. -fn std_error_without_debug() { - let (tx, rx) = unbounded::<MessageWithoutDebug>(); - - match tx.send(MessageWithoutDebug(1)) { - Ok(_) => {} - Err(e) => { - let _std_err: &dyn std::error::Error = &e; - } - } - - match rx.recv() { - Ok(_) => {} - Err(e) => { - let _std_err: &dyn std::error::Error = &e; - } - } - - match tx.try_send(MessageWithoutDebug(2)) { - Ok(_) => {} - Err(e) => { - let _std_err: &dyn std::error::Error = &e; - } - } - - match rx.try_recv() { - Ok(_) => {} - Err(e) => { - let _std_err: &dyn std::error::Error = &e; - } - } - - match tx.send_timeout(MessageWithoutDebug(3), Duration::from_secs(1000000)) { - Ok(_) => {} - Err(e) => { - let _std_err: &dyn std::error::Error = &e; - } - } - - match rx.recv_timeout(Duration::from_secs(10000000)) { - Ok(_) => {} - Err(e) => { - let _std_err: &dyn std::error::Error = &e; - } - } -} - -#[test] -fn weak_close() { - let (tx, rx) = unbounded::<()>(); - let weak = tx.downgrade(); - drop(tx); - assert!(weak.upgrade().is_none()); - assert!(rx.is_disconnected()); - assert!(rx.try_recv().is_err()); -} - -#[test] -fn weak_upgrade() { - let (tx, rx) = unbounded(); - let weak = tx.downgrade(); - let tx2 = weak.upgrade().unwrap(); - drop(tx); - assert!(!rx.is_disconnected()); - tx2.send(()).unwrap(); - assert!(rx.try_recv().is_ok()); -} diff --git a/vendor/flume/tests/check_same_channel.rs b/vendor/flume/tests/check_same_channel.rs deleted file mode 100644 index edb82c3..0000000 --- a/vendor/flume/tests/check_same_channel.rs +++ /dev/null @@ -1,57 +0,0 @@ -#[test] -fn same_sender() { - let (tx1, _rx) = flume::unbounded::<()>(); - let tx2 = tx1.clone(); - - assert!(tx1.same_channel(&tx2)); - - let (tx3, _rx) = flume::unbounded::<()>(); - - assert!(!tx1.same_channel(&tx3)); - assert!(!tx2.same_channel(&tx3)); -} - -#[test] -fn same_receiver() { - let (_tx, rx1) = flume::unbounded::<()>(); - let rx2 = rx1.clone(); - - assert!(rx1.same_channel(&rx2)); - - let (_tx, rx3) = flume::unbounded::<()>(); - - assert!(!rx1.same_channel(&rx3)); - assert!(!rx2.same_channel(&rx3)); -} - -#[cfg(feature = "async")] -#[test] -fn same_send_sink() { - let (tx1, _rx) = flume::unbounded::<()>(); - let tx1 = tx1.into_sink(); - let tx2 = tx1.clone(); - - assert!(tx1.same_channel(&tx2)); - - let (tx3, _rx) = flume::unbounded::<()>(); - let tx3 = tx3.into_sink(); - - assert!(!tx1.same_channel(&tx3)); - assert!(!tx2.same_channel(&tx3)); -} - -#[cfg(feature = "async")] -#[test] -fn same_recv_stream() { - let (_tx, rx1) = flume::unbounded::<()>(); - let rx1 = rx1.into_stream(); - let rx2 = rx1.clone(); - - assert!(rx1.same_channel(&rx2)); - - let (_tx, rx3) = flume::unbounded::<()>(); - let rx3 = rx3.into_stream(); - - assert!(!rx1.same_channel(&rx3)); - assert!(!rx2.same_channel(&rx3)); -} diff --git a/vendor/flume/tests/golang.rs b/vendor/flume/tests/golang.rs deleted file mode 100644 index ca00840..0000000 --- a/vendor/flume/tests/golang.rs +++ /dev/null @@ -1,1445 +0,0 @@ -// //! Tests copied from Go and manually rewritten in Rust. -// //! -// //! Source: -// //! - https://github.com/golang/go -// //! -// //! Copyright & License: -// //! - Copyright (c) 2009 The Go Authors -// //! - https://golang.org/AUTHORS -// //! - https://golang.org/LICENSE -// //! - https://golang.org/PATENTS - -// use std::any::Any; -// use std::cell::Cell; -// use std::collections::HashMap; -// use std::sync::{Arc, Condvar, Mutex}; -// use std::thread; -// use std::time::Duration; - -// use flume::{bounded, tick, Receiver, Select, Sender}; - -// fn ms(ms: u64) -> Duration { -// Duration::from_millis(ms) -// } - -// struct Chan<T> { -// inner: Arc<Mutex<ChanInner<T>>>, -// } - -// struct ChanInner<T> { -// s: Option<Sender<T>>, -// r: Receiver<T>, -// } - -// impl<T> Clone for Chan<T> { -// fn clone(&self) -> Chan<T> { -// Chan { -// inner: self.inner.clone(), -// } -// } -// } - -// impl<T> Chan<T> { -// fn send(&self, msg: T) { -// let s = self -// .inner -// .lock() -// .unwrap() -// .s -// .as_ref() -// .expect("sending into closed channel") -// .clone(); -// let _ = s.send(msg); -// } - -// fn try_recv(&self) -> Option<T> { -// let r = self.inner.lock().unwrap().r.clone(); -// r.try_recv().ok() -// } - -// fn recv(&self) -> Option<T> { -// let r = self.inner.lock().unwrap().r.clone(); -// r.recv().ok() -// } - -// fn close(&self) { -// self.inner -// .lock() -// .unwrap() -// .s -// .take() -// .expect("channel already closed"); -// } - -// fn rx(&self) -> Receiver<T> { -// self.inner.lock().unwrap().r.clone() -// } - -// fn tx(&self) -> Sender<T> { -// match self.inner.lock().unwrap().s.as_ref() { -// None => { -// let (s, r) = bounded(0); -// std::mem::forget(r); -// s -// } -// Some(s) => s.clone(), -// } -// } -// } - -// impl<T> Iterator for Chan<T> { -// type Item = T; - -// fn next(&mut self) -> Option<Self::Item> { -// self.recv() -// } -// } - -// impl<'a, T> IntoIterator for &'a Chan<T> { -// type Item = T; -// type IntoIter = Chan<T>; - -// fn into_iter(self) -> Self::IntoIter { -// self.clone() -// } -// } - -// fn make<T>(cap: usize) -> Chan<T> { -// let (s, r) = bounded(cap); -// Chan { -// inner: Arc::new(Mutex::new(ChanInner { s: Some(s), r })), -// } -// } - -// #[derive(Clone)] -// struct WaitGroup(Arc<WaitGroupInner>); - -// struct WaitGroupInner { -// cond: Condvar, -// count: Mutex<i32>, -// } - -// impl WaitGroup { -// fn new() -> WaitGroup { -// WaitGroup(Arc::new(WaitGroupInner { -// cond: Condvar::new(), -// count: Mutex::new(0), -// })) -// } - -// fn add(&self, delta: i32) { -// let mut count = self.0.count.lock().unwrap(); -// *count += delta; -// assert!(*count >= 0); -// self.0.cond.notify_all(); -// } - -// fn done(&self) { -// self.add(-1); -// } - -// fn wait(&self) { -// let mut count = self.0.count.lock().unwrap(); -// while *count > 0 { -// count = self.0.cond.wait(count).unwrap(); -// } -// } -// } - -// struct Defer<F: FnOnce()> { -// f: Option<Box<F>>, -// } - -// impl<F: FnOnce()> Drop for Defer<F> { -// fn drop(&mut self) { -// let f = self.f.take().unwrap(); -// let mut f = Some(f); -// let mut f = move || f.take().unwrap()(); -// f(); -// } -// } - -// macro_rules! defer { -// ($body:expr) => { -// let _defer = Defer { -// f: Some(Box::new(|| $body)), -// }; -// }; -// } - -// macro_rules! go { -// (@parse ref $v:ident, $($tail:tt)*) => {{ -// let ref $v = $v; -// go!(@parse $($tail)*) -// }}; -// (@parse move $v:ident, $($tail:tt)*) => {{ -// let $v = $v; -// go!(@parse $($tail)*) -// }}; -// (@parse $v:ident, $($tail:tt)*) => {{ -// let $v = $v.clone(); -// go!(@parse $($tail)*) -// }}; -// (@parse $body:expr) => { -// ::std::thread::spawn(move || { -// let res = ::std::panic::catch_unwind(::std::panic::AssertUnwindSafe(|| { -// $body -// })); -// if res.is_err() { -// eprintln!("goroutine panicked: {:?}", res); -// ::std::process::abort(); -// } -// }) -// }; -// (@parse $($tail:tt)*) => { -// compile_error!("invalid `go!` syntax") -// }; -// ($($tail:tt)*) => {{ -// go!(@parse $($tail)*) -// }}; -// } - -// // https://github.com/golang/go/blob/master/test/chan/doubleselect.go -// mod doubleselect { -// use super::*; - -// const ITERATIONS: i32 = 10_000; - -// fn sender(n: i32, c1: Chan<i32>, c2: Chan<i32>, c3: Chan<i32>, c4: Chan<i32>) { -// defer! { c1.close() } -// defer! { c2.close() } -// defer! { c3.close() } -// defer! { c4.close() } - -// for i in 0..n { -// select! { -// send(c1.tx(), i) -> _ => {} -// send(c2.tx(), i) -> _ => {} -// send(c3.tx(), i) -> _ => {} -// send(c4.tx(), i) -> _ => {} -// } -// } -// } - -// fn mux(out: Chan<i32>, inp: Chan<i32>, done: Chan<bool>) { -// for v in inp { -// out.send(v); -// } -// done.send(true); -// } - -// fn recver(inp: Chan<i32>) { -// let mut seen = HashMap::new(); - -// for v in &inp { -// if seen.contains_key(&v) { -// panic!("got duplicate value for {}", v); -// } -// seen.insert(v, true); -// } -// } - -// #[test] -// fn main() { -// let c1 = make::<i32>(0); -// let c2 = make::<i32>(0); -// let c3 = make::<i32>(0); -// let c4 = make::<i32>(0); -// let done = make::<bool>(0); -// let cmux = make::<i32>(0); - -// go!(c1, c2, c3, c4, sender(ITERATIONS, c1, c2, c3, c4)); -// go!(cmux, c1, done, mux(cmux, c1, done)); -// go!(cmux, c2, done, mux(cmux, c2, done)); -// go!(cmux, c3, done, mux(cmux, c3, done)); -// go!(cmux, c4, done, mux(cmux, c4, done)); -// go!(done, cmux, { -// done.recv(); -// done.recv(); -// done.recv(); -// done.recv(); -// cmux.close(); -// }); -// recver(cmux); -// } -// } - -// // https://github.com/golang/go/blob/master/test/chan/fifo.go -// mod fifo { -// use super::*; - -// const N: i32 = 10; - -// #[test] -// fn asynch_fifo() { -// let ch = make::<i32>(N as usize); -// for i in 0..N { -// ch.send(i); -// } -// for i in 0..N { -// if ch.recv() != Some(i) { -// panic!("bad receive"); -// } -// } -// } - -// fn chain(ch: Chan<i32>, val: i32, inp: Chan<i32>, out: Chan<i32>) { -// inp.recv(); -// if ch.recv() != Some(val) { -// panic!(val); -// } -// out.send(1); -// } - -// #[test] -// fn synch_fifo() { -// let ch = make::<i32>(0); -// let mut inp = make::<i32>(0); -// let start = inp.clone(); - -// for i in 0..N { -// let out = make::<i32>(0); -// go!(ch, i, inp, out, chain(ch, i, inp, out)); -// inp = out; -// } - -// start.send(0); -// for i in 0..N { -// ch.send(i); -// } -// inp.recv(); -// } -// } - -// // https://github.com/golang/go/blob/master/test/chan/goroutines.go -// mod goroutines { -// use super::*; - -// fn f(left: Chan<i32>, right: Chan<i32>) { -// left.send(right.recv().unwrap()); -// } - -// #[test] -// fn main() { -// let n = 100i32; - -// let leftmost = make::<i32>(0); -// let mut right = leftmost.clone(); -// let mut left = leftmost.clone(); - -// for _ in 0..n { -// right = make::<i32>(0); -// go!(left, right, f(left, right)); -// left = right.clone(); -// } - -// go!(right, right.send(1)); -// leftmost.recv().unwrap(); -// } -// } - -// // https://github.com/golang/go/blob/master/test/chan/nonblock.go -// mod nonblock { -// use super::*; - -// fn i32receiver(c: Chan<i32>, strobe: Chan<bool>) { -// if c.recv().unwrap() != 123 { -// panic!("i32 value"); -// } -// strobe.send(true); -// } - -// fn i32sender(c: Chan<i32>, strobe: Chan<bool>) { -// c.send(234); -// strobe.send(true); -// } - -// fn i64receiver(c: Chan<i64>, strobe: Chan<bool>) { -// if c.recv().unwrap() != 123456 { -// panic!("i64 value"); -// } -// strobe.send(true); -// } - -// fn i64sender(c: Chan<i64>, strobe: Chan<bool>) { -// c.send(234567); -// strobe.send(true); -// } - -// fn breceiver(c: Chan<bool>, strobe: Chan<bool>) { -// if !c.recv().unwrap() { -// panic!("b value"); -// } -// strobe.send(true); -// } - -// fn bsender(c: Chan<bool>, strobe: Chan<bool>) { -// c.send(true); -// strobe.send(true); -// } - -// fn sreceiver(c: Chan<String>, strobe: Chan<bool>) { -// if c.recv().unwrap() != "hello" { -// panic!("x value"); -// } -// strobe.send(true); -// } - -// fn ssender(c: Chan<String>, strobe: Chan<bool>) { -// c.send("hello again".to_string()); -// strobe.send(true); -// } - -// const MAX_TRIES: usize = 10000; // Up to 100ms per test. - -// #[test] -// fn main() { -// let ticker = tick(Duration::new(0, 10_000)); // 10 us -// let sleep = || { -// ticker.recv().unwrap(); -// ticker.recv().unwrap(); -// thread::yield_now(); -// thread::yield_now(); -// thread::yield_now(); -// }; - -// let sync = make::<bool>(0); - -// for buffer in 0..2 { -// let c32 = make::<i32>(buffer); -// let c64 = make::<i64>(buffer); -// let cb = make::<bool>(buffer); -// let cs = make::<String>(buffer); - -// select! { -// recv(c32.rx()) -> _ => panic!("blocked i32sender"), -// default => {} -// } - -// select! { -// recv(c64.rx()) -> _ => panic!("blocked i64sender"), -// default => {} -// } - -// select! { -// recv(cb.rx()) -> _ => panic!("blocked bsender"), -// default => {} -// } - -// select! { -// recv(cs.rx()) -> _ => panic!("blocked ssender"), -// default => {} -// } - -// go!(c32, sync, i32receiver(c32, sync)); -// let mut try = 0; -// loop { -// select! { -// send(c32.tx(), 123) -> _ => break, -// default => { -// try += 1; -// if try > MAX_TRIES { -// println!("i32receiver buffer={}", buffer); -// panic!("fail") -// } -// sleep(); -// } -// } -// } -// sync.recv(); -// go!(c32, sync, i32sender(c32, sync)); -// if buffer > 0 { -// sync.recv(); -// } -// let mut try = 0; -// loop { -// select! { -// recv(c32.rx()) -> v => { -// if v != Ok(234) { -// panic!("i32sender value"); -// } -// break; -// } -// default => { -// try += 1; -// if try > MAX_TRIES { -// println!("i32sender buffer={}", buffer); -// panic!("fail"); -// } -// sleep(); -// } -// } -// } -// if buffer == 0 { -// sync.recv(); -// } - -// go!(c64, sync, i64receiver(c64, sync)); -// let mut try = 0; -// loop { -// select! { -// send(c64.tx(), 123456) -> _ => break, -// default => { -// try += 1; -// if try > MAX_TRIES { -// println!("i64receiver buffer={}", buffer); -// panic!("fail") -// } -// sleep(); -// } -// } -// } -// sync.recv(); -// go!(c64, sync, i64sender(c64, sync)); -// if buffer > 0 { -// sync.recv(); -// } -// let mut try = 0; -// loop { -// select! { -// recv(c64.rx()) -> v => { -// if v != Ok(234567) { -// panic!("i64sender value"); -// } -// break; -// } -// default => { -// try += 1; -// if try > MAX_TRIES { -// println!("i64sender buffer={}", buffer); -// panic!("fail"); -// } -// sleep(); -// } -// } -// } -// if buffer == 0 { -// sync.recv(); -// } - -// go!(cb, sync, breceiver(cb, sync)); -// let mut try = 0; -// loop { -// select! { -// send(cb.tx(), true) -> _ => break, -// default => { -// try += 1; -// if try > MAX_TRIES { -// println!("breceiver buffer={}", buffer); -// panic!("fail") -// } -// sleep(); -// } -// } -// } -// sync.recv(); -// go!(cb, sync, bsender(cb, sync)); -// if buffer > 0 { -// sync.recv(); -// } -// let mut try = 0; -// loop { -// select! { -// recv(cb.rx()) -> v => { -// if v != Ok(true) { -// panic!("bsender value"); -// } -// break; -// } -// default => { -// try += 1; -// if try > MAX_TRIES { -// println!("bsender buffer={}", buffer); -// panic!("fail"); -// } -// sleep(); -// } -// } -// } -// if buffer == 0 { -// sync.recv(); -// } - -// go!(cs, sync, sreceiver(cs, sync)); -// let mut try = 0; -// loop { -// select! { -// send(cs.tx(), "hello".to_string()) -> _ => break, -// default => { -// try += 1; -// if try > MAX_TRIES { -// println!("sreceiver buffer={}", buffer); -// panic!("fail") -// } -// sleep(); -// } -// } -// } -// sync.recv(); -// go!(cs, sync, ssender(cs, sync)); -// if buffer > 0 { -// sync.recv(); -// } -// let mut try = 0; -// loop { -// select! { -// recv(cs.rx()) -> v => { -// if v != Ok("hello again".to_string()) { -// panic!("ssender value"); -// } -// break; -// } -// default => { -// try += 1; -// if try > MAX_TRIES { -// println!("ssender buffer={}", buffer); -// panic!("fail"); -// } -// sleep(); -// } -// } -// } -// if buffer == 0 { -// sync.recv(); -// } -// } -// } -// } - -// // https://github.com/golang/go/blob/master/test/chan/select.go -// mod select { -// use super::*; - -// #[test] -// fn main() { -// let shift = Cell::new(0); -// let counter = Cell::new(0); - -// let get_value = || { -// counter.set(counter.get() + 1); -// 1 << shift.get() -// }; - -// let send = |mut a: Option<&Chan<u32>>, mut b: Option<&Chan<u32>>| { -// let mut i = 0; -// let never = make::<u32>(0); -// loop { -// let nil1 = never.tx(); -// let nil2 = never.tx(); -// let v1 = get_value(); -// let v2 = get_value(); -// select! { -// send(a.map(|c| c.tx()).unwrap_or(nil1), v1) -> _ => { -// i += 1; -// a = None; -// } -// send(b.map(|c| c.tx()).unwrap_or(nil2), v2) -> _ => { -// i += 1; -// b = None; -// } -// default => break, -// } -// shift.set(shift.get() + 1); -// } -// i -// }; - -// let a = make::<u32>(1); -// let b = make::<u32>(1); - -// assert_eq!(send(Some(&a), Some(&b)), 2); - -// let av = a.recv().unwrap(); -// let bv = b.recv().unwrap(); -// assert_eq!(av | bv, 3); - -// assert_eq!(send(Some(&a), None), 1); -// assert_eq!(counter.get(), 10); -// } -// } - -// // https://github.com/golang/go/blob/master/test/chan/select2.go -// mod select2 { -// // TODO -// } - -// // https://github.com/golang/go/blob/master/test/chan/select3.go -// mod select3 { -// // TODO -// } - -// // https://github.com/golang/go/blob/master/test/chan/select4.go -// mod select4 { -// use super::*; - -// #[test] -// fn main() { -// let c = make::<i32>(1); -// let c1 = make::<i32>(0); -// c.send(42); -// select! { -// recv(c1.rx()) -> _ => panic!("BUG"), -// recv(c.rx()) -> v => assert_eq!(v, Ok(42)), -// } -// } -// } - -// // https://github.com/golang/go/blob/master/test/chan/select6.go -// mod select6 { -// use super::*; - -// #[test] -// fn main() { -// let c1 = make::<bool>(0); -// let c2 = make::<bool>(0); -// let c3 = make::<bool>(0); - -// go!(c1, c1.recv()); -// go!(c1, c2, c3, { -// select! { -// recv(c1.rx()) -> _ => panic!("dummy"), -// recv(c2.rx()) -> _ => c3.send(true), -// } -// c1.recv(); -// }); -// go!(c2, c2.send(true)); - -// c3.recv(); -// c1.send(true); -// c1.send(true); -// } -// } - -// // https://github.com/golang/go/blob/master/test/chan/select7.go -// mod select7 { -// use super::*; - -// fn recv1(c: Chan<i32>) { -// c.recv().unwrap(); -// } - -// fn recv2(c: Chan<i32>) { -// select! { -// recv(c.rx()) -> _ => () -// } -// } - -// fn recv3(c: Chan<i32>) { -// let c2 = make::<i32>(1); -// select! { -// recv(c.rx()) -> _ => (), -// recv(c2.rx()) -> _ => () -// } -// } - -// fn send1(recv: fn(Chan<i32>)) { -// let c = make::<i32>(1); -// go!(c, recv(c)); -// thread::yield_now(); -// c.send(1); -// } - -// fn send2(recv: fn(Chan<i32>)) { -// let c = make::<i32>(1); -// go!(c, recv(c)); -// thread::yield_now(); -// select! { -// send(c.tx(), 1) -> _ => () -// } -// } - -// fn send3(recv: fn(Chan<i32>)) { -// let c = make::<i32>(1); -// go!(c, recv(c)); -// thread::yield_now(); -// let c2 = make::<i32>(1); -// select! { -// send(c.tx(), 1) -> _ => (), -// send(c2.tx(), 1) -> _ => () -// } -// } - -// #[test] -// fn main() { -// send1(recv1); -// send2(recv1); -// send3(recv1); -// send1(recv2); -// send2(recv2); -// send3(recv2); -// send1(recv3); -// send2(recv3); -// send3(recv3); -// } -// } - -// // https://github.com/golang/go/blob/master/test/chan/sieve1.go -// mod sieve1 { -// use super::*; - -// fn generate(ch: Chan<i32>) { -// let mut i = 2; -// loop { -// ch.send(i); -// i += 1; -// } -// } - -// fn filter(in_ch: Chan<i32>, out_ch: Chan<i32>, prime: i32) { -// for i in in_ch { -// if i % prime != 0 { -// out_ch.send(i); -// } -// } -// } - -// fn sieve(primes: Chan<i32>) { -// let mut ch = make::<i32>(1); -// go!(ch, generate(ch)); -// loop { -// let prime = ch.recv().unwrap(); -// primes.send(prime); - -// let ch1 = make::<i32>(1); -// go!(ch, ch1, prime, filter(ch, ch1, prime)); -// ch = ch1; -// } -// } - -// #[test] -// fn main() { -// let primes = make::<i32>(1); -// go!(primes, sieve(primes)); - -// let a = [ -// 2, 3, 5, 7, 11, 13, 17, 19, 23, 29, 31, 37, 41, 43, 47, 53, 59, 61, 67, 71, 73, 79, 83, -// 89, 97, -// ]; -// for item in a.iter() { -// let x = primes.recv().unwrap(); -// if x != *item { -// println!("{} != {}", x, item); -// panic!("fail"); -// } -// } -// } -// } - -// // https://github.com/golang/go/blob/master/test/chan/zerosize.go -// mod zerosize { -// use super::*; - -// #[test] -// fn zero_size_struct() { -// struct ZeroSize; -// let _ = make::<ZeroSize>(0); -// } - -// #[test] -// fn zero_size_array() { -// let _ = make::<[u8; 0]>(0); -// } -// } - -// // https://github.com/golang/go/blob/master/src/runtime/chan_test.go -// mod chan_test { -// use super::*; - -// #[test] -// fn test_chan() { -// const N: i32 = 200; - -// for cap in 0..N { -// { -// // Ensure that receive from empty chan blocks. -// let c = make::<i32>(cap as usize); - -// let recv1 = Arc::new(Mutex::new(false)); -// go!(c, recv1, { -// c.recv(); -// *recv1.lock().unwrap() = true; -// }); - -// let recv2 = Arc::new(Mutex::new(false)); -// go!(c, recv2, { -// c.recv(); -// *recv2.lock().unwrap() = true; -// }); - -// thread::sleep(ms(1)); - -// if *recv1.lock().unwrap() || *recv2.lock().unwrap() { -// panic!(); -// } - -// // Ensure that non-blocking receive does not block. -// select! { -// recv(c.rx()) -> _ => panic!(), -// default => {} -// } -// select! { -// recv(c.rx()) -> _ => panic!(), -// default => {} -// } - -// c.send(0); -// c.send(0); -// } - -// { -// // Ensure that send to full chan blocks. -// let c = make::<i32>(cap as usize); -// for i in 0..cap { -// c.send(i); -// } - -// let sent = Arc::new(Mutex::new(0)); -// go!(sent, c, { -// c.send(0); -// *sent.lock().unwrap() = 1; -// }); - -// thread::sleep(ms(1)); - -// if *sent.lock().unwrap() != 0 { -// panic!(); -// } - -// // Ensure that non-blocking send does not block. -// select! { -// send(c.tx(), 0) -> _ => panic!(), -// default => {} -// } -// c.recv(); -// } - -// { -// // Ensure that we receive 0 from closed chan. -// let c = make::<i32>(cap as usize); -// for i in 0..cap { -// c.send(i); -// } -// c.close(); - -// for i in 0..cap { -// let v = c.recv(); -// if v != Some(i) { -// panic!(); -// } -// } - -// if c.recv() != None { -// panic!(); -// } -// if c.try_recv() != None { -// panic!(); -// } -// } - -// { -// // Ensure that close unblocks receive. -// let c = make::<i32>(cap as usize); -// let done = make::<bool>(0); - -// go!(c, done, { -// let v = c.try_recv(); -// done.send(v.is_some()); -// }); - -// thread::sleep(ms(1)); -// c.close(); - -// if !done.recv().unwrap() { -// // panic!(); -// } -// } - -// { -// // Send 100 integers, -// // ensure that we receive them non-corrupted in FIFO order. -// let c = make::<i32>(cap as usize); -// go!(c, { -// for i in 0..100 { -// c.send(i); -// } -// }); -// for i in 0..100 { -// if c.recv() != Some(i) { -// panic!(); -// } -// } - -// // Same, but using recv2. -// go!(c, { -// for i in 0..100 { -// c.send(i); -// } -// }); -// for i in 0..100 { -// if c.recv() != Some(i) { -// panic!(); -// } -// } -// } -// } -// } - -// #[test] -// fn test_nonblock_recv_race() { -// const N: usize = 1000; - -// for _ in 0..N { -// let c = make::<i32>(1); -// c.send(1); - -// let t = go!(c, { -// select! { -// recv(c.rx()) -> _ => {} -// default => panic!("chan is not ready"), -// } -// }); - -// c.close(); -// c.recv(); -// t.join().unwrap(); -// } -// } - -// #[test] -// fn test_nonblock_select_race() { -// const N: usize = 1000; - -// let done = make::<bool>(1); -// for _ in 0..N { -// let c1 = make::<i32>(1); -// let c2 = make::<i32>(1); -// c1.send(1); - -// go!(c1, c2, done, { -// select! { -// recv(c1.rx()) -> _ => {} -// recv(c2.rx()) -> _ => {} -// default => { -// done.send(false); -// return; -// } -// } -// done.send(true); -// }); - -// c2.send(1); -// select! { -// recv(c1.rx()) -> _ => {} -// default => {} -// } -// if !done.recv().unwrap() { -// panic!("no chan is ready"); -// } -// } -// } - -// #[test] -// fn test_nonblock_select_race2() { -// const N: usize = 1000; - -// let done = make::<bool>(1); -// for _ in 0..N { -// let c1 = make::<i32>(1); -// let c2 = make::<i32>(0); -// c1.send(1); - -// go!(c1, c2, done, { -// select! { -// recv(c1.rx()) -> _ => {} -// recv(c2.rx()) -> _ => {} -// default => { -// done.send(false); -// return; -// } -// } -// done.send(true); -// }); - -// c2.close(); -// select! { -// recv(c1.rx()) -> _ => {} -// default => {} -// } -// if !done.recv().unwrap() { -// panic!("no chan is ready"); -// } -// } -// } - -// #[test] -// fn test_self_select() { -// // Ensure that send/recv on the same chan in select -// // does not crash nor deadlock. - -// for &cap in &[0, 10] { -// let wg = WaitGroup::new(); -// wg.add(2); -// let c = make::<i32>(cap); - -// for p in 0..2 { -// let p = p; -// go!(wg, p, c, { -// defer! { wg.done() } -// for i in 0..1000 { -// if p == 0 || i % 2 == 0 { -// select! { -// send(c.tx(), p) -> _ => {} -// recv(c.rx()) -> v => { -// if cap == 0 && v.ok() == Some(p) { -// panic!("self receive"); -// } -// } -// } -// } else { -// select! { -// recv(c.rx()) -> v => { -// if cap == 0 && v.ok() == Some(p) { -// panic!("self receive"); -// } -// } -// send(c.tx(), p) -> _ => {} -// } -// } -// } -// }); -// } -// wg.wait(); -// } -// } - -// #[test] -// fn test_select_stress() { -// let c = vec![ -// make::<i32>(0), -// make::<i32>(0), -// make::<i32>(2), -// make::<i32>(3), -// ]; - -// const N: usize = 10000; - -// // There are 4 goroutines that send N values on each of the chans, -// // + 4 goroutines that receive N values on each of the chans, -// // + 1 goroutine that sends N values on each of the chans in a single select, -// // + 1 goroutine that receives N values on each of the chans in a single select. -// // All these sends, receives and selects interact chaotically at runtime, -// // but we are careful that this whole construct does not deadlock. -// let wg = WaitGroup::new(); -// wg.add(10); - -// for k in 0..4 { -// go!(k, c, wg, { -// for _ in 0..N { -// c[k].send(0); -// } -// wg.done(); -// }); -// go!(k, c, wg, { -// for _ in 0..N { -// c[k].recv(); -// } -// wg.done(); -// }); -// } - -// go!(c, wg, { -// let mut n = [0; 4]; -// let mut c1 = c.iter().map(|c| Some(c.rx().clone())).collect::<Vec<_>>(); - -// for _ in 0..4 * N { -// let index = { -// let mut sel = Select::new(); -// let mut opers = [!0; 4]; -// for &i in &[3, 2, 0, 1] { -// if let Some(c) = &c1[i] { -// opers[i] = sel.recv(c); -// } -// } - -// let oper = sel.select(); -// let mut index = !0; -// for i in 0..4 { -// if opers[i] == oper.index() { -// index = i; -// let _ = oper.recv(c1[i].as_ref().unwrap()); -// break; -// } -// } -// index -// }; - -// n[index] += 1; -// if n[index] == N { -// c1[index] = None; -// } -// } -// wg.done(); -// }); - -// go!(c, wg, { -// let mut n = [0; 4]; -// let mut c1 = c.iter().map(|c| Some(c.tx().clone())).collect::<Vec<_>>(); - -// for _ in 0..4 * N { -// let index = { -// let mut sel = Select::new(); -// let mut opers = [!0; 4]; -// for &i in &[0, 1, 2, 3] { -// if let Some(c) = &c1[i] { -// opers[i] = sel.send(c); -// } -// } - -// let oper = sel.select(); -// let mut index = !0; -// for i in 0..4 { -// if opers[i] == oper.index() { -// index = i; -// let _ = oper.send(c1[i].as_ref().unwrap(), 0); -// break; -// } -// } -// index -// }; - -// n[index] += 1; -// if n[index] == N { -// c1[index] = None; -// } -// } -// wg.done(); -// }); - -// wg.wait(); -// } - -// #[test] -// fn test_select_fairness() { -// const TRIALS: usize = 10000; - -// let c1 = make::<u8>(TRIALS + 1); -// let c2 = make::<u8>(TRIALS + 1); - -// for _ in 0..TRIALS + 1 { -// c1.send(1); -// c2.send(2); -// } - -// let c3 = make::<u8>(0); -// let c4 = make::<u8>(0); -// let out = make::<u8>(0); -// let done = make::<u8>(0); -// let wg = WaitGroup::new(); - -// wg.add(1); -// go!(wg, c1, c2, c3, c4, out, done, { -// defer! { wg.done() }; -// loop { -// let b; -// select! { -// recv(c3.rx()) -> m => b = m.unwrap(), -// recv(c4.rx()) -> m => b = m.unwrap(), -// recv(c1.rx()) -> m => b = m.unwrap(), -// recv(c2.rx()) -> m => b = m.unwrap(), -// } -// select! { -// send(out.tx(), b) -> _ => {} -// recv(done.rx()) -> _ => return, -// } -// } -// }); - -// let (mut cnt1, mut cnt2) = (0, 0); -// for _ in 0..TRIALS { -// match out.recv() { -// Some(1) => cnt1 += 1, -// Some(2) => cnt2 += 1, -// b => panic!("unexpected value {:?} on channel", b), -// } -// } - -// // If the select in the goroutine is fair, -// // cnt1 and cnt2 should be about the same value. -// // With 10,000 trials, the expected margin of error at -// // a confidence level of five nines is 4.4172 / (2 * Sqrt(10000)). - -// let r = cnt1 as f64 / TRIALS as f64; -// let e = (r - 0.5).abs(); - -// if e > 4.4172 / (2.0 * (TRIALS as f64).sqrt()) { -// panic!( -// "unfair select: in {} trials, results were {}, {}", -// TRIALS, cnt1, cnt2, -// ); -// } - -// done.close(); -// wg.wait(); -// } - -// #[test] -// fn test_chan_send_interface() { -// struct Mt; - -// let c = make::<Box<dyn Any>>(1); -// c.send(Box::new(Mt)); - -// select! { -// send(c.tx(), Box::new(Mt)) -> _ => {} -// default => {} -// } - -// select! { -// send(c.tx(), Box::new(Mt)) -> _ => {} -// send(c.tx(), Box::new(Mt)) -> _ => {} -// default => {} -// } -// } - -// #[test] -// fn test_pseudo_random_send() { -// const N: usize = 100; - -// for cap in 0..N { -// let c = make::<i32>(cap); -// let l = Arc::new(Mutex::new(vec![0i32; N])); -// let done = make::<bool>(0); - -// go!(c, done, l, { -// let mut l = l.lock().unwrap(); -// for i in 0..N { -// thread::yield_now(); -// l[i] = c.recv().unwrap(); -// } -// done.send(true); -// }); - -// for _ in 0..N { -// select! { -// send(c.tx(), 1) -> _ => {} -// send(c.tx(), 0) -> _ => {} -// } -// } -// done.recv(); - -// let mut n0 = 0; -// let mut n1 = 0; -// for &i in l.lock().unwrap().iter() { -// n0 += (i + 1) % 2; -// n1 += i; -// } - -// if n0 <= N as i32 / 10 || n1 <= N as i32 / 10 { -// panic!( -// "Want pseudorandom, got {} zeros and {} ones (chan cap {})", -// n0, n1, cap, -// ); -// } -// } -// } - -// #[test] -// fn test_multi_consumer() { -// const NWORK: usize = 23; -// const NITER: usize = 271828; - -// let pn = [2, 3, 7, 11, 13, 17, 19, 23, 27, 31]; - -// let q = make::<i32>(NWORK * 3); -// let r = make::<i32>(NWORK * 3); - -// let wg = WaitGroup::new(); -// for i in 0..NWORK { -// wg.add(1); -// let w = i; -// go!(q, r, wg, pn, { -// for v in &q { -// if pn[w % pn.len()] == v { -// thread::yield_now(); -// } -// r.send(v); -// } -// wg.done(); -// }); -// } - -// let expect = Arc::new(Mutex::new(0)); -// go!(q, r, expect, wg, pn, { -// for i in 0..NITER { -// let v = pn[i % pn.len()]; -// *expect.lock().unwrap() += v; -// q.send(v); -// } -// q.close(); -// wg.wait(); -// r.close(); -// }); - -// let mut n = 0; -// let mut s = 0; -// for v in &r { -// n += 1; -// s += v; -// } - -// if n != NITER || s != *expect.lock().unwrap() { -// panic!(); -// } -// } - -// #[test] -// fn test_select_duplicate_channel() { -// // This test makes sure we can queue a G on -// // the same channel multiple times. -// let c = make::<i32>(0); -// let d = make::<i32>(0); -// let e = make::<i32>(0); - -// go!(c, d, e, { -// select! { -// recv(c.rx()) -> _ => {} -// recv(d.rx()) -> _ => {} -// recv(e.rx()) -> _ => {} -// } -// e.send(9); -// }); -// thread::sleep(ms(1)); - -// go!(c, c.recv()); -// thread::sleep(ms(1)); - -// d.send(7); -// e.recv(); -// c.send(8); -// } -// } - -// // https://github.com/golang/go/blob/master/test/closedchan.go -// mod closedchan { -// // TODO -// } - -// // https://github.com/golang/go/blob/master/src/runtime/chanbarrier_test.go -// mod chanbarrier_test { -// // TODO -// } - -// // https://github.com/golang/go/blob/master/src/runtime/race/testdata/chan_test.go -// mod race_chan_test { -// // TODO -// } - -// // https://github.com/golang/go/blob/master/test/ken/chan.go -// mod chan { -// // TODO -// } - -// // https://github.com/golang/go/blob/master/test/ken/chan1.go -// mod chan1 { -// // TODO -// } diff --git a/vendor/flume/tests/iter.rs b/vendor/flume/tests/iter.rs deleted file mode 100644 index 4d69adb..0000000 --- a/vendor/flume/tests/iter.rs +++ /dev/null @@ -1,112 +0,0 @@ -//! Tests for iteration over receivers. - -extern crate crossbeam_utils; - -use flume::unbounded; -use crossbeam_utils::thread::scope; - -#[test] -fn nested_recv_iter() { - let (s, r) = unbounded::<i32>(); - let (total_s, total_r) = unbounded::<i32>(); - - scope(|scope| { - scope.spawn(move |_| { - let mut acc = 0; - for x in r.iter() { - acc += x; - } - total_s.send(acc).unwrap(); - }); - - s.send(3).unwrap(); - s.send(1).unwrap(); - s.send(2).unwrap(); - drop(s); - assert_eq!(total_r.recv().unwrap(), 6); - }) - .unwrap(); -} - -#[test] -fn recv_iter_break() { - let (s, r) = unbounded::<i32>(); - let (count_s, count_r) = unbounded(); - - scope(|scope| { - scope.spawn(move |_| { - let mut count = 0; - for x in r.iter() { - if count >= 3 { - break; - } else { - count += x; - } - } - count_s.send(count).unwrap(); - }); - - s.send(2).unwrap(); - s.send(2).unwrap(); - s.send(2).unwrap(); - let _ = s.send(2); - drop(s); - assert_eq!(count_r.recv().unwrap(), 4); - }) - .unwrap(); -} - -#[test] -fn recv_try_iter() { - let (request_s, request_r) = unbounded(); - let (response_s, response_r) = unbounded(); - - scope(|scope| { - scope.spawn(move |_| { - let mut count = 0; - loop { - for x in response_r.try_iter() { - count += x; - if count == 6 { - return; - } - } - request_s.send(()).unwrap(); - } - }); - - for _ in request_r.iter() { - if response_s.send(2).is_err() { - break; - } - } - }) - .unwrap(); -} - -#[test] -fn recv_into_iter_owned() { - let mut iter = { - let (s, r) = unbounded::<i32>(); - s.send(1).unwrap(); - s.send(2).unwrap(); - r.into_iter() - }; - - assert_eq!(iter.next().unwrap(), 1); - assert_eq!(iter.next().unwrap(), 2); - assert_eq!(iter.next().is_none(), true); -} - -#[test] -fn recv_into_iter_borrowed() { - let (s, r) = unbounded::<i32>(); - s.send(1).unwrap(); - s.send(2).unwrap(); - drop(s); - - let mut iter = (&r).into_iter(); - assert_eq!(iter.next().unwrap(), 1); - assert_eq!(iter.next().unwrap(), 2); - assert_eq!(iter.next().is_none(), true); -} diff --git a/vendor/flume/tests/list.rs b/vendor/flume/tests/list.rs deleted file mode 100644 index 851af88..0000000 --- a/vendor/flume/tests/list.rs +++ /dev/null @@ -1,536 +0,0 @@ -//! Tests for the list channel flavor. - -extern crate crossbeam_utils; -extern crate rand; - -use std::any::Any; -use std::sync::atomic::AtomicUsize; -use std::sync::atomic::Ordering; -use std::thread; -use std::time::Duration; - -use flume::{unbounded, Receiver}; -use flume::{RecvError, RecvTimeoutError, TryRecvError}; -use flume::{SendError, SendTimeoutError, TrySendError}; -use crossbeam_utils::thread::scope; -use rand::{thread_rng, Rng}; - -fn ms(ms: u64) -> Duration { - Duration::from_millis(ms) -} - -#[test] -fn smoke() { - let (s, r) = unbounded(); - s.try_send(7).unwrap(); - assert_eq!(r.try_recv(), Ok(7)); - - s.send(8).unwrap(); - assert_eq!(r.recv(), Ok(8)); - - assert_eq!(r.try_recv(), Err(TryRecvError::Empty)); - assert_eq!(r.recv_timeout(ms(1000)), Err(RecvTimeoutError::Timeout)); -} - -#[test] -fn capacity() { - let (s, r) = unbounded::<()>(); - assert_eq!(s.capacity(), None); - assert_eq!(r.capacity(), None); -} - -#[test] -fn len_empty_full() { - let (s, r) = unbounded(); - - assert_eq!(s.len(), 0); - assert_eq!(s.is_empty(), true); - assert_eq!(s.is_full(), false); - assert_eq!(r.len(), 0); - assert_eq!(r.is_empty(), true); - assert_eq!(r.is_full(), false); - - s.send(()).unwrap(); - - assert_eq!(s.len(), 1); - assert_eq!(s.is_empty(), false); - assert_eq!(s.is_full(), false); - assert_eq!(r.len(), 1); - assert_eq!(r.is_empty(), false); - assert_eq!(r.is_full(), false); - - r.recv().unwrap(); - - assert_eq!(s.len(), 0); - assert_eq!(s.is_empty(), true); - assert_eq!(s.is_full(), false); - assert_eq!(r.len(), 0); - assert_eq!(r.is_empty(), true); - assert_eq!(r.is_full(), false); -} - -#[test] -fn try_recv() { - let (s, r) = unbounded(); - - scope(|scope| { - scope.spawn(move |_| { - assert_eq!(r.try_recv(), Err(TryRecvError::Empty)); - thread::sleep(ms(1500)); - assert_eq!(r.try_recv(), Ok(7)); - thread::sleep(ms(500)); - assert_eq!(r.try_recv(), Err(TryRecvError::Disconnected)); - }); - scope.spawn(move |_| { - thread::sleep(ms(1000)); - s.send(7).unwrap(); - }); - }) - .unwrap(); -} - -#[test] -fn recv() { - let (s, r) = unbounded(); - - scope(|scope| { - scope.spawn(move |_| { - assert_eq!(r.recv(), Ok(7)); - thread::sleep(ms(1000)); - assert_eq!(r.recv(), Ok(8)); - thread::sleep(ms(1000)); - assert_eq!(r.recv(), Ok(9)); - assert!(r.recv().is_err()); - }); - scope.spawn(move |_| { - thread::sleep(ms(1500)); - s.send(7).unwrap(); - s.send(8).unwrap(); - s.send(9).unwrap(); - }); - }) - .unwrap(); -} - -#[test] -fn recv_timeout() { - let (s, r) = unbounded::<i32>(); - - scope(|scope| { - scope.spawn(move |_| { - assert_eq!(r.recv_timeout(ms(1000)), Err(RecvTimeoutError::Timeout)); - assert_eq!(r.recv_timeout(ms(1000)), Ok(7)); - assert_eq!( - r.recv_timeout(ms(1000)), - Err(RecvTimeoutError::Disconnected) - ); - }); - scope.spawn(move |_| { - thread::sleep(ms(1500)); - s.send(7).unwrap(); - }); - }) - .unwrap(); -} - -#[test] -fn try_send() { - let (s, r) = unbounded(); - for i in 0..1000 { - assert_eq!(s.try_send(i), Ok(())); - } - - drop(r); - assert_eq!(s.try_send(777), Err(TrySendError::Disconnected(777))); -} - -#[test] -fn send() { - let (s, r) = unbounded(); - for i in 0..1000 { - assert_eq!(s.send(i), Ok(())); - } - - drop(r); - assert_eq!(s.send(777), Err(SendError(777))); -} - -#[test] -fn send_timeout() { - let (s, r) = unbounded(); - for i in 0..1000 { - assert_eq!(s.send_timeout(i, ms(i as u64)), Ok(())); - } - - drop(r); - assert_eq!( - s.send_timeout(777, ms(0)), - Err(SendTimeoutError::Disconnected(777)) - ); -} - -#[test] -fn send_after_disconnect() { - let (s, r) = unbounded(); - - s.send(1).unwrap(); - s.send(2).unwrap(); - s.send(3).unwrap(); - - drop(r); - - assert_eq!(s.send(4), Err(SendError(4))); - assert_eq!(s.try_send(5), Err(TrySendError::Disconnected(5))); - assert_eq!( - s.send_timeout(6, ms(0)), - Err(SendTimeoutError::Disconnected(6)) - ); -} - -#[test] -fn recv_after_disconnect() { - let (s, r) = unbounded(); - - s.send(1).unwrap(); - s.send(2).unwrap(); - s.send(3).unwrap(); - - drop(s); - - assert_eq!(r.recv(), Ok(1)); - assert_eq!(r.recv(), Ok(2)); - assert_eq!(r.recv(), Ok(3)); - assert!(r.recv().is_err()); -} - -#[test] -fn len() { - let (s, r) = unbounded(); - - assert_eq!(s.len(), 0); - assert_eq!(r.len(), 0); - - for i in 0..50 { - s.send(i).unwrap(); - assert_eq!(s.len(), i + 1); - } - - for i in 0..50 { - r.recv().unwrap(); - assert_eq!(r.len(), 50 - i - 1); - } - - assert_eq!(s.len(), 0); - assert_eq!(r.len(), 0); -} - -#[test] -fn disconnect_wakes_receiver() { - let (s, r) = unbounded::<()>(); - - scope(|scope| { - scope.spawn(move |_| { - assert!(r.recv().is_err()); - }); - scope.spawn(move |_| { - thread::sleep(ms(1000)); - drop(s); - }); - }) - .unwrap(); -} - -#[test] -fn spsc() { - const COUNT: usize = 100_000; - - let (s, r) = unbounded(); - - scope(|scope| { - scope.spawn(move |_| { - for i in 0..COUNT { - assert_eq!(r.recv(), Ok(i)); - } - assert!(r.recv().is_err()); - }); - scope.spawn(move |_| { - for i in 0..COUNT { - s.send(i).unwrap(); - } - }); - }) - .unwrap(); -} - -#[test] -fn mpmc() { - const COUNT: usize = 25_000; - const THREADS: usize = 4; - - let (s, r) = unbounded::<usize>(); - let v = (0..COUNT).map(|_| AtomicUsize::new(0)).collect::<Vec<_>>(); - - scope(|scope| { - for _ in 0..THREADS { - scope.spawn(|_| { - for _ in 0..COUNT { - let n = r.recv().unwrap(); - v[n].fetch_add(1, Ordering::SeqCst); - } - }); - } - for _ in 0..THREADS { - scope.spawn(|_| { - for i in 0..COUNT { - s.send(i).unwrap(); - } - }); - } - }) - .unwrap(); - - assert_eq!(r.try_recv(), Err(TryRecvError::Empty)); - - for c in v { - assert_eq!(c.load(Ordering::SeqCst), THREADS); - } -} - -#[test] -fn stress_oneshot() { - const COUNT: usize = 10_000; - - for _ in 0..COUNT { - let (s, r) = unbounded(); - - scope(|scope| { - scope.spawn(|_| r.recv().unwrap()); - scope.spawn(|_| s.send(0).unwrap()); - }) - .unwrap(); - } -} - -#[test] -fn stress_iter() { - const COUNT: usize = 100_000; - - let (request_s, request_r) = unbounded(); - let (response_s, response_r) = unbounded(); - - scope(|scope| { - scope.spawn(move |_| { - let mut count = 0; - loop { - for x in response_r.try_iter() { - count += x; - if count == COUNT { - return; - } - } - request_s.send(()).unwrap(); - } - }); - - for _ in request_r.iter() { - if response_s.send(1).is_err() { - break; - } - } - }) - .unwrap(); -} - -#[test] -fn stress_timeout_two_threads() { - const COUNT: usize = 100; - - let (s, r) = unbounded(); - - scope(|scope| { - scope.spawn(|_| { - for i in 0..COUNT { - if i % 2 == 0 { - thread::sleep(ms(50)); - } - s.send(i).unwrap(); - } - }); - - scope.spawn(|_| { - for i in 0..COUNT { - if i % 2 == 0 { - thread::sleep(ms(50)); - } - loop { - if let Ok(x) = r.recv_timeout(ms(10)) { - assert_eq!(x, i); - break; - } - } - } - }); - }) - .unwrap(); -} - -#[test] -fn drops() { - static DROPS: AtomicUsize = AtomicUsize::new(0); - - #[derive(Debug, PartialEq)] - struct DropCounter; - - impl Drop for DropCounter { - fn drop(&mut self) { - DROPS.fetch_add(1, Ordering::SeqCst); - } - } - - let mut rng = thread_rng(); - - for _ in 0..100 { - let steps = rng.gen_range(0..10_000); - let additional = rng.gen_range(0..1000); - - DROPS.store(0, Ordering::SeqCst); - let (s, r) = unbounded::<DropCounter>(); - - scope(|scope| { - scope.spawn(|_| { - for _ in 0..steps { - r.recv().unwrap(); - } - }); - - scope.spawn(|_| { - for _ in 0..steps { - s.send(DropCounter).unwrap(); - } - }); - }) - .unwrap(); - - for _ in 0..additional { - s.try_send(DropCounter).unwrap(); - } - - assert_eq!(DROPS.load(Ordering::SeqCst), steps); - drop(s); - drop(r); - assert_eq!(DROPS.load(Ordering::SeqCst), steps + additional); - } -} - -#[test] -fn linearizable() { - const COUNT: usize = 25_000; - const THREADS: usize = 4; - - let (s, r) = unbounded(); - - scope(|scope| { - for _ in 0..THREADS { - scope.spawn(|_| { - for _ in 0..COUNT { - s.send(0).unwrap(); - r.try_recv().unwrap(); - } - }); - } - }) - .unwrap(); -} - -// #[test] -// fn fairness() { -// const COUNT: usize = 10_000; - -// let (s1, r1) = unbounded::<()>(); -// let (s2, r2) = unbounded::<()>(); - -// for _ in 0..COUNT { -// s1.send(()).unwrap(); -// s2.send(()).unwrap(); -// } - -// let mut hits = [0usize; 2]; -// for _ in 0..COUNT { -// select! { -// recv(r1) -> _ => hits[0] += 1, -// recv(r2) -> _ => hits[1] += 1, -// } -// } -// assert!(hits.iter().all(|x| *x >= COUNT / hits.len() / 2)); -// } - -// #[test] -// fn fairness_duplicates() { -// const COUNT: usize = 10_000; - -// let (s, r) = unbounded(); - -// for _ in 0..COUNT { -// s.send(()).unwrap(); -// } - -// let mut hits = [0usize; 5]; -// for _ in 0..COUNT { -// select! { -// recv(r) -> _ => hits[0] += 1, -// recv(r) -> _ => hits[1] += 1, -// recv(r) -> _ => hits[2] += 1, -// recv(r) -> _ => hits[3] += 1, -// recv(r) -> _ => hits[4] += 1, -// } -// } -// assert!(hits.iter().all(|x| *x >= COUNT / hits.len() / 2)); -// } - -// #[test] -// fn recv_in_send() { -// let (s, r) = unbounded(); -// s.send(()).unwrap(); - -// select! { -// send(s, assert_eq!(r.recv(), Ok(()))) -> _ => {} -// } -// } - -#[test] -fn channel_through_channel() { - const COUNT: usize = 1000; - - type T = Box<dyn Any + Send>; - - let (s, r) = unbounded::<T>(); - - scope(|scope| { - scope.spawn(move |_| { - let mut s = s; - - for _ in 0..COUNT { - let (new_s, new_r) = unbounded(); - let new_r: T = Box::new(Some(new_r)); - - s.send(new_r).unwrap(); - s = new_s; - } - }); - - scope.spawn(move |_| { - let mut r = r; - - for _ in 0..COUNT { - r = r - .recv() - .unwrap() - .downcast_mut::<Option<Receiver<T>>>() - .unwrap() - .take() - .unwrap() - } - }); - }) - .unwrap(); -} diff --git a/vendor/flume/tests/method_sharing.rs b/vendor/flume/tests/method_sharing.rs deleted file mode 100644 index 24173ea..0000000 --- a/vendor/flume/tests/method_sharing.rs +++ /dev/null @@ -1,39 +0,0 @@ -#[cfg(feature = "async")] -use flume::*; - -#[cfg(feature = "async")] -#[async_std::test] -async fn sender() { - let (sender, receiver) = bounded(1); - - let sender_fut = sender.send_async(()); - assert_eq!(sender.is_disconnected(), sender_fut.is_disconnected()); - assert_eq!(sender.is_empty(), sender_fut.is_empty()); - assert_eq!(sender.is_full(), sender_fut.is_full()); - assert_eq!(sender.len(), sender_fut.len()); - assert_eq!(sender.capacity(), sender_fut.capacity()); - - let sender_sink = sender.sink(); - assert_eq!(sender.is_disconnected(), sender_sink.is_disconnected()); - assert_eq!(sender.is_empty(), sender_sink.is_empty()); - assert_eq!(sender.is_full(), sender_sink.is_full()); - assert_eq!(sender.len(), sender_sink.len()); - assert_eq!(sender.capacity(), sender_sink.capacity()); - - let receiver_fut = receiver.recv_async(); - assert_eq!(receiver.is_disconnected(), receiver_fut.is_disconnected()); - assert_eq!(receiver.is_empty(), receiver_fut.is_empty()); - assert_eq!(receiver.is_full(), receiver_fut.is_full()); - assert_eq!(receiver.len(), receiver_fut.len()); - assert_eq!(receiver.capacity(), receiver_fut.capacity()); - - let receiver_stream = receiver.stream(); - assert_eq!( - receiver.is_disconnected(), - receiver_stream.is_disconnected() - ); - assert_eq!(receiver.is_empty(), receiver_stream.is_empty()); - assert_eq!(receiver.is_full(), receiver_stream.is_full()); - assert_eq!(receiver.len(), receiver_stream.len()); - assert_eq!(receiver.capacity(), receiver_stream.capacity()); -} diff --git a/vendor/flume/tests/mpsc.rs b/vendor/flume/tests/mpsc.rs deleted file mode 100644 index 213b9d8..0000000 --- a/vendor/flume/tests/mpsc.rs +++ /dev/null @@ -1,2095 +0,0 @@ -//! Tests copied from `std::sync::mpsc`. -//! -//! This is a copy of tests for the `std::sync::mpsc` channels from the standard library, but -//! modified to work with `crossbeam-channel` instead. -//! -//! Minor tweaks were needed to make the tests compile: -//! -//! - Replace `box` syntax with `Box::new`. -//! - Replace all uses of `Select` with `select!`. -//! - Change the imports. -//! - Join all spawned threads. -//! - Removed assertion from oneshot_multi_thread_send_close_stress tests. -//! -//! Source: -//! - https://github.com/rust-lang/rust/tree/master/src/libstd/sync/mpsc -//! -//! Copyright & License: -//! - Copyright 2013-2014 The Rust Project Developers -//! - Apache License, Version 2.0 or MIT license, at your option -//! - https://github.com/rust-lang/rust/blob/master/COPYRIGHT -//! - https://www.rust-lang.org/en-US/legal.html - -#[macro_use] -extern crate crossbeam_channel as cc; - -use std::sync::mpsc::{RecvError, RecvTimeoutError, TryRecvError}; -use std::sync::mpsc::{SendError, TrySendError}; -use std::thread::JoinHandle; -use std::time::Duration; - -pub struct Sender<T> { - pub inner: cc::Sender<T>, -} - -impl<T> Sender<T> { - pub fn send(&self, t: T) -> Result<(), SendError<T>> { - self.inner.send(t).map_err(|cc::SendError(m)| SendError(m)) - } -} - -impl<T> Clone for Sender<T> { - fn clone(&self) -> Sender<T> { - Sender { - inner: self.inner.clone(), - } - } -} - -pub struct SyncSender<T> { - pub inner: cc::Sender<T>, -} - -impl<T> SyncSender<T> { - pub fn send(&self, t: T) -> Result<(), SendError<T>> { - self.inner.send(t).map_err(|cc::SendError(m)| SendError(m)) - } - - pub fn try_send(&self, t: T) -> Result<(), TrySendError<T>> { - self.inner.try_send(t).map_err(|err| match err { - cc::TrySendError::Full(m) => TrySendError::Full(m), - cc::TrySendError::Disconnected(m) => TrySendError::Disconnected(m), - }) - } -} - -impl<T> Clone for SyncSender<T> { - fn clone(&self) -> SyncSender<T> { - SyncSender { - inner: self.inner.clone(), - } - } -} - -pub struct Receiver<T> { - pub inner: cc::Receiver<T>, -} - -impl<T> Receiver<T> { - pub fn try_recv(&self) -> Result<T, TryRecvError> { - self.inner.try_recv().map_err(|err| match err { - cc::TryRecvError::Empty => TryRecvError::Empty, - cc::TryRecvError::Disconnected => TryRecvError::Disconnected, - }) - } - - pub fn recv(&self) -> Result<T, RecvError> { - self.inner.recv().map_err(|_| RecvError) - } - - pub fn recv_timeout(&self, timeout: Duration) -> Result<T, RecvTimeoutError> { - self.inner.recv_timeout(timeout).map_err(|err| match err { - cc::RecvTimeoutError::Timeout => RecvTimeoutError::Timeout, - cc::RecvTimeoutError::Disconnected => RecvTimeoutError::Disconnected, - }) - } - - pub fn iter(&self) -> Iter<T> { - Iter { inner: self } - } - - pub fn try_iter(&self) -> TryIter<T> { - TryIter { inner: self } - } -} - -impl<'a, T> IntoIterator for &'a Receiver<T> { - type Item = T; - type IntoIter = Iter<'a, T>; - - fn into_iter(self) -> Iter<'a, T> { - self.iter() - } -} - -impl<T> IntoIterator for Receiver<T> { - type Item = T; - type IntoIter = IntoIter<T>; - - fn into_iter(self) -> IntoIter<T> { - IntoIter { inner: self } - } -} - -pub struct TryIter<'a, T: 'a> { - inner: &'a Receiver<T>, -} - -impl<'a, T> Iterator for TryIter<'a, T> { - type Item = T; - - fn next(&mut self) -> Option<T> { - self.inner.try_recv().ok() - } -} - -pub struct Iter<'a, T: 'a> { - inner: &'a Receiver<T>, -} - -impl<'a, T> Iterator for Iter<'a, T> { - type Item = T; - - fn next(&mut self) -> Option<T> { - self.inner.recv().ok() - } -} - -pub struct IntoIter<T> { - inner: Receiver<T>, -} - -impl<T> Iterator for IntoIter<T> { - type Item = T; - - fn next(&mut self) -> Option<T> { - self.inner.recv().ok() - } -} - -pub fn channel<T>() -> (Sender<T>, Receiver<T>) { - let (s, r) = cc::unbounded(); - let s = Sender { inner: s }; - let r = Receiver { inner: r }; - (s, r) -} - -pub fn sync_channel<T>(bound: usize) -> (SyncSender<T>, Receiver<T>) { - let (s, r) = cc::bounded(bound); - let s = SyncSender { inner: s }; - let r = Receiver { inner: r }; - (s, r) -} - -macro_rules! select { - ( - $($name:pat = $rx:ident.$meth:ident() => $code:expr),+ - ) => ({ - crossbeam_channel_internal! { - $( - recv(($rx).inner) -> res => { - let $name = res.map_err(|_| ::std::sync::mpsc::RecvError); - $code - } - )+ - } - }) -} - -// Source: https://github.com/rust-lang/rust/blob/master/src/libstd/sync/mpsc/mod.rs -mod channel_tests { - use super::*; - - use std::env; - use std::thread; - use std::time::{Duration, Instant}; - - pub fn stress_factor() -> usize { - match env::var("RUST_TEST_STRESS") { - Ok(val) => val.parse().unwrap(), - Err(..) => 1, - } - } - - #[test] - fn smoke() { - let (tx, rx) = channel::<i32>(); - tx.send(1).unwrap(); - assert_eq!(rx.recv().unwrap(), 1); - } - - #[test] - fn drop_full() { - let (tx, _rx) = channel::<Box<isize>>(); - tx.send(Box::new(1)).unwrap(); - } - - #[test] - fn drop_full_shared() { - let (tx, _rx) = channel::<Box<isize>>(); - drop(tx.clone()); - drop(tx.clone()); - tx.send(Box::new(1)).unwrap(); - } - - #[test] - fn smoke_shared() { - let (tx, rx) = channel::<i32>(); - tx.send(1).unwrap(); - assert_eq!(rx.recv().unwrap(), 1); - let tx = tx.clone(); - tx.send(1).unwrap(); - assert_eq!(rx.recv().unwrap(), 1); - } - - #[test] - fn smoke_threads() { - let (tx, rx) = channel::<i32>(); - let t = thread::spawn(move || { - tx.send(1).unwrap(); - }); - assert_eq!(rx.recv().unwrap(), 1); - t.join().unwrap(); - } - - #[test] - fn smoke_port_gone() { - let (tx, rx) = channel::<i32>(); - drop(rx); - assert!(tx.send(1).is_err()); - } - - #[test] - fn smoke_shared_port_gone() { - let (tx, rx) = channel::<i32>(); - drop(rx); - assert!(tx.send(1).is_err()) - } - - #[test] - fn smoke_shared_port_gone2() { - let (tx, rx) = channel::<i32>(); - drop(rx); - let tx2 = tx.clone(); - drop(tx); - assert!(tx2.send(1).is_err()); - } - - #[test] - fn port_gone_concurrent() { - let (tx, rx) = channel::<i32>(); - let t = thread::spawn(move || { - rx.recv().unwrap(); - }); - while tx.send(1).is_ok() {} - t.join().unwrap(); - } - - #[test] - fn port_gone_concurrent_shared() { - let (tx, rx) = channel::<i32>(); - let tx2 = tx.clone(); - let t = thread::spawn(move || { - rx.recv().unwrap(); - }); - while tx.send(1).is_ok() && tx2.send(1).is_ok() {} - t.join().unwrap(); - } - - #[test] - fn smoke_chan_gone() { - let (tx, rx) = channel::<i32>(); - drop(tx); - assert!(rx.recv().is_err()); - } - - #[test] - fn smoke_chan_gone_shared() { - let (tx, rx) = channel::<()>(); - let tx2 = tx.clone(); - drop(tx); - drop(tx2); - assert!(rx.recv().is_err()); - } - - #[test] - fn chan_gone_concurrent() { - let (tx, rx) = channel::<i32>(); - let t = thread::spawn(move || { - tx.send(1).unwrap(); - tx.send(1).unwrap(); - }); - while rx.recv().is_ok() {} - t.join().unwrap(); - } - - #[test] - fn stress() { - let (tx, rx) = channel::<i32>(); - let t = thread::spawn(move || { - for _ in 0..10000 { - tx.send(1).unwrap(); - } - }); - for _ in 0..10000 { - assert_eq!(rx.recv().unwrap(), 1); - } - t.join().ok().unwrap(); - } - - #[test] - fn stress_shared() { - const AMT: u32 = 10000; - const NTHREADS: u32 = 8; - let (tx, rx) = channel::<i32>(); - - let t = thread::spawn(move || { - for _ in 0..AMT * NTHREADS { - assert_eq!(rx.recv().unwrap(), 1); - } - match rx.try_recv() { - Ok(..) => panic!(), - _ => {} - } - }); - - let mut ts = Vec::with_capacity(NTHREADS as usize); - for _ in 0..NTHREADS { - let tx = tx.clone(); - let t = thread::spawn(move || { - for _ in 0..AMT { - tx.send(1).unwrap(); - } - }); - ts.push(t); - } - drop(tx); - t.join().ok().unwrap(); - for t in ts { - t.join().unwrap(); - } - } - - #[test] - fn send_from_outside_runtime() { - let (tx1, rx1) = channel::<()>(); - let (tx2, rx2) = channel::<i32>(); - let t1 = thread::spawn(move || { - tx1.send(()).unwrap(); - for _ in 0..40 { - assert_eq!(rx2.recv().unwrap(), 1); - } - }); - rx1.recv().unwrap(); - let t2 = thread::spawn(move || { - for _ in 0..40 { - tx2.send(1).unwrap(); - } - }); - t1.join().ok().unwrap(); - t2.join().ok().unwrap(); - } - - #[test] - fn recv_from_outside_runtime() { - let (tx, rx) = channel::<i32>(); - let t = thread::spawn(move || { - for _ in 0..40 { - assert_eq!(rx.recv().unwrap(), 1); - } - }); - for _ in 0..40 { - tx.send(1).unwrap(); - } - t.join().ok().unwrap(); - } - - #[test] - fn no_runtime() { - let (tx1, rx1) = channel::<i32>(); - let (tx2, rx2) = channel::<i32>(); - let t1 = thread::spawn(move || { - assert_eq!(rx1.recv().unwrap(), 1); - tx2.send(2).unwrap(); - }); - let t2 = thread::spawn(move || { - tx1.send(1).unwrap(); - assert_eq!(rx2.recv().unwrap(), 2); - }); - t1.join().ok().unwrap(); - t2.join().ok().unwrap(); - } - - #[test] - fn oneshot_single_thread_close_port_first() { - // Simple test of closing without sending - let (_tx, rx) = channel::<i32>(); - drop(rx); - } - - #[test] - fn oneshot_single_thread_close_chan_first() { - // Simple test of closing without sending - let (tx, _rx) = channel::<i32>(); - drop(tx); - } - - #[test] - fn oneshot_single_thread_send_port_close() { - // Testing that the sender cleans up the payload if receiver is closed - let (tx, rx) = channel::<Box<i32>>(); - drop(rx); - assert!(tx.send(Box::new(0)).is_err()); - } - - #[test] - fn oneshot_single_thread_recv_chan_close() { - let (tx, rx) = channel::<i32>(); - drop(tx); - assert_eq!(rx.recv(), Err(RecvError)); - } - - #[test] - fn oneshot_single_thread_send_then_recv() { - let (tx, rx) = channel::<Box<i32>>(); - tx.send(Box::new(10)).unwrap(); - assert!(*rx.recv().unwrap() == 10); - } - - #[test] - fn oneshot_single_thread_try_send_open() { - let (tx, rx) = channel::<i32>(); - assert!(tx.send(10).is_ok()); - assert!(rx.recv().unwrap() == 10); - } - - #[test] - fn oneshot_single_thread_try_send_closed() { - let (tx, rx) = channel::<i32>(); - drop(rx); - assert!(tx.send(10).is_err()); - } - - #[test] - fn oneshot_single_thread_try_recv_open() { - let (tx, rx) = channel::<i32>(); - tx.send(10).unwrap(); - assert!(rx.recv() == Ok(10)); - } - - #[test] - fn oneshot_single_thread_try_recv_closed() { - let (tx, rx) = channel::<i32>(); - drop(tx); - assert!(rx.recv().is_err()); - } - - #[test] - fn oneshot_single_thread_peek_data() { - let (tx, rx) = channel::<i32>(); - assert_eq!(rx.try_recv(), Err(TryRecvError::Empty)); - tx.send(10).unwrap(); - assert_eq!(rx.try_recv(), Ok(10)); - } - - #[test] - fn oneshot_single_thread_peek_close() { - let (tx, rx) = channel::<i32>(); - drop(tx); - assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected)); - assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected)); - } - - #[test] - fn oneshot_single_thread_peek_open() { - let (_tx, rx) = channel::<i32>(); - assert_eq!(rx.try_recv(), Err(TryRecvError::Empty)); - } - - #[test] - fn oneshot_multi_task_recv_then_send() { - let (tx, rx) = channel::<Box<i32>>(); - let t = thread::spawn(move || { - assert!(*rx.recv().unwrap() == 10); - }); - - tx.send(Box::new(10)).unwrap(); - t.join().unwrap(); - } - - #[test] - fn oneshot_multi_task_recv_then_close() { - let (tx, rx) = channel::<Box<i32>>(); - let t = thread::spawn(move || { - drop(tx); - }); - thread::spawn(move || { - assert_eq!(rx.recv(), Err(RecvError)); - }) - .join() - .unwrap(); - t.join().unwrap(); - } - - #[test] - fn oneshot_multi_thread_close_stress() { - let stress_factor = stress_factor(); - let mut ts = Vec::with_capacity(stress_factor); - for _ in 0..stress_factor { - let (tx, rx) = channel::<i32>(); - let t = thread::spawn(move || { - drop(rx); - }); - ts.push(t); - drop(tx); - } - for t in ts { - t.join().unwrap(); - } - } - - #[test] - fn oneshot_multi_thread_send_close_stress() { - let stress_factor = stress_factor(); - let mut ts = Vec::with_capacity(2 * stress_factor); - for _ in 0..stress_factor { - let (tx, rx) = channel::<i32>(); - let t = thread::spawn(move || { - drop(rx); - }); - ts.push(t); - thread::spawn(move || { - let _ = tx.send(1); - }) - .join() - .unwrap(); - } - for t in ts { - t.join().unwrap(); - } - } - - #[test] - fn oneshot_multi_thread_recv_close_stress() { - let stress_factor = stress_factor(); - let mut ts = Vec::with_capacity(2 * stress_factor); - for _ in 0..stress_factor { - let (tx, rx) = channel::<i32>(); - let t = thread::spawn(move || { - thread::spawn(move || { - assert_eq!(rx.recv(), Err(RecvError)); - }) - .join() - .unwrap(); - }); - ts.push(t); - let t2 = thread::spawn(move || { - let t = thread::spawn(move || { - drop(tx); - }); - t.join().unwrap(); - }); - ts.push(t2); - } - for t in ts { - t.join().unwrap(); - } - } - - #[test] - fn oneshot_multi_thread_send_recv_stress() { - let stress_factor = stress_factor(); - let mut ts = Vec::with_capacity(stress_factor); - for _ in 0..stress_factor { - let (tx, rx) = channel::<Box<isize>>(); - let t = thread::spawn(move || { - tx.send(Box::new(10)).unwrap(); - }); - ts.push(t); - assert!(*rx.recv().unwrap() == 10); - } - for t in ts { - t.join().unwrap(); - } - } - - #[test] - fn stream_send_recv_stress() { - let stress_factor = stress_factor(); - let mut ts = Vec::with_capacity(2 * stress_factor); - for _ in 0..stress_factor { - let (tx, rx) = channel(); - - if let Some(t) = send(tx, 0) { - ts.push(t); - } - if let Some(t2) = recv(rx, 0) { - ts.push(t2); - } - - fn send(tx: Sender<Box<i32>>, i: i32) -> Option<JoinHandle<()>> { - if i == 10 { - return None; - } - - Some(thread::spawn(move || { - tx.send(Box::new(i)).unwrap(); - send(tx, i + 1); - })) - } - - fn recv(rx: Receiver<Box<i32>>, i: i32) -> Option<JoinHandle<()>> { - if i == 10 { - return None; - } - - Some(thread::spawn(move || { - assert!(*rx.recv().unwrap() == i); - recv(rx, i + 1); - })) - } - } - for t in ts { - t.join().unwrap(); - } - } - - #[test] - fn oneshot_single_thread_recv_timeout() { - let (tx, rx) = channel(); - tx.send(()).unwrap(); - assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Ok(())); - assert_eq!( - rx.recv_timeout(Duration::from_millis(1)), - Err(RecvTimeoutError::Timeout) - ); - tx.send(()).unwrap(); - assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Ok(())); - } - - #[test] - fn stress_recv_timeout_two_threads() { - let (tx, rx) = channel(); - let stress = stress_factor() + 100; - let timeout = Duration::from_millis(100); - - let t = thread::spawn(move || { - for i in 0..stress { - if i % 2 == 0 { - thread::sleep(timeout * 2); - } - tx.send(1usize).unwrap(); - } - }); - - let mut recv_count = 0; - loop { - match rx.recv_timeout(timeout) { - Ok(n) => { - assert_eq!(n, 1usize); - recv_count += 1; - } - Err(RecvTimeoutError::Timeout) => continue, - Err(RecvTimeoutError::Disconnected) => break, - } - } - - assert_eq!(recv_count, stress); - t.join().unwrap() - } - - #[test] - fn recv_timeout_upgrade() { - let (tx, rx) = channel::<()>(); - let timeout = Duration::from_millis(1); - let _tx_clone = tx.clone(); - - let start = Instant::now(); - assert_eq!(rx.recv_timeout(timeout), Err(RecvTimeoutError::Timeout)); - assert!(Instant::now() >= start + timeout); - } - - #[test] - fn stress_recv_timeout_shared() { - let (tx, rx) = channel(); - let stress = stress_factor() + 100; - - let mut ts = Vec::with_capacity(stress); - for i in 0..stress { - let tx = tx.clone(); - let t = thread::spawn(move || { - thread::sleep(Duration::from_millis(i as u64 * 10)); - tx.send(1usize).unwrap(); - }); - ts.push(t); - } - - drop(tx); - - let mut recv_count = 0; - loop { - match rx.recv_timeout(Duration::from_millis(10)) { - Ok(n) => { - assert_eq!(n, 1usize); - recv_count += 1; - } - Err(RecvTimeoutError::Timeout) => continue, - Err(RecvTimeoutError::Disconnected) => break, - } - } - - assert_eq!(recv_count, stress); - for t in ts { - t.join().unwrap(); - } - } - - #[test] - fn recv_a_lot() { - // Regression test that we don't run out of stack in scheduler context - let (tx, rx) = channel(); - for _ in 0..10000 { - tx.send(()).unwrap(); - } - for _ in 0..10000 { - rx.recv().unwrap(); - } - } - - #[test] - fn shared_recv_timeout() { - let (tx, rx) = channel(); - let total = 5; - let mut ts = Vec::with_capacity(total); - for _ in 0..total { - let tx = tx.clone(); - let t = thread::spawn(move || { - tx.send(()).unwrap(); - }); - ts.push(t); - } - - for _ in 0..total { - rx.recv().unwrap(); - } - - assert_eq!( - rx.recv_timeout(Duration::from_millis(1)), - Err(RecvTimeoutError::Timeout) - ); - tx.send(()).unwrap(); - assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Ok(())); - for t in ts { - t.join().unwrap(); - } - } - - #[test] - fn shared_chan_stress() { - let (tx, rx) = channel(); - let total = stress_factor() + 100; - let mut ts = Vec::with_capacity(total); - for _ in 0..total { - let tx = tx.clone(); - let t = thread::spawn(move || { - tx.send(()).unwrap(); - }); - ts.push(t); - } - - for _ in 0..total { - rx.recv().unwrap(); - } - for t in ts { - t.join().unwrap(); - } - } - - #[test] - fn test_nested_recv_iter() { - let (tx, rx) = channel::<i32>(); - let (total_tx, total_rx) = channel::<i32>(); - - let t = thread::spawn(move || { - let mut acc = 0; - for x in rx.iter() { - acc += x; - } - total_tx.send(acc).unwrap(); - }); - - tx.send(3).unwrap(); - tx.send(1).unwrap(); - tx.send(2).unwrap(); - drop(tx); - assert_eq!(total_rx.recv().unwrap(), 6); - t.join().unwrap(); - } - - #[test] - fn test_recv_iter_break() { - let (tx, rx) = channel::<i32>(); - let (count_tx, count_rx) = channel(); - - let t = thread::spawn(move || { - let mut count = 0; - for x in rx.iter() { - if count >= 3 { - break; - } else { - count += x; - } - } - count_tx.send(count).unwrap(); - }); - - tx.send(2).unwrap(); - tx.send(2).unwrap(); - tx.send(2).unwrap(); - let _ = tx.send(2); - drop(tx); - assert_eq!(count_rx.recv().unwrap(), 4); - t.join().unwrap(); - } - - #[test] - fn test_recv_try_iter() { - let (request_tx, request_rx) = channel(); - let (response_tx, response_rx) = channel(); - - // Request `x`s until we have `6`. - let t = thread::spawn(move || { - let mut count = 0; - loop { - for x in response_rx.try_iter() { - count += x; - if count == 6 { - return count; - } - } - request_tx.send(()).unwrap(); - } - }); - - for _ in request_rx.iter() { - if response_tx.send(2).is_err() { - break; - } - } - - assert_eq!(t.join().unwrap(), 6); - } - - #[test] - fn test_recv_into_iter_owned() { - let mut iter = { - let (tx, rx) = channel::<i32>(); - tx.send(1).unwrap(); - tx.send(2).unwrap(); - - rx.into_iter() - }; - assert_eq!(iter.next().unwrap(), 1); - assert_eq!(iter.next().unwrap(), 2); - assert_eq!(iter.next().is_none(), true); - } - - #[test] - fn test_recv_into_iter_borrowed() { - let (tx, rx) = channel::<i32>(); - tx.send(1).unwrap(); - tx.send(2).unwrap(); - drop(tx); - let mut iter = (&rx).into_iter(); - assert_eq!(iter.next().unwrap(), 1); - assert_eq!(iter.next().unwrap(), 2); - assert_eq!(iter.next().is_none(), true); - } - - #[test] - fn try_recv_states() { - let (tx1, rx1) = channel::<i32>(); - let (tx2, rx2) = channel::<()>(); - let (tx3, rx3) = channel::<()>(); - let t = thread::spawn(move || { - rx2.recv().unwrap(); - tx1.send(1).unwrap(); - tx3.send(()).unwrap(); - rx2.recv().unwrap(); - drop(tx1); - tx3.send(()).unwrap(); - }); - - assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty)); - tx2.send(()).unwrap(); - rx3.recv().unwrap(); - assert_eq!(rx1.try_recv(), Ok(1)); - assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty)); - tx2.send(()).unwrap(); - rx3.recv().unwrap(); - assert_eq!(rx1.try_recv(), Err(TryRecvError::Disconnected)); - t.join().unwrap(); - } - - // This bug used to end up in a livelock inside of the Receiver destructor - // because the internal state of the Shared packet was corrupted - #[test] - fn destroy_upgraded_shared_port_when_sender_still_active() { - let (tx, rx) = channel(); - let (tx2, rx2) = channel(); - let t = thread::spawn(move || { - rx.recv().unwrap(); // wait on a oneshot - drop(rx); // destroy a shared - tx2.send(()).unwrap(); - }); - // make sure the other thread has gone to sleep - for _ in 0..5000 { - thread::yield_now(); - } - - // upgrade to a shared chan and send a message - let tx2 = tx.clone(); - drop(tx); - tx2.send(()).unwrap(); - - // wait for the child thread to exit before we exit - rx2.recv().unwrap(); - t.join().unwrap(); - } - - #[test] - fn issue_32114() { - let (tx, _) = channel(); - let _ = tx.send(123); - assert_eq!(tx.send(123), Err(SendError(123))); - } -} - -// Source: https://github.com/rust-lang/rust/blob/master/src/libstd/sync/mpsc/mod.rs -mod sync_channel_tests { - use super::*; - - use std::env; - use std::thread; - use std::time::Duration; - - pub fn stress_factor() -> usize { - match env::var("RUST_TEST_STRESS") { - Ok(val) => val.parse().unwrap(), - Err(..) => 1, - } - } - - #[test] - fn smoke() { - let (tx, rx) = sync_channel::<i32>(1); - tx.send(1).unwrap(); - assert_eq!(rx.recv().unwrap(), 1); - } - - #[test] - fn drop_full() { - let (tx, _rx) = sync_channel::<Box<isize>>(1); - tx.send(Box::new(1)).unwrap(); - } - - #[test] - fn smoke_shared() { - let (tx, rx) = sync_channel::<i32>(1); - tx.send(1).unwrap(); - assert_eq!(rx.recv().unwrap(), 1); - let tx = tx.clone(); - tx.send(1).unwrap(); - assert_eq!(rx.recv().unwrap(), 1); - } - - #[test] - fn recv_timeout() { - let (tx, rx) = sync_channel::<i32>(1); - assert_eq!( - rx.recv_timeout(Duration::from_millis(1)), - Err(RecvTimeoutError::Timeout) - ); - tx.send(1).unwrap(); - assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Ok(1)); - } - - #[test] - fn smoke_threads() { - let (tx, rx) = sync_channel::<i32>(0); - let t = thread::spawn(move || { - tx.send(1).unwrap(); - }); - assert_eq!(rx.recv().unwrap(), 1); - t.join().unwrap(); - } - - #[test] - fn smoke_port_gone() { - let (tx, rx) = sync_channel::<i32>(0); - drop(rx); - assert!(tx.send(1).is_err()); - } - - #[test] - fn smoke_shared_port_gone2() { - let (tx, rx) = sync_channel::<i32>(0); - drop(rx); - let tx2 = tx.clone(); - drop(tx); - assert!(tx2.send(1).is_err()); - } - - #[test] - fn port_gone_concurrent() { - let (tx, rx) = sync_channel::<i32>(0); - let t = thread::spawn(move || { - rx.recv().unwrap(); - }); - while tx.send(1).is_ok() {} - t.join().unwrap(); - } - - #[test] - fn port_gone_concurrent_shared() { - let (tx, rx) = sync_channel::<i32>(0); - let tx2 = tx.clone(); - let t = thread::spawn(move || { - rx.recv().unwrap(); - }); - while tx.send(1).is_ok() && tx2.send(1).is_ok() {} - t.join().unwrap(); - } - - #[test] - fn smoke_chan_gone() { - let (tx, rx) = sync_channel::<i32>(0); - drop(tx); - assert!(rx.recv().is_err()); - } - - #[test] - fn smoke_chan_gone_shared() { - let (tx, rx) = sync_channel::<()>(0); - let tx2 = tx.clone(); - drop(tx); - drop(tx2); - assert!(rx.recv().is_err()); - } - - #[test] - fn chan_gone_concurrent() { - let (tx, rx) = sync_channel::<i32>(0); - let t = thread::spawn(move || { - tx.send(1).unwrap(); - tx.send(1).unwrap(); - }); - while rx.recv().is_ok() {} - t.join().unwrap(); - } - - #[test] - fn stress() { - let (tx, rx) = sync_channel::<i32>(0); - let t = thread::spawn(move || { - for _ in 0..10000 { - tx.send(1).unwrap(); - } - }); - for _ in 0..10000 { - assert_eq!(rx.recv().unwrap(), 1); - } - t.join().unwrap(); - } - - #[test] - fn stress_recv_timeout_two_threads() { - let (tx, rx) = sync_channel::<i32>(0); - - let t = thread::spawn(move || { - for _ in 0..10000 { - tx.send(1).unwrap(); - } - }); - - let mut recv_count = 0; - loop { - match rx.recv_timeout(Duration::from_millis(1)) { - Ok(v) => { - assert_eq!(v, 1); - recv_count += 1; - } - Err(RecvTimeoutError::Timeout) => continue, - Err(RecvTimeoutError::Disconnected) => break, - } - } - - assert_eq!(recv_count, 10000); - t.join().unwrap(); - } - - #[test] - fn stress_recv_timeout_shared() { - const AMT: u32 = 1000; - const NTHREADS: u32 = 8; - let (tx, rx) = sync_channel::<i32>(0); - let (dtx, drx) = sync_channel::<()>(0); - - let t = thread::spawn(move || { - let mut recv_count = 0; - loop { - match rx.recv_timeout(Duration::from_millis(10)) { - Ok(v) => { - assert_eq!(v, 1); - recv_count += 1; - } - Err(RecvTimeoutError::Timeout) => continue, - Err(RecvTimeoutError::Disconnected) => break, - } - } - - assert_eq!(recv_count, AMT * NTHREADS); - assert!(rx.try_recv().is_err()); - - dtx.send(()).unwrap(); - }); - - let mut ts = Vec::with_capacity(NTHREADS as usize); - for _ in 0..NTHREADS { - let tx = tx.clone(); - let t = thread::spawn(move || { - for _ in 0..AMT { - tx.send(1).unwrap(); - } - }); - ts.push(t); - } - - drop(tx); - - drx.recv().unwrap(); - for t in ts { - t.join().unwrap(); - } - t.join().unwrap(); - } - - #[test] - fn stress_shared() { - const AMT: u32 = 1000; - const NTHREADS: u32 = 8; - let (tx, rx) = sync_channel::<i32>(0); - let (dtx, drx) = sync_channel::<()>(0); - - let t = thread::spawn(move || { - for _ in 0..AMT * NTHREADS { - assert_eq!(rx.recv().unwrap(), 1); - } - match rx.try_recv() { - Ok(..) => panic!(), - _ => {} - } - dtx.send(()).unwrap(); - }); - - let mut ts = Vec::with_capacity(NTHREADS as usize); - for _ in 0..NTHREADS { - let tx = tx.clone(); - let t = thread::spawn(move || { - for _ in 0..AMT { - tx.send(1).unwrap(); - } - }); - ts.push(t); - } - drop(tx); - drx.recv().unwrap(); - for t in ts { - t.join().unwrap(); - } - t.join().unwrap(); - } - - #[test] - fn oneshot_single_thread_close_port_first() { - // Simple test of closing without sending - let (_tx, rx) = sync_channel::<i32>(0); - drop(rx); - } - - #[test] - fn oneshot_single_thread_close_chan_first() { - // Simple test of closing without sending - let (tx, _rx) = sync_channel::<i32>(0); - drop(tx); - } - - #[test] - fn oneshot_single_thread_send_port_close() { - // Testing that the sender cleans up the payload if receiver is closed - let (tx, rx) = sync_channel::<Box<i32>>(0); - drop(rx); - assert!(tx.send(Box::new(0)).is_err()); - } - - #[test] - fn oneshot_single_thread_recv_chan_close() { - let (tx, rx) = sync_channel::<i32>(0); - drop(tx); - assert_eq!(rx.recv(), Err(RecvError)); - } - - #[test] - fn oneshot_single_thread_send_then_recv() { - let (tx, rx) = sync_channel::<Box<i32>>(1); - tx.send(Box::new(10)).unwrap(); - assert!(*rx.recv().unwrap() == 10); - } - - #[test] - fn oneshot_single_thread_try_send_open() { - let (tx, rx) = sync_channel::<i32>(1); - assert_eq!(tx.try_send(10), Ok(())); - assert!(rx.recv().unwrap() == 10); - } - - #[test] - fn oneshot_single_thread_try_send_closed() { - let (tx, rx) = sync_channel::<i32>(0); - drop(rx); - assert_eq!(tx.try_send(10), Err(TrySendError::Disconnected(10))); - } - - #[test] - fn oneshot_single_thread_try_send_closed2() { - let (tx, _rx) = sync_channel::<i32>(0); - assert_eq!(tx.try_send(10), Err(TrySendError::Full(10))); - } - - #[test] - fn oneshot_single_thread_try_recv_open() { - let (tx, rx) = sync_channel::<i32>(1); - tx.send(10).unwrap(); - assert!(rx.recv() == Ok(10)); - } - - #[test] - fn oneshot_single_thread_try_recv_closed() { - let (tx, rx) = sync_channel::<i32>(0); - drop(tx); - assert!(rx.recv().is_err()); - } - - #[test] - fn oneshot_single_thread_try_recv_closed_with_data() { - let (tx, rx) = sync_channel::<i32>(1); - tx.send(10).unwrap(); - drop(tx); - assert_eq!(rx.try_recv(), Ok(10)); - assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected)); - } - - #[test] - fn oneshot_single_thread_peek_data() { - let (tx, rx) = sync_channel::<i32>(1); - assert_eq!(rx.try_recv(), Err(TryRecvError::Empty)); - tx.send(10).unwrap(); - assert_eq!(rx.try_recv(), Ok(10)); - } - - #[test] - fn oneshot_single_thread_peek_close() { - let (tx, rx) = sync_channel::<i32>(0); - drop(tx); - assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected)); - assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected)); - } - - #[test] - fn oneshot_single_thread_peek_open() { - let (_tx, rx) = sync_channel::<i32>(0); - assert_eq!(rx.try_recv(), Err(TryRecvError::Empty)); - } - - #[test] - fn oneshot_multi_task_recv_then_send() { - let (tx, rx) = sync_channel::<Box<i32>>(0); - let t = thread::spawn(move || { - assert!(*rx.recv().unwrap() == 10); - }); - - tx.send(Box::new(10)).unwrap(); - t.join().unwrap(); - } - - #[test] - fn oneshot_multi_task_recv_then_close() { - let (tx, rx) = sync_channel::<Box<i32>>(0); - let t = thread::spawn(move || { - drop(tx); - }); - thread::spawn(move || { - assert_eq!(rx.recv(), Err(RecvError)); - }) - .join() - .unwrap(); - t.join().unwrap(); - } - - #[test] - fn oneshot_multi_thread_close_stress() { - let stress_factor = stress_factor(); - let mut ts = Vec::with_capacity(stress_factor); - for _ in 0..stress_factor { - let (tx, rx) = sync_channel::<i32>(0); - let t = thread::spawn(move || { - drop(rx); - }); - ts.push(t); - drop(tx); - } - for t in ts { - t.join().unwrap(); - } - } - - #[test] - fn oneshot_multi_thread_send_close_stress() { - let stress_factor = stress_factor(); - let mut ts = Vec::with_capacity(stress_factor); - for _ in 0..stress_factor { - let (tx, rx) = sync_channel::<i32>(0); - let t = thread::spawn(move || { - drop(rx); - }); - ts.push(t); - thread::spawn(move || { - let _ = tx.send(1); - }) - .join() - .unwrap(); - } - for t in ts { - t.join().unwrap(); - } - } - - #[test] - fn oneshot_multi_thread_recv_close_stress() { - let stress_factor = stress_factor(); - let mut ts = Vec::with_capacity(2 * stress_factor); - for _ in 0..stress_factor { - let (tx, rx) = sync_channel::<i32>(0); - let t = thread::spawn(move || { - thread::spawn(move || { - assert_eq!(rx.recv(), Err(RecvError)); - }) - .join() - .unwrap(); - }); - ts.push(t); - let t2 = thread::spawn(move || { - thread::spawn(move || { - drop(tx); - }); - }); - ts.push(t2); - } - for t in ts { - t.join().unwrap(); - } - } - - #[test] - fn oneshot_multi_thread_send_recv_stress() { - let stress_factor = stress_factor(); - let mut ts = Vec::with_capacity(stress_factor); - for _ in 0..stress_factor { - let (tx, rx) = sync_channel::<Box<i32>>(0); - let t = thread::spawn(move || { - tx.send(Box::new(10)).unwrap(); - }); - ts.push(t); - assert!(*rx.recv().unwrap() == 10); - } - for t in ts { - t.join().unwrap(); - } - } - - #[test] - fn stream_send_recv_stress() { - let stress_factor = stress_factor(); - let mut ts = Vec::with_capacity(2 * stress_factor); - for _ in 0..stress_factor { - let (tx, rx) = sync_channel::<Box<i32>>(0); - - if let Some(t) = send(tx, 0) { - ts.push(t); - } - if let Some(t) = recv(rx, 0) { - ts.push(t); - } - - fn send(tx: SyncSender<Box<i32>>, i: i32) -> Option<JoinHandle<()>> { - if i == 10 { - return None; - } - - Some(thread::spawn(move || { - tx.send(Box::new(i)).unwrap(); - send(tx, i + 1); - })) - } - - fn recv(rx: Receiver<Box<i32>>, i: i32) -> Option<JoinHandle<()>> { - if i == 10 { - return None; - } - - Some(thread::spawn(move || { - assert!(*rx.recv().unwrap() == i); - recv(rx, i + 1); - })) - } - } - for t in ts { - t.join().unwrap(); - } - } - - #[test] - fn recv_a_lot() { - // Regression test that we don't run out of stack in scheduler context - let (tx, rx) = sync_channel(10000); - for _ in 0..10000 { - tx.send(()).unwrap(); - } - for _ in 0..10000 { - rx.recv().unwrap(); - } - } - - #[test] - fn shared_chan_stress() { - let (tx, rx) = sync_channel(0); - let total = stress_factor() + 100; - let mut ts = Vec::with_capacity(total); - for _ in 0..total { - let tx = tx.clone(); - let t = thread::spawn(move || { - tx.send(()).unwrap(); - }); - ts.push(t); - } - - for _ in 0..total { - rx.recv().unwrap(); - } - for t in ts { - t.join().unwrap(); - } - } - - #[test] - fn test_nested_recv_iter() { - let (tx, rx) = sync_channel::<i32>(0); - let (total_tx, total_rx) = sync_channel::<i32>(0); - - let t = thread::spawn(move || { - let mut acc = 0; - for x in rx.iter() { - acc += x; - } - total_tx.send(acc).unwrap(); - }); - - tx.send(3).unwrap(); - tx.send(1).unwrap(); - tx.send(2).unwrap(); - drop(tx); - assert_eq!(total_rx.recv().unwrap(), 6); - t.join().unwrap(); - } - - #[test] - fn test_recv_iter_break() { - let (tx, rx) = sync_channel::<i32>(0); - let (count_tx, count_rx) = sync_channel(0); - - let t = thread::spawn(move || { - let mut count = 0; - for x in rx.iter() { - if count >= 3 { - break; - } else { - count += x; - } - } - count_tx.send(count).unwrap(); - }); - - tx.send(2).unwrap(); - tx.send(2).unwrap(); - tx.send(2).unwrap(); - let _ = tx.try_send(2); - drop(tx); - assert_eq!(count_rx.recv().unwrap(), 4); - t.join().unwrap(); - } - - #[test] - fn try_recv_states() { - let (tx1, rx1) = sync_channel::<i32>(1); - let (tx2, rx2) = sync_channel::<()>(1); - let (tx3, rx3) = sync_channel::<()>(1); - let t = thread::spawn(move || { - rx2.recv().unwrap(); - tx1.send(1).unwrap(); - tx3.send(()).unwrap(); - rx2.recv().unwrap(); - drop(tx1); - tx3.send(()).unwrap(); - }); - - assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty)); - tx2.send(()).unwrap(); - rx3.recv().unwrap(); - assert_eq!(rx1.try_recv(), Ok(1)); - assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty)); - tx2.send(()).unwrap(); - rx3.recv().unwrap(); - assert_eq!(rx1.try_recv(), Err(TryRecvError::Disconnected)); - t.join().unwrap(); - } - - // This bug used to end up in a livelock inside of the Receiver destructor - // because the internal state of the Shared packet was corrupted - #[test] - fn destroy_upgraded_shared_port_when_sender_still_active() { - let (tx, rx) = sync_channel::<()>(0); - let (tx2, rx2) = sync_channel::<()>(0); - let t = thread::spawn(move || { - rx.recv().unwrap(); // wait on a oneshot - drop(rx); // destroy a shared - tx2.send(()).unwrap(); - }); - // make sure the other thread has gone to sleep - for _ in 0..5000 { - thread::yield_now(); - } - - // upgrade to a shared chan and send a message - let tx2 = tx.clone(); - drop(tx); - tx2.send(()).unwrap(); - - // wait for the child thread to exit before we exit - rx2.recv().unwrap(); - t.join().unwrap(); - } - - #[test] - fn send1() { - let (tx, rx) = sync_channel::<i32>(0); - let t = thread::spawn(move || { - rx.recv().unwrap(); - }); - assert_eq!(tx.send(1), Ok(())); - t.join().unwrap(); - } - - #[test] - fn send2() { - let (tx, rx) = sync_channel::<i32>(0); - let t = thread::spawn(move || { - drop(rx); - }); - assert!(tx.send(1).is_err()); - t.join().unwrap(); - } - - #[test] - fn send3() { - let (tx, rx) = sync_channel::<i32>(1); - assert_eq!(tx.send(1), Ok(())); - let t = thread::spawn(move || { - drop(rx); - }); - assert!(tx.send(1).is_err()); - t.join().unwrap(); - } - - #[test] - fn send4() { - let (tx, rx) = sync_channel::<i32>(0); - let tx2 = tx.clone(); - let (done, donerx) = channel(); - let done2 = done.clone(); - let t = thread::spawn(move || { - assert!(tx.send(1).is_err()); - done.send(()).unwrap(); - }); - let t2 = thread::spawn(move || { - assert!(tx2.send(2).is_err()); - done2.send(()).unwrap(); - }); - drop(rx); - donerx.recv().unwrap(); - donerx.recv().unwrap(); - t.join().unwrap(); - t2.join().unwrap(); - } - - #[test] - fn try_send1() { - let (tx, _rx) = sync_channel::<i32>(0); - assert_eq!(tx.try_send(1), Err(TrySendError::Full(1))); - } - - #[test] - fn try_send2() { - let (tx, _rx) = sync_channel::<i32>(1); - assert_eq!(tx.try_send(1), Ok(())); - assert_eq!(tx.try_send(1), Err(TrySendError::Full(1))); - } - - #[test] - fn try_send3() { - let (tx, rx) = sync_channel::<i32>(1); - assert_eq!(tx.try_send(1), Ok(())); - drop(rx); - assert_eq!(tx.try_send(1), Err(TrySendError::Disconnected(1))); - } - - #[test] - fn issue_15761() { - fn repro() { - let (tx1, rx1) = sync_channel::<()>(3); - let (tx2, rx2) = sync_channel::<()>(3); - - let _t = thread::spawn(move || { - rx1.recv().unwrap(); - tx2.try_send(()).unwrap(); - }); - - tx1.try_send(()).unwrap(); - rx2.recv().unwrap(); - } - - for _ in 0..100 { - repro() - } - } -} - -// Source: https://github.com/rust-lang/rust/blob/master/src/libstd/sync/mpsc/select.rs -mod select_tests { - use super::*; - - use std::thread; - - #[test] - fn smoke() { - let (tx1, rx1) = channel::<i32>(); - let (tx2, rx2) = channel::<i32>(); - tx1.send(1).unwrap(); - select! { - foo = rx1.recv() => { assert_eq!(foo.unwrap(), 1); }, - _bar = rx2.recv() => { panic!() } - } - tx2.send(2).unwrap(); - select! { - _foo = rx1.recv() => { panic!() }, - bar = rx2.recv() => { assert_eq!(bar.unwrap(), 2) } - } - drop(tx1); - select! { - foo = rx1.recv() => { assert!(foo.is_err()); }, - _bar = rx2.recv() => { panic!() } - } - drop(tx2); - select! { - bar = rx2.recv() => { assert!(bar.is_err()); } - } - } - - #[test] - fn smoke2() { - let (_tx1, rx1) = channel::<i32>(); - let (_tx2, rx2) = channel::<i32>(); - let (_tx3, rx3) = channel::<i32>(); - let (_tx4, rx4) = channel::<i32>(); - let (tx5, rx5) = channel::<i32>(); - tx5.send(4).unwrap(); - select! { - _foo = rx1.recv() => { panic!("1") }, - _foo = rx2.recv() => { panic!("2") }, - _foo = rx3.recv() => { panic!("3") }, - _foo = rx4.recv() => { panic!("4") }, - foo = rx5.recv() => { assert_eq!(foo.unwrap(), 4); } - } - } - - #[test] - fn closed() { - let (_tx1, rx1) = channel::<i32>(); - let (tx2, rx2) = channel::<i32>(); - drop(tx2); - - select! { - _a1 = rx1.recv() => { panic!() }, - a2 = rx2.recv() => { assert!(a2.is_err()); } - } - } - - #[test] - fn unblocks() { - let (tx1, rx1) = channel::<i32>(); - let (_tx2, rx2) = channel::<i32>(); - let (tx3, rx3) = channel::<i32>(); - - let t = thread::spawn(move || { - for _ in 0..20 { - thread::yield_now(); - } - tx1.send(1).unwrap(); - rx3.recv().unwrap(); - for _ in 0..20 { - thread::yield_now(); - } - }); - - select! { - a = rx1.recv() => { assert_eq!(a.unwrap(), 1); }, - _b = rx2.recv() => { panic!() } - } - tx3.send(1).unwrap(); - select! { - a = rx1.recv() => { assert!(a.is_err()) }, - _b = rx2.recv() => { panic!() } - } - t.join().unwrap(); - } - - #[test] - fn both_ready() { - let (tx1, rx1) = channel::<i32>(); - let (tx2, rx2) = channel::<i32>(); - let (tx3, rx3) = channel::<()>(); - - let t = thread::spawn(move || { - for _ in 0..20 { - thread::yield_now(); - } - tx1.send(1).unwrap(); - tx2.send(2).unwrap(); - rx3.recv().unwrap(); - }); - - select! { - a = rx1.recv() => { assert_eq!(a.unwrap(), 1); }, - a = rx2.recv() => { assert_eq!(a.unwrap(), 2); } - } - select! { - a = rx1.recv() => { assert_eq!(a.unwrap(), 1); }, - a = rx2.recv() => { assert_eq!(a.unwrap(), 2); } - } - assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty)); - assert_eq!(rx2.try_recv(), Err(TryRecvError::Empty)); - tx3.send(()).unwrap(); - t.join().unwrap(); - } - - #[test] - fn stress() { - const AMT: i32 = 10000; - let (tx1, rx1) = channel::<i32>(); - let (tx2, rx2) = channel::<i32>(); - let (tx3, rx3) = channel::<()>(); - - let t = thread::spawn(move || { - for i in 0..AMT { - if i % 2 == 0 { - tx1.send(i).unwrap(); - } else { - tx2.send(i).unwrap(); - } - rx3.recv().unwrap(); - } - }); - - for i in 0..AMT { - select! { - i1 = rx1.recv() => { assert!(i % 2 == 0 && i == i1.unwrap()); }, - i2 = rx2.recv() => { assert!(i % 2 == 1 && i == i2.unwrap()); } - } - tx3.send(()).unwrap(); - } - t.join().unwrap(); - } - - #[allow(unused_must_use)] - #[test] - fn cloning() { - let (tx1, rx1) = channel::<i32>(); - let (_tx2, rx2) = channel::<i32>(); - let (tx3, rx3) = channel::<()>(); - - let t = thread::spawn(move || { - rx3.recv().unwrap(); - tx1.clone(); - assert_eq!(rx3.try_recv(), Err(TryRecvError::Empty)); - tx1.send(2).unwrap(); - rx3.recv().unwrap(); - }); - - tx3.send(()).unwrap(); - select! { - _i1 = rx1.recv() => {}, - _i2 = rx2.recv() => panic!() - } - tx3.send(()).unwrap(); - t.join().unwrap(); - } - - #[allow(unused_must_use)] - #[test] - fn cloning2() { - let (tx1, rx1) = channel::<i32>(); - let (_tx2, rx2) = channel::<i32>(); - let (tx3, rx3) = channel::<()>(); - - let t = thread::spawn(move || { - rx3.recv().unwrap(); - tx1.clone(); - assert_eq!(rx3.try_recv(), Err(TryRecvError::Empty)); - tx1.send(2).unwrap(); - rx3.recv().unwrap(); - }); - - tx3.send(()).unwrap(); - select! { - _i1 = rx1.recv() => {}, - _i2 = rx2.recv() => panic!() - } - tx3.send(()).unwrap(); - t.join().unwrap(); - } - - #[test] - fn cloning3() { - let (tx1, rx1) = channel::<()>(); - let (tx2, rx2) = channel::<()>(); - let (tx3, rx3) = channel::<()>(); - let t = thread::spawn(move || { - select! { - _ = rx1.recv() => panic!(), - _ = rx2.recv() => {} - } - tx3.send(()).unwrap(); - }); - - for _ in 0..1000 { - thread::yield_now(); - } - drop(tx1.clone()); - tx2.send(()).unwrap(); - rx3.recv().unwrap(); - t.join().unwrap(); - } - - #[test] - fn preflight1() { - let (tx, rx) = channel(); - tx.send(()).unwrap(); - select! { - _n = rx.recv() => {} - } - } - - #[test] - fn preflight2() { - let (tx, rx) = channel(); - tx.send(()).unwrap(); - tx.send(()).unwrap(); - select! { - _n = rx.recv() => {} - } - } - - #[test] - fn preflight3() { - let (tx, rx) = channel(); - drop(tx.clone()); - tx.send(()).unwrap(); - select! { - _n = rx.recv() => {} - } - } - - #[test] - fn preflight4() { - let (tx, rx) = channel(); - tx.send(()).unwrap(); - select! { - _ = rx.recv() => {} - } - } - - #[test] - fn preflight5() { - let (tx, rx) = channel(); - tx.send(()).unwrap(); - tx.send(()).unwrap(); - select! { - _ = rx.recv() => {} - } - } - - #[test] - fn preflight6() { - let (tx, rx) = channel(); - drop(tx.clone()); - tx.send(()).unwrap(); - select! { - _ = rx.recv() => {} - } - } - - #[test] - fn preflight7() { - let (tx, rx) = channel::<()>(); - drop(tx); - select! { - _ = rx.recv() => {} - } - } - - #[test] - fn preflight8() { - let (tx, rx) = channel(); - tx.send(()).unwrap(); - drop(tx); - rx.recv().unwrap(); - select! { - _ = rx.recv() => {} - } - } - - #[test] - fn preflight9() { - let (tx, rx) = channel(); - drop(tx.clone()); - tx.send(()).unwrap(); - drop(tx); - rx.recv().unwrap(); - select! { - _ = rx.recv() => {} - } - } - - #[test] - fn oneshot_data_waiting() { - let (tx1, rx1) = channel(); - let (tx2, rx2) = channel(); - let t = thread::spawn(move || { - select! { - _n = rx1.recv() => {} - } - tx2.send(()).unwrap(); - }); - - for _ in 0..100 { - thread::yield_now() - } - tx1.send(()).unwrap(); - rx2.recv().unwrap(); - t.join().unwrap(); - } - - #[test] - fn stream_data_waiting() { - let (tx1, rx1) = channel(); - let (tx2, rx2) = channel(); - tx1.send(()).unwrap(); - tx1.send(()).unwrap(); - rx1.recv().unwrap(); - rx1.recv().unwrap(); - let t = thread::spawn(move || { - select! { - _n = rx1.recv() => {} - } - tx2.send(()).unwrap(); - }); - - for _ in 0..100 { - thread::yield_now() - } - tx1.send(()).unwrap(); - rx2.recv().unwrap(); - t.join().unwrap(); - } - - #[test] - fn shared_data_waiting() { - let (tx1, rx1) = channel(); - let (tx2, rx2) = channel(); - drop(tx1.clone()); - tx1.send(()).unwrap(); - rx1.recv().unwrap(); - let t = thread::spawn(move || { - select! { - _n = rx1.recv() => {} - } - tx2.send(()).unwrap(); - }); - - for _ in 0..100 { - thread::yield_now() - } - tx1.send(()).unwrap(); - rx2.recv().unwrap(); - t.join().unwrap(); - } - - #[test] - fn sync1() { - let (tx, rx) = sync_channel::<i32>(1); - tx.send(1).unwrap(); - select! { - n = rx.recv() => { assert_eq!(n.unwrap(), 1); } - } - } - - #[test] - fn sync2() { - let (tx, rx) = sync_channel::<i32>(0); - let t = thread::spawn(move || { - for _ in 0..100 { - thread::yield_now() - } - tx.send(1).unwrap(); - }); - select! { - n = rx.recv() => { assert_eq!(n.unwrap(), 1); } - } - t.join().unwrap(); - } - - #[test] - fn sync3() { - let (tx1, rx1) = sync_channel::<i32>(0); - let (tx2, rx2): (Sender<i32>, Receiver<i32>) = channel(); - let t = thread::spawn(move || { - tx1.send(1).unwrap(); - }); - let t2 = thread::spawn(move || { - tx2.send(2).unwrap(); - }); - select! { - n = rx1.recv() => { - let n = n.unwrap(); - assert_eq!(n, 1); - assert_eq!(rx2.recv().unwrap(), 2); - }, - n = rx2.recv() => { - let n = n.unwrap(); - assert_eq!(n, 2); - assert_eq!(rx1.recv().unwrap(), 1); - } - } - t.join().unwrap(); - t2.join().unwrap(); - } -} diff --git a/vendor/flume/tests/never.rs b/vendor/flume/tests/never.rs deleted file mode 100644 index 676903f..0000000 --- a/vendor/flume/tests/never.rs +++ /dev/null @@ -1,99 +0,0 @@ -// //! Tests for the never channel flavor. - -// #[macro_use] -// extern crate crossbeam_channel; -// extern crate rand; - -// use std::thread; -// use std::time::{Duration, Instant}; - -// use crossbeam_channel::{never, tick, unbounded}; - -// fn ms(ms: u64) -> Duration { -// Duration::from_millis(ms) -// } - -// #[test] -// fn smoke() { -// select! { -// recv(never::<i32>()) -> _ => panic!(), -// default => {} -// } -// } - -// #[test] -// fn optional() { -// let (s, r) = unbounded::<i32>(); -// s.send(1).unwrap(); -// s.send(2).unwrap(); - -// let mut r = Some(&r); -// select! { -// recv(r.unwrap_or(&never())) -> _ => {} -// default => panic!(), -// } - -// r = None; -// select! { -// recv(r.unwrap_or(&never())) -> _ => panic!(), -// default => {} -// } -// } - -// #[test] -// fn tick_n() { -// let mut r = tick(ms(100)); -// let mut step = 0; - -// loop { -// select! { -// recv(r) -> _ => step += 1, -// default(ms(500)) => break, -// } - -// if step == 10 { -// r = never(); -// } -// } - -// assert_eq!(step, 10); -// } - -// #[test] -// fn capacity() { -// let r = never::<i32>(); -// assert_eq!(r.capacity(), Some(0)); -// } - -// #[test] -// fn len_empty_full() { -// let r = never::<i32>(); -// assert_eq!(r.len(), 0); -// assert_eq!(r.is_empty(), true); -// assert_eq!(r.is_full(), true); -// } - -// #[test] -// fn try_recv() { -// let r = never::<i32>(); -// assert!(r.try_recv().is_err()); - -// thread::sleep(ms(100)); -// assert!(r.try_recv().is_err()); -// } - -// #[test] -// fn recv_timeout() { -// let start = Instant::now(); -// let r = never::<i32>(); - -// assert!(r.recv_timeout(ms(100)).is_err()); -// let now = Instant::now(); -// assert!(now - start >= ms(100)); -// assert!(now - start <= ms(150)); - -// assert!(r.recv_timeout(ms(100)).is_err()); -// let now = Instant::now(); -// assert!(now - start >= ms(200)); -// assert!(now - start <= ms(250)); -// } diff --git a/vendor/flume/tests/ready.rs b/vendor/flume/tests/ready.rs deleted file mode 100644 index 2f42d9a..0000000 --- a/vendor/flume/tests/ready.rs +++ /dev/null @@ -1,837 +0,0 @@ -// //! Tests for channel readiness using the `Select` struct. - -// extern crate crossbeam_channel; -// extern crate crossbeam_utils; - -// use std::any::Any; -// use std::cell::Cell; -// use std::thread; -// use std::time::{Duration, Instant}; - -// use crossbeam_channel::{after, bounded, tick, unbounded}; -// use crossbeam_channel::{Receiver, Select, TryRecvError, TrySendError}; -// use crossbeam_utils::thread::scope; - -// fn ms(ms: u64) -> Duration { -// Duration::from_millis(ms) -// } - -// #[test] -// fn smoke1() { -// let (s1, r1) = unbounded::<usize>(); -// let (s2, r2) = unbounded::<usize>(); - -// s1.send(1).unwrap(); - -// let mut sel = Select::new(); -// sel.recv(&r1); -// sel.recv(&r2); -// assert_eq!(sel.ready(), 0); -// assert_eq!(r1.try_recv(), Ok(1)); - -// s2.send(2).unwrap(); - -// let mut sel = Select::new(); -// sel.recv(&r1); -// sel.recv(&r2); -// assert_eq!(sel.ready(), 1); -// assert_eq!(r2.try_recv(), Ok(2)); -// } - -// #[test] -// fn smoke2() { -// let (_s1, r1) = unbounded::<i32>(); -// let (_s2, r2) = unbounded::<i32>(); -// let (_s3, r3) = unbounded::<i32>(); -// let (_s4, r4) = unbounded::<i32>(); -// let (s5, r5) = unbounded::<i32>(); - -// s5.send(5).unwrap(); - -// let mut sel = Select::new(); -// sel.recv(&r1); -// sel.recv(&r2); -// sel.recv(&r3); -// sel.recv(&r4); -// sel.recv(&r5); -// assert_eq!(sel.ready(), 4); -// assert_eq!(r5.try_recv(), Ok(5)); -// } - -// #[test] -// fn disconnected() { -// let (s1, r1) = unbounded::<i32>(); -// let (s2, r2) = unbounded::<i32>(); - -// scope(|scope| { -// scope.spawn(|_| { -// drop(s1); -// thread::sleep(ms(500)); -// s2.send(5).unwrap(); -// }); - -// let mut sel = Select::new(); -// sel.recv(&r1); -// sel.recv(&r2); -// match sel.ready_timeout(ms(1000)) { -// Ok(0) => assert_eq!(r1.try_recv(), Err(TryRecvError::Disconnected)), -// _ => panic!(), -// } - -// r2.recv().unwrap(); -// }) -// .unwrap(); - -// let mut sel = Select::new(); -// sel.recv(&r1); -// sel.recv(&r2); -// match sel.ready_timeout(ms(1000)) { -// Ok(0) => assert_eq!(r1.try_recv(), Err(TryRecvError::Disconnected)), -// _ => panic!(), -// } - -// scope(|scope| { -// scope.spawn(|_| { -// thread::sleep(ms(500)); -// drop(s2); -// }); - -// let mut sel = Select::new(); -// sel.recv(&r2); -// match sel.ready_timeout(ms(1000)) { -// Ok(0) => assert_eq!(r2.try_recv(), Err(TryRecvError::Disconnected)), -// _ => panic!(), -// } -// }) -// .unwrap(); -// } - -// #[test] -// fn default() { -// let (s1, r1) = unbounded::<i32>(); -// let (s2, r2) = unbounded::<i32>(); - -// let mut sel = Select::new(); -// sel.recv(&r1); -// sel.recv(&r2); -// assert!(sel.try_ready().is_err()); - -// drop(s1); - -// let mut sel = Select::new(); -// sel.recv(&r1); -// sel.recv(&r2); -// match sel.try_ready() { -// Ok(0) => assert!(r1.try_recv().is_err()), -// _ => panic!(), -// } - -// s2.send(2).unwrap(); - -// let mut sel = Select::new(); -// sel.recv(&r2); -// match sel.try_ready() { -// Ok(0) => assert_eq!(r2.try_recv(), Ok(2)), -// _ => panic!(), -// } - -// let mut sel = Select::new(); -// sel.recv(&r2); -// assert!(sel.try_ready().is_err()); - -// let mut sel = Select::new(); -// assert!(sel.try_ready().is_err()); -// } - -// #[test] -// fn timeout() { -// let (_s1, r1) = unbounded::<i32>(); -// let (s2, r2) = unbounded::<i32>(); - -// scope(|scope| { -// scope.spawn(|_| { -// thread::sleep(ms(1500)); -// s2.send(2).unwrap(); -// }); - -// let mut sel = Select::new(); -// sel.recv(&r1); -// sel.recv(&r2); -// assert!(sel.ready_timeout(ms(1000)).is_err()); - -// let mut sel = Select::new(); -// sel.recv(&r1); -// sel.recv(&r2); -// match sel.ready_timeout(ms(1000)) { -// Ok(1) => assert_eq!(r2.try_recv(), Ok(2)), -// _ => panic!(), -// } -// }) -// .unwrap(); - -// scope(|scope| { -// let (s, r) = unbounded::<i32>(); - -// scope.spawn(move |_| { -// thread::sleep(ms(500)); -// drop(s); -// }); - -// let mut sel = Select::new(); -// assert!(sel.ready_timeout(ms(1000)).is_err()); - -// let mut sel = Select::new(); -// sel.recv(&r); -// match sel.try_ready() { -// Ok(0) => assert_eq!(r.try_recv(), Err(TryRecvError::Disconnected)), -// _ => panic!(), -// } -// }) -// .unwrap(); -// } - -// #[test] -// fn default_when_disconnected() { -// let (_, r) = unbounded::<i32>(); - -// let mut sel = Select::new(); -// sel.recv(&r); -// match sel.try_ready() { -// Ok(0) => assert_eq!(r.try_recv(), Err(TryRecvError::Disconnected)), -// _ => panic!(), -// } - -// let (_, r) = unbounded::<i32>(); - -// let mut sel = Select::new(); -// sel.recv(&r); -// match sel.ready_timeout(ms(1000)) { -// Ok(0) => assert_eq!(r.try_recv(), Err(TryRecvError::Disconnected)), -// _ => panic!(), -// } - -// let (s, _) = bounded::<i32>(0); - -// let mut sel = Select::new(); -// sel.send(&s); -// match sel.try_ready() { -// Ok(0) => assert_eq!(s.try_send(0), Err(TrySendError::Disconnected(0))), -// _ => panic!(), -// } - -// let (s, _) = bounded::<i32>(0); - -// let mut sel = Select::new(); -// sel.send(&s); -// match sel.ready_timeout(ms(1000)) { -// Ok(0) => assert_eq!(s.try_send(0), Err(TrySendError::Disconnected(0))), -// _ => panic!(), -// } -// } - -// #[test] -// fn default_only() { -// let start = Instant::now(); - -// let mut sel = Select::new(); -// assert!(sel.try_ready().is_err()); -// let now = Instant::now(); -// assert!(now - start <= ms(50)); - -// let start = Instant::now(); -// let mut sel = Select::new(); -// assert!(sel.ready_timeout(ms(500)).is_err()); -// let now = Instant::now(); -// assert!(now - start >= ms(450)); -// assert!(now - start <= ms(550)); -// } - -// #[test] -// fn unblocks() { -// let (s1, r1) = bounded::<i32>(0); -// let (s2, r2) = bounded::<i32>(0); - -// scope(|scope| { -// scope.spawn(|_| { -// thread::sleep(ms(500)); -// s2.send(2).unwrap(); -// }); - -// let mut sel = Select::new(); -// sel.recv(&r1); -// sel.recv(&r2); -// match sel.ready_timeout(ms(1000)) { -// Ok(1) => assert_eq!(r2.try_recv(), Ok(2)), -// _ => panic!(), -// } -// }) -// .unwrap(); - -// scope(|scope| { -// scope.spawn(|_| { -// thread::sleep(ms(500)); -// assert_eq!(r1.recv().unwrap(), 1); -// }); - -// let mut sel = Select::new(); -// let oper1 = sel.send(&s1); -// let oper2 = sel.send(&s2); -// let oper = sel.select_timeout(ms(1000)); -// match oper { -// Err(_) => panic!(), -// Ok(oper) => match oper.index() { -// i if i == oper1 => oper.send(&s1, 1).unwrap(), -// i if i == oper2 => panic!(), -// _ => unreachable!(), -// }, -// } -// }) -// .unwrap(); -// } - -// #[test] -// fn both_ready() { -// let (s1, r1) = bounded(0); -// let (s2, r2) = bounded(0); - -// scope(|scope| { -// scope.spawn(|_| { -// thread::sleep(ms(500)); -// s1.send(1).unwrap(); -// assert_eq!(r2.recv().unwrap(), 2); -// }); - -// for _ in 0..2 { -// let mut sel = Select::new(); -// sel.recv(&r1); -// sel.send(&s2); -// match sel.ready() { -// 0 => assert_eq!(r1.try_recv(), Ok(1)), -// 1 => s2.try_send(2).unwrap(), -// _ => panic!(), -// } -// } -// }) -// .unwrap(); -// } - -// #[test] -// fn cloning1() { -// scope(|scope| { -// let (s1, r1) = unbounded::<i32>(); -// let (_s2, r2) = unbounded::<i32>(); -// let (s3, r3) = unbounded::<()>(); - -// scope.spawn(move |_| { -// r3.recv().unwrap(); -// drop(s1.clone()); -// assert!(r3.try_recv().is_err()); -// s1.send(1).unwrap(); -// r3.recv().unwrap(); -// }); - -// s3.send(()).unwrap(); - -// let mut sel = Select::new(); -// sel.recv(&r1); -// sel.recv(&r2); -// match sel.ready() { -// 0 => drop(r1.try_recv()), -// 1 => drop(r2.try_recv()), -// _ => panic!(), -// } - -// s3.send(()).unwrap(); -// }) -// .unwrap(); -// } - -// #[test] -// fn cloning2() { -// let (s1, r1) = unbounded::<()>(); -// let (s2, r2) = unbounded::<()>(); -// let (_s3, _r3) = unbounded::<()>(); - -// scope(|scope| { -// scope.spawn(move |_| { -// let mut sel = Select::new(); -// sel.recv(&r1); -// sel.recv(&r2); -// match sel.ready() { -// 0 => panic!(), -// 1 => drop(r2.try_recv()), -// _ => panic!(), -// } -// }); - -// thread::sleep(ms(500)); -// drop(s1.clone()); -// s2.send(()).unwrap(); -// }) -// .unwrap(); -// } - -// #[test] -// fn preflight1() { -// let (s, r) = unbounded(); -// s.send(()).unwrap(); - -// let mut sel = Select::new(); -// sel.recv(&r); -// match sel.ready() { -// 0 => drop(r.try_recv()), -// _ => panic!(), -// } -// } - -// #[test] -// fn preflight2() { -// let (s, r) = unbounded(); -// drop(s.clone()); -// s.send(()).unwrap(); -// drop(s); - -// let mut sel = Select::new(); -// sel.recv(&r); -// match sel.ready() { -// 0 => assert_eq!(r.try_recv(), Ok(())), -// _ => panic!(), -// } - -// assert_eq!(r.try_recv(), Err(TryRecvError::Disconnected)); -// } - -// #[test] -// fn preflight3() { -// let (s, r) = unbounded(); -// drop(s.clone()); -// s.send(()).unwrap(); -// drop(s); -// r.recv().unwrap(); - -// let mut sel = Select::new(); -// sel.recv(&r); -// match sel.ready() { -// 0 => assert_eq!(r.try_recv(), Err(TryRecvError::Disconnected)), -// _ => panic!(), -// } -// } - -// #[test] -// fn duplicate_operations() { -// let (s, r) = unbounded::<i32>(); -// let hit = vec![Cell::new(false); 4]; - -// while hit.iter().map(|h| h.get()).any(|hit| !hit) { -// let mut sel = Select::new(); -// sel.recv(&r); -// sel.recv(&r); -// sel.send(&s); -// sel.send(&s); -// match sel.ready() { -// 0 => { -// assert!(r.try_recv().is_ok()); -// hit[0].set(true); -// } -// 1 => { -// assert!(r.try_recv().is_ok()); -// hit[1].set(true); -// } -// 2 => { -// assert!(s.try_send(0).is_ok()); -// hit[2].set(true); -// } -// 3 => { -// assert!(s.try_send(0).is_ok()); -// hit[3].set(true); -// } -// _ => panic!(), -// } -// } -// } - -// #[test] -// fn nesting() { -// let (s, r) = unbounded::<i32>(); - -// let mut sel = Select::new(); -// sel.send(&s); -// match sel.ready() { -// 0 => { -// assert!(s.try_send(0).is_ok()); - -// let mut sel = Select::new(); -// sel.recv(&r); -// match sel.ready() { -// 0 => { -// assert_eq!(r.try_recv(), Ok(0)); - -// let mut sel = Select::new(); -// sel.send(&s); -// match sel.ready() { -// 0 => { -// assert!(s.try_send(1).is_ok()); - -// let mut sel = Select::new(); -// sel.recv(&r); -// match sel.ready() { -// 0 => { -// assert_eq!(r.try_recv(), Ok(1)); -// } -// _ => panic!(), -// } -// } -// _ => panic!(), -// } -// } -// _ => panic!(), -// } -// } -// _ => panic!(), -// } -// } - -// #[test] -// fn stress_recv() { -// const COUNT: usize = 10_000; - -// let (s1, r1) = unbounded(); -// let (s2, r2) = bounded(5); -// let (s3, r3) = bounded(0); - -// scope(|scope| { -// scope.spawn(|_| { -// for i in 0..COUNT { -// s1.send(i).unwrap(); -// r3.recv().unwrap(); - -// s2.send(i).unwrap(); -// r3.recv().unwrap(); -// } -// }); - -// for i in 0..COUNT { -// for _ in 0..2 { -// let mut sel = Select::new(); -// sel.recv(&r1); -// sel.recv(&r2); -// match sel.ready() { -// 0 => assert_eq!(r1.try_recv(), Ok(i)), -// 1 => assert_eq!(r2.try_recv(), Ok(i)), -// _ => panic!(), -// } - -// s3.send(()).unwrap(); -// } -// } -// }) -// .unwrap(); -// } - -// #[test] -// fn stress_send() { -// const COUNT: usize = 10_000; - -// let (s1, r1) = bounded(0); -// let (s2, r2) = bounded(0); -// let (s3, r3) = bounded(100); - -// scope(|scope| { -// scope.spawn(|_| { -// for i in 0..COUNT { -// assert_eq!(r1.recv().unwrap(), i); -// assert_eq!(r2.recv().unwrap(), i); -// r3.recv().unwrap(); -// } -// }); - -// for i in 0..COUNT { -// for _ in 0..2 { -// let mut sel = Select::new(); -// sel.send(&s1); -// sel.send(&s2); -// match sel.ready() { -// 0 => assert!(s1.try_send(i).is_ok()), -// 1 => assert!(s2.try_send(i).is_ok()), -// _ => panic!(), -// } -// } -// s3.send(()).unwrap(); -// } -// }) -// .unwrap(); -// } - -// #[test] -// fn stress_mixed() { -// const COUNT: usize = 10_000; - -// let (s1, r1) = bounded(0); -// let (s2, r2) = bounded(0); -// let (s3, r3) = bounded(100); - -// scope(|scope| { -// scope.spawn(|_| { -// for i in 0..COUNT { -// s1.send(i).unwrap(); -// assert_eq!(r2.recv().unwrap(), i); -// r3.recv().unwrap(); -// } -// }); - -// for i in 0..COUNT { -// for _ in 0..2 { -// let mut sel = Select::new(); -// sel.recv(&r1); -// sel.send(&s2); -// match sel.ready() { -// 0 => assert_eq!(r1.try_recv(), Ok(i)), -// 1 => assert!(s2.try_send(i).is_ok()), -// _ => panic!(), -// } -// } -// s3.send(()).unwrap(); -// } -// }) -// .unwrap(); -// } - -// #[test] -// fn stress_timeout_two_threads() { -// const COUNT: usize = 20; - -// let (s, r) = bounded(2); - -// scope(|scope| { -// scope.spawn(|_| { -// for i in 0..COUNT { -// if i % 2 == 0 { -// thread::sleep(ms(500)); -// } - -// let done = false; -// while !done { -// let mut sel = Select::new(); -// sel.send(&s); -// match sel.ready_timeout(ms(100)) { -// Err(_) => {} -// Ok(0) => { -// assert!(s.try_send(i).is_ok()); -// break; -// } -// Ok(_) => panic!(), -// } -// } -// } -// }); - -// scope.spawn(|_| { -// for i in 0..COUNT { -// if i % 2 == 0 { -// thread::sleep(ms(500)); -// } - -// let mut done = false; -// while !done { -// let mut sel = Select::new(); -// sel.recv(&r); -// match sel.ready_timeout(ms(100)) { -// Err(_) => {} -// Ok(0) => { -// assert_eq!(r.try_recv(), Ok(i)); -// done = true; -// } -// Ok(_) => panic!(), -// } -// } -// } -// }); -// }) -// .unwrap(); -// } - -// #[test] -// fn send_recv_same_channel() { -// let (s, r) = bounded::<i32>(0); -// let mut sel = Select::new(); -// sel.send(&s); -// sel.recv(&r); -// assert!(sel.ready_timeout(ms(100)).is_err()); - -// let (s, r) = unbounded::<i32>(); -// let mut sel = Select::new(); -// sel.send(&s); -// sel.recv(&r); -// match sel.ready_timeout(ms(100)) { -// Err(_) => panic!(), -// Ok(0) => assert!(s.try_send(0).is_ok()), -// Ok(_) => panic!(), -// } -// } - -// #[test] -// fn channel_through_channel() { -// const COUNT: usize = 1000; - -// type T = Box<dyn Any + Send>; - -// for cap in 1..4 { -// let (s, r) = bounded::<T>(cap); - -// scope(|scope| { -// scope.spawn(move |_| { -// let mut s = s; - -// for _ in 0..COUNT { -// let (new_s, new_r) = bounded(cap); -// let new_r: T = Box::new(Some(new_r)); - -// { -// let mut sel = Select::new(); -// sel.send(&s); -// match sel.ready() { -// 0 => assert!(s.try_send(new_r).is_ok()), -// _ => panic!(), -// } -// } - -// s = new_s; -// } -// }); - -// scope.spawn(move |_| { -// let mut r = r; - -// for _ in 0..COUNT { -// let new = { -// let mut sel = Select::new(); -// sel.recv(&r); -// match sel.ready() { -// 0 => r -// .try_recv() -// .unwrap() -// .downcast_mut::<Option<Receiver<T>>>() -// .unwrap() -// .take() -// .unwrap(), -// _ => panic!(), -// } -// }; -// r = new; -// } -// }); -// }) -// .unwrap(); -// } -// } - -// #[test] -// fn fairness1() { -// const COUNT: usize = 10_000; - -// let (s1, r1) = bounded::<()>(COUNT); -// let (s2, r2) = unbounded::<()>(); - -// for _ in 0..COUNT { -// s1.send(()).unwrap(); -// s2.send(()).unwrap(); -// } - -// let hits = vec![Cell::new(0usize); 4]; -// for _ in 0..COUNT { -// let after = after(ms(0)); -// let tick = tick(ms(0)); - -// let mut sel = Select::new(); -// sel.recv(&r1); -// sel.recv(&r2); -// sel.recv(&after); -// sel.recv(&tick); -// match sel.ready() { -// 0 => { -// r1.try_recv().unwrap(); -// hits[0].set(hits[0].get() + 1); -// } -// 1 => { -// r2.try_recv().unwrap(); -// hits[1].set(hits[1].get() + 1); -// } -// 2 => { -// after.try_recv().unwrap(); -// hits[2].set(hits[2].get() + 1); -// } -// 3 => { -// tick.try_recv().unwrap(); -// hits[3].set(hits[3].get() + 1); -// } -// _ => panic!(), -// } -// } -// assert!(hits.iter().all(|x| x.get() >= COUNT / hits.len() / 2)); -// } - -// #[test] -// fn fairness2() { -// const COUNT: usize = 10_000; - -// let (s1, r1) = unbounded::<()>(); -// let (s2, r2) = bounded::<()>(1); -// let (s3, r3) = bounded::<()>(0); - -// scope(|scope| { -// scope.spawn(|_| { -// for _ in 0..COUNT { -// let mut sel = Select::new(); -// let mut oper1 = None; -// let mut oper2 = None; -// if s1.is_empty() { -// oper1 = Some(sel.send(&s1)); -// } -// if s2.is_empty() { -// oper2 = Some(sel.send(&s2)); -// } -// let oper3 = sel.send(&s3); -// let oper = sel.select(); -// match oper.index() { -// i if Some(i) == oper1 => assert!(oper.send(&s1, ()).is_ok()), -// i if Some(i) == oper2 => assert!(oper.send(&s2, ()).is_ok()), -// i if i == oper3 => assert!(oper.send(&s3, ()).is_ok()), -// _ => unreachable!(), -// } -// } -// }); - -// let hits = vec![Cell::new(0usize); 3]; -// for _ in 0..COUNT { -// let mut sel = Select::new(); -// sel.recv(&r1); -// sel.recv(&r2); -// sel.recv(&r3); -// loop { -// match sel.ready() { -// 0 => { -// if r1.try_recv().is_ok() { -// hits[0].set(hits[0].get() + 1); -// break; -// } -// } -// 1 => { -// if r2.try_recv().is_ok() { -// hits[1].set(hits[1].get() + 1); -// break; -// } -// } -// 2 => { -// if r3.try_recv().is_ok() { -// hits[2].set(hits[2].get() + 1); -// break; -// } -// } -// _ => unreachable!(), -// } -// } -// } -// assert!(hits.iter().all(|x| x.get() >= COUNT / hits.len() / 10)); -// }) -// .unwrap(); -// } diff --git a/vendor/flume/tests/same_channel.rs b/vendor/flume/tests/same_channel.rs deleted file mode 100644 index c6452ec..0000000 --- a/vendor/flume/tests/same_channel.rs +++ /dev/null @@ -1,114 +0,0 @@ -// extern crate crossbeam_channel; - -// use std::time::Duration; - -// use crossbeam_channel::{after, bounded, never, tick, unbounded}; - -// fn ms(ms: u64) -> Duration { -// Duration::from_millis(ms) -// } - -// #[test] -// fn after_same_channel() { -// let r = after(ms(50)); - -// let r2 = r.clone(); -// assert!(r.same_channel(&r2)); - -// let r3 = after(ms(50)); -// assert!(!r.same_channel(&r3)); -// assert!(!r2.same_channel(&r3)); - -// let r4 = after(ms(100)); -// assert!(!r.same_channel(&r4)); -// assert!(!r2.same_channel(&r4)); -// } - -// #[test] -// fn array_same_channel() { -// let (s, r) = bounded::<usize>(1); - -// let s2 = s.clone(); -// assert!(s.same_channel(&s2)); - -// let r2 = r.clone(); -// assert!(r.same_channel(&r2)); - -// let (s3, r3) = bounded::<usize>(1); -// assert!(!s.same_channel(&s3)); -// assert!(!s2.same_channel(&s3)); -// assert!(!r.same_channel(&r3)); -// assert!(!r2.same_channel(&r3)); -// } - -// #[test] -// fn list_same_channel() { -// let (s, r) = unbounded::<usize>(); - -// let s2 = s.clone(); -// assert!(s.same_channel(&s2)); - -// let r2 = r.clone(); -// assert!(r.same_channel(&r2)); - -// let (s3, r3) = unbounded::<usize>(); -// assert!(!s.same_channel(&s3)); -// assert!(!s2.same_channel(&s3)); -// assert!(!r.same_channel(&r3)); -// assert!(!r2.same_channel(&r3)); -// } - -// #[test] -// fn never_same_channel() { -// let r = never::<usize>(); - -// let r2 = r.clone(); -// assert!(r.same_channel(&r2)); - -// // Never channel are always equal to one another. -// let r3 = never::<usize>(); -// assert!(r.same_channel(&r3)); -// assert!(r2.same_channel(&r3)); -// } - -// #[test] -// fn tick_same_channel() { -// let r = tick(ms(50)); - -// let r2 = r.clone(); -// assert!(r.same_channel(&r2)); - -// let r3 = tick(ms(50)); -// assert!(!r.same_channel(&r3)); -// assert!(!r2.same_channel(&r3)); - -// let r4 = tick(ms(100)); -// assert!(!r.same_channel(&r4)); -// assert!(!r2.same_channel(&r4)); -// } - -// #[test] -// fn zero_same_channel() { -// let (s, r) = bounded::<usize>(0); - -// let s2 = s.clone(); -// assert!(s.same_channel(&s2)); - -// let r2 = r.clone(); -// assert!(r.same_channel(&r2)); - -// let (s3, r3) = bounded::<usize>(0); -// assert!(!s.same_channel(&s3)); -// assert!(!s2.same_channel(&s3)); -// assert!(!r.same_channel(&r3)); -// assert!(!r2.same_channel(&r3)); -// } - -// #[test] -// fn different_flavors_same_channel() { -// let (s1, r1) = bounded::<usize>(0); -// let (s2, r2) = unbounded::<usize>(); - -// assert!(!s1.same_channel(&s2)); -// assert!(!r1.same_channel(&r2)); -// } diff --git a/vendor/flume/tests/select.rs b/vendor/flume/tests/select.rs deleted file mode 100644 index 4fac9e9..0000000 --- a/vendor/flume/tests/select.rs +++ /dev/null @@ -1,1304 +0,0 @@ -// //! Tests for channel selection using the `Select` struct. - -// extern crate crossbeam_channel; -// extern crate crossbeam_utils; - -// use std::any::Any; -// use std::cell::Cell; -// use std::thread; -// use std::time::{Duration, Instant}; - -// use crossbeam_channel::{after, bounded, tick, unbounded, Receiver, Select, TryRecvError}; -// use crossbeam_utils::thread::scope; - -// fn ms(ms: u64) -> Duration { -// Duration::from_millis(ms) -// } - -// #[test] -// fn smoke1() { -// let (s1, r1) = unbounded::<usize>(); -// let (s2, r2) = unbounded::<usize>(); - -// s1.send(1).unwrap(); - -// let mut sel = Select::new(); -// let oper1 = sel.recv(&r1); -// let oper2 = sel.recv(&r2); -// let oper = sel.select(); -// match oper.index() { -// i if i == oper1 => assert_eq!(oper.recv(&r1), Ok(1)), -// i if i == oper2 => panic!(), -// _ => unreachable!(), -// } - -// s2.send(2).unwrap(); - -// let mut sel = Select::new(); -// let oper1 = sel.recv(&r1); -// let oper2 = sel.recv(&r2); -// let oper = sel.select(); -// match oper.index() { -// i if i == oper1 => panic!(), -// i if i == oper2 => assert_eq!(oper.recv(&r2), Ok(2)), -// _ => unreachable!(), -// } -// } - -// #[test] -// fn smoke2() { -// let (_s1, r1) = unbounded::<i32>(); -// let (_s2, r2) = unbounded::<i32>(); -// let (_s3, r3) = unbounded::<i32>(); -// let (_s4, r4) = unbounded::<i32>(); -// let (s5, r5) = unbounded::<i32>(); - -// s5.send(5).unwrap(); - -// let mut sel = Select::new(); -// let oper1 = sel.recv(&r1); -// let oper2 = sel.recv(&r2); -// let oper3 = sel.recv(&r3); -// let oper4 = sel.recv(&r4); -// let oper5 = sel.recv(&r5); -// let oper = sel.select(); -// match oper.index() { -// i if i == oper1 => panic!(), -// i if i == oper2 => panic!(), -// i if i == oper3 => panic!(), -// i if i == oper4 => panic!(), -// i if i == oper5 => assert_eq!(oper.recv(&r5), Ok(5)), -// _ => unreachable!(), -// } -// } - -// #[test] -// fn disconnected() { -// let (s1, r1) = unbounded::<i32>(); -// let (s2, r2) = unbounded::<i32>(); - -// scope(|scope| { -// scope.spawn(|_| { -// drop(s1); -// thread::sleep(ms(500)); -// s2.send(5).unwrap(); -// }); - -// let mut sel = Select::new(); -// let oper1 = sel.recv(&r1); -// let oper2 = sel.recv(&r2); -// let oper = sel.select_timeout(ms(1000)); -// match oper { -// Err(_) => panic!(), -// Ok(oper) => match oper.index() { -// i if i == oper1 => assert!(oper.recv(&r1).is_err()), -// i if i == oper2 => panic!(), -// _ => unreachable!(), -// }, -// } - -// r2.recv().unwrap(); -// }) -// .unwrap(); - -// let mut sel = Select::new(); -// let oper1 = sel.recv(&r1); -// let oper2 = sel.recv(&r2); -// let oper = sel.select_timeout(ms(1000)); -// match oper { -// Err(_) => panic!(), -// Ok(oper) => match oper.index() { -// i if i == oper1 => assert!(oper.recv(&r1).is_err()), -// i if i == oper2 => panic!(), -// _ => unreachable!(), -// }, -// } - -// scope(|scope| { -// scope.spawn(|_| { -// thread::sleep(ms(500)); -// drop(s2); -// }); - -// let mut sel = Select::new(); -// let oper1 = sel.recv(&r2); -// let oper = sel.select_timeout(ms(1000)); -// match oper { -// Err(_) => panic!(), -// Ok(oper) => match oper.index() { -// i if i == oper1 => assert!(oper.recv(&r2).is_err()), -// _ => unreachable!(), -// }, -// } -// }) -// .unwrap(); -// } - -// #[test] -// fn default() { -// let (s1, r1) = unbounded::<i32>(); -// let (s2, r2) = unbounded::<i32>(); - -// let mut sel = Select::new(); -// let _oper1 = sel.recv(&r1); -// let _oper2 = sel.recv(&r2); -// let oper = sel.try_select(); -// match oper { -// Err(_) => {} -// Ok(_) => panic!(), -// } - -// drop(s1); - -// let mut sel = Select::new(); -// let oper1 = sel.recv(&r1); -// let oper2 = sel.recv(&r2); -// let oper = sel.try_select(); -// match oper { -// Err(_) => panic!(), -// Ok(oper) => match oper.index() { -// i if i == oper1 => assert!(oper.recv(&r1).is_err()), -// i if i == oper2 => panic!(), -// _ => unreachable!(), -// }, -// } - -// s2.send(2).unwrap(); - -// let mut sel = Select::new(); -// let oper1 = sel.recv(&r2); -// let oper = sel.try_select(); -// match oper { -// Err(_) => panic!(), -// Ok(oper) => match oper.index() { -// i if i == oper1 => assert_eq!(oper.recv(&r2), Ok(2)), -// _ => unreachable!(), -// }, -// } - -// let mut sel = Select::new(); -// let _oper1 = sel.recv(&r2); -// let oper = sel.try_select(); -// match oper { -// Err(_) => {} -// Ok(_) => panic!(), -// } - -// let mut sel = Select::new(); -// let oper = sel.try_select(); -// match oper { -// Err(_) => {} -// Ok(_) => panic!(), -// } -// } - -// #[test] -// fn timeout() { -// let (_s1, r1) = unbounded::<i32>(); -// let (s2, r2) = unbounded::<i32>(); - -// scope(|scope| { -// scope.spawn(|_| { -// thread::sleep(ms(1500)); -// s2.send(2).unwrap(); -// }); - -// let mut sel = Select::new(); -// let oper1 = sel.recv(&r1); -// let oper2 = sel.recv(&r2); -// let oper = sel.select_timeout(ms(1000)); -// match oper { -// Err(_) => {} -// Ok(oper) => match oper.index() { -// i if i == oper1 => panic!(), -// i if i == oper2 => panic!(), -// _ => unreachable!(), -// }, -// } - -// let mut sel = Select::new(); -// let oper1 = sel.recv(&r1); -// let oper2 = sel.recv(&r2); -// let oper = sel.select_timeout(ms(1000)); -// match oper { -// Err(_) => panic!(), -// Ok(oper) => match oper.index() { -// i if i == oper1 => panic!(), -// i if i == oper2 => assert_eq!(oper.recv(&r2), Ok(2)), -// _ => unreachable!(), -// }, -// } -// }) -// .unwrap(); - -// scope(|scope| { -// let (s, r) = unbounded::<i32>(); - -// scope.spawn(move |_| { -// thread::sleep(ms(500)); -// drop(s); -// }); - -// let mut sel = Select::new(); -// let oper = sel.select_timeout(ms(1000)); -// match oper { -// Err(_) => { -// let mut sel = Select::new(); -// let oper1 = sel.recv(&r); -// let oper = sel.try_select(); -// match oper { -// Err(_) => panic!(), -// Ok(oper) => match oper.index() { -// i if i == oper1 => assert!(oper.recv(&r).is_err()), -// _ => unreachable!(), -// }, -// } -// } -// Ok(_) => unreachable!(), -// } -// }) -// .unwrap(); -// } - -// #[test] -// fn default_when_disconnected() { -// let (_, r) = unbounded::<i32>(); - -// let mut sel = Select::new(); -// let oper1 = sel.recv(&r); -// let oper = sel.try_select(); -// match oper { -// Err(_) => panic!(), -// Ok(oper) => match oper.index() { -// i if i == oper1 => assert!(oper.recv(&r).is_err()), -// _ => unreachable!(), -// }, -// } - -// let (_, r) = unbounded::<i32>(); - -// let mut sel = Select::new(); -// let oper1 = sel.recv(&r); -// let oper = sel.select_timeout(ms(1000)); -// match oper { -// Err(_) => panic!(), -// Ok(oper) => match oper.index() { -// i if i == oper1 => assert!(oper.recv(&r).is_err()), -// _ => unreachable!(), -// }, -// } - -// let (s, _) = bounded::<i32>(0); - -// let mut sel = Select::new(); -// let oper1 = sel.send(&s); -// let oper = sel.try_select(); -// match oper { -// Err(_) => panic!(), -// Ok(oper) => match oper.index() { -// i if i == oper1 => assert!(oper.send(&s, 0).is_err()), -// _ => unreachable!(), -// }, -// } - -// let (s, _) = bounded::<i32>(0); - -// let mut sel = Select::new(); -// let oper1 = sel.send(&s); -// let oper = sel.select_timeout(ms(1000)); -// match oper { -// Err(_) => panic!(), -// Ok(oper) => match oper.index() { -// i if i == oper1 => assert!(oper.send(&s, 0).is_err()), -// _ => unreachable!(), -// }, -// } -// } - -// #[test] -// fn default_only() { -// let start = Instant::now(); - -// let mut sel = Select::new(); -// let oper = sel.try_select(); -// assert!(oper.is_err()); -// let now = Instant::now(); -// assert!(now - start <= ms(50)); - -// let start = Instant::now(); -// let mut sel = Select::new(); -// let oper = sel.select_timeout(ms(500)); -// assert!(oper.is_err()); -// let now = Instant::now(); -// assert!(now - start >= ms(450)); -// assert!(now - start <= ms(550)); -// } - -// #[test] -// fn unblocks() { -// let (s1, r1) = bounded::<i32>(0); -// let (s2, r2) = bounded::<i32>(0); - -// scope(|scope| { -// scope.spawn(|_| { -// thread::sleep(ms(500)); -// s2.send(2).unwrap(); -// }); - -// let mut sel = Select::new(); -// let oper1 = sel.recv(&r1); -// let oper2 = sel.recv(&r2); -// let oper = sel.select_timeout(ms(1000)); -// match oper { -// Err(_) => panic!(), -// Ok(oper) => match oper.index() { -// i if i == oper1 => panic!(), -// i if i == oper2 => assert_eq!(oper.recv(&r2), Ok(2)), -// _ => unreachable!(), -// }, -// } -// }) -// .unwrap(); - -// scope(|scope| { -// scope.spawn(|_| { -// thread::sleep(ms(500)); -// assert_eq!(r1.recv().unwrap(), 1); -// }); - -// let mut sel = Select::new(); -// let oper1 = sel.send(&s1); -// let oper2 = sel.send(&s2); -// let oper = sel.select_timeout(ms(1000)); -// match oper { -// Err(_) => panic!(), -// Ok(oper) => match oper.index() { -// i if i == oper1 => oper.send(&s1, 1).unwrap(), -// i if i == oper2 => panic!(), -// _ => unreachable!(), -// }, -// } -// }) -// .unwrap(); -// } - -// #[test] -// fn both_ready() { -// let (s1, r1) = bounded(0); -// let (s2, r2) = bounded(0); - -// scope(|scope| { -// scope.spawn(|_| { -// thread::sleep(ms(500)); -// s1.send(1).unwrap(); -// assert_eq!(r2.recv().unwrap(), 2); -// }); - -// for _ in 0..2 { -// let mut sel = Select::new(); -// let oper1 = sel.recv(&r1); -// let oper2 = sel.send(&s2); -// let oper = sel.select(); -// match oper.index() { -// i if i == oper1 => assert_eq!(oper.recv(&r1), Ok(1)), -// i if i == oper2 => oper.send(&s2, 2).unwrap(), -// _ => unreachable!(), -// } -// } -// }) -// .unwrap(); -// } - -// #[test] -// fn loop_try() { -// const RUNS: usize = 20; - -// for _ in 0..RUNS { -// let (s1, r1) = bounded::<i32>(0); -// let (s2, r2) = bounded::<i32>(0); -// let (s_end, r_end) = bounded::<()>(0); - -// scope(|scope| { -// scope.spawn(|_| loop { -// let mut done = false; - -// let mut sel = Select::new(); -// let oper1 = sel.send(&s1); -// let oper = sel.try_select(); -// match oper { -// Err(_) => {} -// Ok(oper) => match oper.index() { -// i if i == oper1 => { -// let _ = oper.send(&s1, 1); -// done = true; -// } -// _ => unreachable!(), -// }, -// } -// if done { -// break; -// } - -// let mut sel = Select::new(); -// let oper1 = sel.recv(&r_end); -// let oper = sel.try_select(); -// match oper { -// Err(_) => {} -// Ok(oper) => match oper.index() { -// i if i == oper1 => { -// let _ = oper.recv(&r_end); -// done = true; -// } -// _ => unreachable!(), -// }, -// } -// if done { -// break; -// } -// }); - -// scope.spawn(|_| loop { -// if let Ok(x) = r2.try_recv() { -// assert_eq!(x, 2); -// break; -// } - -// let mut done = false; -// let mut sel = Select::new(); -// let oper1 = sel.recv(&r_end); -// let oper = sel.try_select(); -// match oper { -// Err(_) => {} -// Ok(oper) => match oper.index() { -// i if i == oper1 => { -// let _ = oper.recv(&r_end); -// done = true; -// } -// _ => unreachable!(), -// }, -// } -// if done { -// break; -// } -// }); - -// scope.spawn(|_| { -// thread::sleep(ms(500)); - -// let mut sel = Select::new(); -// let oper1 = sel.recv(&r1); -// let oper2 = sel.send(&s2); -// let oper = sel.select_timeout(ms(1000)); -// match oper { -// Err(_) => {} -// Ok(oper) => match oper.index() { -// i if i == oper1 => assert_eq!(oper.recv(&r1), Ok(1)), -// i if i == oper2 => assert!(oper.send(&s2, 2).is_ok()), -// _ => unreachable!(), -// }, -// } - -// drop(s_end); -// }); -// }) -// .unwrap(); -// } -// } - -// #[test] -// fn cloning1() { -// scope(|scope| { -// let (s1, r1) = unbounded::<i32>(); -// let (_s2, r2) = unbounded::<i32>(); -// let (s3, r3) = unbounded::<()>(); - -// scope.spawn(move |_| { -// r3.recv().unwrap(); -// drop(s1.clone()); -// assert!(r3.try_recv().is_err()); -// s1.send(1).unwrap(); -// r3.recv().unwrap(); -// }); - -// s3.send(()).unwrap(); - -// let mut sel = Select::new(); -// let oper1 = sel.recv(&r1); -// let oper2 = sel.recv(&r2); -// let oper = sel.select(); -// match oper.index() { -// i if i == oper1 => drop(oper.recv(&r1)), -// i if i == oper2 => drop(oper.recv(&r2)), -// _ => unreachable!(), -// } - -// s3.send(()).unwrap(); -// }) -// .unwrap(); -// } - -// #[test] -// fn cloning2() { -// let (s1, r1) = unbounded::<()>(); -// let (s2, r2) = unbounded::<()>(); -// let (_s3, _r3) = unbounded::<()>(); - -// scope(|scope| { -// scope.spawn(move |_| { -// let mut sel = Select::new(); -// let oper1 = sel.recv(&r1); -// let oper2 = sel.recv(&r2); -// let oper = sel.select(); -// match oper.index() { -// i if i == oper1 => panic!(), -// i if i == oper2 => drop(oper.recv(&r2)), -// _ => unreachable!(), -// } -// }); - -// thread::sleep(ms(500)); -// drop(s1.clone()); -// s2.send(()).unwrap(); -// }) -// .unwrap(); -// } - -// #[test] -// fn preflight1() { -// let (s, r) = unbounded(); -// s.send(()).unwrap(); - -// let mut sel = Select::new(); -// let oper1 = sel.recv(&r); -// let oper = sel.select(); -// match oper.index() { -// i if i == oper1 => drop(oper.recv(&r)), -// _ => unreachable!(), -// } -// } - -// #[test] -// fn preflight2() { -// let (s, r) = unbounded(); -// drop(s.clone()); -// s.send(()).unwrap(); -// drop(s); - -// let mut sel = Select::new(); -// let oper1 = sel.recv(&r); -// let oper = sel.select(); -// match oper.index() { -// i if i == oper1 => assert_eq!(oper.recv(&r), Ok(())), -// _ => unreachable!(), -// } - -// assert_eq!(r.try_recv(), Err(TryRecvError::Disconnected)); -// } - -// #[test] -// fn preflight3() { -// let (s, r) = unbounded(); -// drop(s.clone()); -// s.send(()).unwrap(); -// drop(s); -// r.recv().unwrap(); - -// let mut sel = Select::new(); -// let oper1 = sel.recv(&r); -// let oper = sel.select(); -// match oper.index() { -// i if i == oper1 => assert!(oper.recv(&r).is_err()), -// _ => unreachable!(), -// } -// } - -// #[test] -// fn duplicate_operations() { -// let (s, r) = unbounded::<i32>(); -// let hit = vec![Cell::new(false); 4]; - -// while hit.iter().map(|h| h.get()).any(|hit| !hit) { -// let mut sel = Select::new(); -// let oper0 = sel.recv(&r); -// let oper1 = sel.recv(&r); -// let oper2 = sel.send(&s); -// let oper3 = sel.send(&s); -// let oper = sel.select(); -// match oper.index() { -// i if i == oper0 => { -// assert!(oper.recv(&r).is_ok()); -// hit[0].set(true); -// } -// i if i == oper1 => { -// assert!(oper.recv(&r).is_ok()); -// hit[1].set(true); -// } -// i if i == oper2 => { -// assert!(oper.send(&s, 0).is_ok()); -// hit[2].set(true); -// } -// i if i == oper3 => { -// assert!(oper.send(&s, 0).is_ok()); -// hit[3].set(true); -// } -// _ => unreachable!(), -// } -// } -// } - -// #[test] -// fn nesting() { -// let (s, r) = unbounded::<i32>(); - -// let mut sel = Select::new(); -// let oper1 = sel.send(&s); -// let oper = sel.select(); -// match oper.index() { -// i if i == oper1 => { -// assert!(oper.send(&s, 0).is_ok()); - -// let mut sel = Select::new(); -// let oper1 = sel.recv(&r); -// let oper = sel.select(); -// match oper.index() { -// i if i == oper1 => { -// assert_eq!(oper.recv(&r), Ok(0)); - -// let mut sel = Select::new(); -// let oper1 = sel.send(&s); -// let oper = sel.select(); -// match oper.index() { -// i if i == oper1 => { -// assert!(oper.send(&s, 1).is_ok()); - -// let mut sel = Select::new(); -// let oper1 = sel.recv(&r); -// let oper = sel.select(); -// match oper.index() { -// i if i == oper1 => { -// assert_eq!(oper.recv(&r), Ok(1)); -// } -// _ => unreachable!(), -// } -// } -// _ => unreachable!(), -// } -// } -// _ => unreachable!(), -// } -// } -// _ => unreachable!(), -// } -// } - -// #[test] -// fn stress_recv() { -// const COUNT: usize = 10_000; - -// let (s1, r1) = unbounded(); -// let (s2, r2) = bounded(5); -// let (s3, r3) = bounded(100); - -// scope(|scope| { -// scope.spawn(|_| { -// for i in 0..COUNT { -// s1.send(i).unwrap(); -// r3.recv().unwrap(); - -// s2.send(i).unwrap(); -// r3.recv().unwrap(); -// } -// }); - -// for i in 0..COUNT { -// for _ in 0..2 { -// let mut sel = Select::new(); -// let oper1 = sel.recv(&r1); -// let oper2 = sel.recv(&r2); -// let oper = sel.select(); -// match oper.index() { -// ix if ix == oper1 => assert_eq!(oper.recv(&r1), Ok(i)), -// ix if ix == oper2 => assert_eq!(oper.recv(&r2), Ok(i)), -// _ => unreachable!(), -// } - -// s3.send(()).unwrap(); -// } -// } -// }) -// .unwrap(); -// } - -// #[test] -// fn stress_send() { -// const COUNT: usize = 10_000; - -// let (s1, r1) = bounded(0); -// let (s2, r2) = bounded(0); -// let (s3, r3) = bounded(100); - -// scope(|scope| { -// scope.spawn(|_| { -// for i in 0..COUNT { -// assert_eq!(r1.recv().unwrap(), i); -// assert_eq!(r2.recv().unwrap(), i); -// r3.recv().unwrap(); -// } -// }); - -// for i in 0..COUNT { -// for _ in 0..2 { -// let mut sel = Select::new(); -// let oper1 = sel.send(&s1); -// let oper2 = sel.send(&s2); -// let oper = sel.select(); -// match oper.index() { -// ix if ix == oper1 => assert!(oper.send(&s1, i).is_ok()), -// ix if ix == oper2 => assert!(oper.send(&s2, i).is_ok()), -// _ => unreachable!(), -// } -// } -// s3.send(()).unwrap(); -// } -// }) -// .unwrap(); -// } - -// #[test] -// fn stress_mixed() { -// const COUNT: usize = 10_000; - -// let (s1, r1) = bounded(0); -// let (s2, r2) = bounded(0); -// let (s3, r3) = bounded(100); - -// scope(|scope| { -// scope.spawn(|_| { -// for i in 0..COUNT { -// s1.send(i).unwrap(); -// assert_eq!(r2.recv().unwrap(), i); -// r3.recv().unwrap(); -// } -// }); - -// for i in 0..COUNT { -// for _ in 0..2 { -// let mut sel = Select::new(); -// let oper1 = sel.recv(&r1); -// let oper2 = sel.send(&s2); -// let oper = sel.select(); -// match oper.index() { -// ix if ix == oper1 => assert_eq!(oper.recv(&r1), Ok(i)), -// ix if ix == oper2 => assert!(oper.send(&s2, i).is_ok()), -// _ => unreachable!(), -// } -// } -// s3.send(()).unwrap(); -// } -// }) -// .unwrap(); -// } - -// #[test] -// fn stress_timeout_two_threads() { -// const COUNT: usize = 20; - -// let (s, r) = bounded(2); - -// scope(|scope| { -// scope.spawn(|_| { -// for i in 0..COUNT { -// if i % 2 == 0 { -// thread::sleep(ms(500)); -// } - -// let done = false; -// while !done { -// let mut sel = Select::new(); -// let oper1 = sel.send(&s); -// let oper = sel.select_timeout(ms(100)); -// match oper { -// Err(_) => {} -// Ok(oper) => match oper.index() { -// ix if ix == oper1 => { -// assert!(oper.send(&s, i).is_ok()); -// break; -// } -// _ => unreachable!(), -// }, -// } -// } -// } -// }); - -// scope.spawn(|_| { -// for i in 0..COUNT { -// if i % 2 == 0 { -// thread::sleep(ms(500)); -// } - -// let mut done = false; -// while !done { -// let mut sel = Select::new(); -// let oper1 = sel.recv(&r); -// let oper = sel.select_timeout(ms(100)); -// match oper { -// Err(_) => {} -// Ok(oper) => match oper.index() { -// ix if ix == oper1 => { -// assert_eq!(oper.recv(&r), Ok(i)); -// done = true; -// } -// _ => unreachable!(), -// }, -// } -// } -// } -// }); -// }) -// .unwrap(); -// } - -// #[test] -// fn send_recv_same_channel() { -// let (s, r) = bounded::<i32>(0); -// let mut sel = Select::new(); -// let oper1 = sel.send(&s); -// let oper2 = sel.recv(&r); -// let oper = sel.select_timeout(ms(100)); -// match oper { -// Err(_) => {} -// Ok(oper) => match oper.index() { -// ix if ix == oper1 => panic!(), -// ix if ix == oper2 => panic!(), -// _ => unreachable!(), -// }, -// } - -// let (s, r) = unbounded::<i32>(); -// let mut sel = Select::new(); -// let oper1 = sel.send(&s); -// let oper2 = sel.recv(&r); -// let oper = sel.select_timeout(ms(100)); -// match oper { -// Err(_) => panic!(), -// Ok(oper) => match oper.index() { -// ix if ix == oper1 => assert!(oper.send(&s, 0).is_ok()), -// ix if ix == oper2 => panic!(), -// _ => unreachable!(), -// }, -// } -// } - -// #[test] -// fn matching() { -// const THREADS: usize = 44; - -// let (s, r) = &bounded::<usize>(0); - -// scope(|scope| { -// for i in 0..THREADS { -// scope.spawn(move |_| { -// let mut sel = Select::new(); -// let oper1 = sel.recv(&r); -// let oper2 = sel.send(&s); -// let oper = sel.select(); -// match oper.index() { -// ix if ix == oper1 => assert_ne!(oper.recv(&r), Ok(i)), -// ix if ix == oper2 => assert!(oper.send(&s, i).is_ok()), -// _ => unreachable!(), -// } -// }); -// } -// }) -// .unwrap(); - -// assert_eq!(r.try_recv(), Err(TryRecvError::Empty)); -// } - -// #[test] -// fn matching_with_leftover() { -// const THREADS: usize = 55; - -// let (s, r) = &bounded::<usize>(0); - -// scope(|scope| { -// for i in 0..THREADS { -// scope.spawn(move |_| { -// let mut sel = Select::new(); -// let oper1 = sel.recv(&r); -// let oper2 = sel.send(&s); -// let oper = sel.select(); -// match oper.index() { -// ix if ix == oper1 => assert_ne!(oper.recv(&r), Ok(i)), -// ix if ix == oper2 => assert!(oper.send(&s, i).is_ok()), -// _ => unreachable!(), -// } -// }); -// } -// s.send(!0).unwrap(); -// }) -// .unwrap(); - -// assert_eq!(r.try_recv(), Err(TryRecvError::Empty)); -// } - -// #[test] -// fn channel_through_channel() { -// const COUNT: usize = 1000; - -// type T = Box<dyn Any + Send>; - -// for cap in 0..3 { -// let (s, r) = bounded::<T>(cap); - -// scope(|scope| { -// scope.spawn(move |_| { -// let mut s = s; - -// for _ in 0..COUNT { -// let (new_s, new_r) = bounded(cap); -// let new_r: T = Box::new(Some(new_r)); - -// { -// let mut sel = Select::new(); -// let oper1 = sel.send(&s); -// let oper = sel.select(); -// match oper.index() { -// ix if ix == oper1 => assert!(oper.send(&s, new_r).is_ok()), -// _ => unreachable!(), -// } -// } - -// s = new_s; -// } -// }); - -// scope.spawn(move |_| { -// let mut r = r; - -// for _ in 0..COUNT { -// let new = { -// let mut sel = Select::new(); -// let oper1 = sel.recv(&r); -// let oper = sel.select(); -// match oper.index() { -// ix if ix == oper1 => oper -// .recv(&r) -// .unwrap() -// .downcast_mut::<Option<Receiver<T>>>() -// .unwrap() -// .take() -// .unwrap(), -// _ => unreachable!(), -// } -// }; -// r = new; -// } -// }); -// }) -// .unwrap(); -// } -// } - -// #[test] -// fn linearizable_try() { -// const COUNT: usize = 100_000; - -// for step in 0..2 { -// let (start_s, start_r) = bounded::<()>(0); -// let (end_s, end_r) = bounded::<()>(0); - -// let ((s1, r1), (s2, r2)) = if step == 0 { -// (bounded::<i32>(1), bounded::<i32>(1)) -// } else { -// (unbounded::<i32>(), unbounded::<i32>()) -// }; - -// scope(|scope| { -// scope.spawn(|_| { -// for _ in 0..COUNT { -// start_s.send(()).unwrap(); - -// s1.send(1).unwrap(); - -// let mut sel = Select::new(); -// let oper1 = sel.recv(&r1); -// let oper2 = sel.recv(&r2); -// let oper = sel.try_select(); -// match oper { -// Err(_) => unreachable!(), -// Ok(oper) => match oper.index() { -// ix if ix == oper1 => assert!(oper.recv(&r1).is_ok()), -// ix if ix == oper2 => assert!(oper.recv(&r2).is_ok()), -// _ => unreachable!(), -// }, -// } - -// end_s.send(()).unwrap(); -// let _ = r2.try_recv(); -// } -// }); - -// for _ in 0..COUNT { -// start_r.recv().unwrap(); - -// s2.send(1).unwrap(); -// let _ = r1.try_recv(); - -// end_r.recv().unwrap(); -// } -// }) -// .unwrap(); -// } -// } - -// #[test] -// fn linearizable_timeout() { -// const COUNT: usize = 100_000; - -// for step in 0..2 { -// let (start_s, start_r) = bounded::<()>(0); -// let (end_s, end_r) = bounded::<()>(0); - -// let ((s1, r1), (s2, r2)) = if step == 0 { -// (bounded::<i32>(1), bounded::<i32>(1)) -// } else { -// (unbounded::<i32>(), unbounded::<i32>()) -// }; - -// scope(|scope| { -// scope.spawn(|_| { -// for _ in 0..COUNT { -// start_s.send(()).unwrap(); - -// s1.send(1).unwrap(); - -// let mut sel = Select::new(); -// let oper1 = sel.recv(&r1); -// let oper2 = sel.recv(&r2); -// let oper = sel.select_timeout(ms(0)); -// match oper { -// Err(_) => unreachable!(), -// Ok(oper) => match oper.index() { -// ix if ix == oper1 => assert!(oper.recv(&r1).is_ok()), -// ix if ix == oper2 => assert!(oper.recv(&r2).is_ok()), -// _ => unreachable!(), -// }, -// } - -// end_s.send(()).unwrap(); -// let _ = r2.try_recv(); -// } -// }); - -// for _ in 0..COUNT { -// start_r.recv().unwrap(); - -// s2.send(1).unwrap(); -// let _ = r1.try_recv(); - -// end_r.recv().unwrap(); -// } -// }) -// .unwrap(); -// } -// } - -// #[test] -// fn fairness1() { -// const COUNT: usize = 10_000; - -// let (s1, r1) = bounded::<()>(COUNT); -// let (s2, r2) = unbounded::<()>(); - -// for _ in 0..COUNT { -// s1.send(()).unwrap(); -// s2.send(()).unwrap(); -// } - -// let hits = vec![Cell::new(0usize); 4]; -// for _ in 0..COUNT { -// let after = after(ms(0)); -// let tick = tick(ms(0)); - -// let mut sel = Select::new(); -// let oper1 = sel.recv(&r1); -// let oper2 = sel.recv(&r2); -// let oper3 = sel.recv(&after); -// let oper4 = sel.recv(&tick); -// let oper = sel.select(); -// match oper.index() { -// i if i == oper1 => { -// oper.recv(&r1).unwrap(); -// hits[0].set(hits[0].get() + 1); -// } -// i if i == oper2 => { -// oper.recv(&r2).unwrap(); -// hits[1].set(hits[1].get() + 1); -// } -// i if i == oper3 => { -// oper.recv(&after).unwrap(); -// hits[2].set(hits[2].get() + 1); -// } -// i if i == oper4 => { -// oper.recv(&tick).unwrap(); -// hits[3].set(hits[3].get() + 1); -// } -// _ => unreachable!(), -// } -// } -// assert!(hits.iter().all(|x| x.get() >= COUNT / hits.len() / 2)); -// } - -// #[test] -// fn fairness2() { -// const COUNT: usize = 10_000; - -// let (s1, r1) = unbounded::<()>(); -// let (s2, r2) = bounded::<()>(1); -// let (s3, r3) = bounded::<()>(0); - -// scope(|scope| { -// scope.spawn(|_| { -// for _ in 0..COUNT { -// let mut sel = Select::new(); -// let mut oper1 = None; -// let mut oper2 = None; -// if s1.is_empty() { -// oper1 = Some(sel.send(&s1)); -// } -// if s2.is_empty() { -// oper2 = Some(sel.send(&s2)); -// } -// let oper3 = sel.send(&s3); -// let oper = sel.select(); -// match oper.index() { -// i if Some(i) == oper1 => assert!(oper.send(&s1, ()).is_ok()), -// i if Some(i) == oper2 => assert!(oper.send(&s2, ()).is_ok()), -// i if i == oper3 => assert!(oper.send(&s3, ()).is_ok()), -// _ => unreachable!(), -// } -// } -// }); - -// let hits = vec![Cell::new(0usize); 3]; -// for _ in 0..COUNT { -// let mut sel = Select::new(); -// let oper1 = sel.recv(&r1); -// let oper2 = sel.recv(&r2); -// let oper3 = sel.recv(&r3); -// let oper = sel.select(); -// match oper.index() { -// i if i == oper1 => { -// oper.recv(&r1).unwrap(); -// hits[0].set(hits[0].get() + 1); -// } -// i if i == oper2 => { -// oper.recv(&r2).unwrap(); -// hits[1].set(hits[1].get() + 1); -// } -// i if i == oper3 => { -// oper.recv(&r3).unwrap(); -// hits[2].set(hits[2].get() + 1); -// } -// _ => unreachable!(), -// } -// } -// assert!(hits.iter().all(|x| x.get() >= COUNT / hits.len() / 50)); -// }) -// .unwrap(); -// } - -// #[test] -// fn sync_and_clone() { -// const THREADS: usize = 20; - -// let (s, r) = &bounded::<usize>(0); - -// let mut sel = Select::new(); -// let oper1 = sel.recv(&r); -// let oper2 = sel.send(&s); -// let sel = &sel; - -// scope(|scope| { -// for i in 0..THREADS { -// scope.spawn(move |_| { -// let mut sel = sel.clone(); -// let oper = sel.select(); -// match oper.index() { -// ix if ix == oper1 => assert_ne!(oper.recv(&r), Ok(i)), -// ix if ix == oper2 => assert!(oper.send(&s, i).is_ok()), -// _ => unreachable!(), -// } -// }); -// } -// }) -// .unwrap(); - -// assert_eq!(r.try_recv(), Err(TryRecvError::Empty)); -// } - -// #[test] -// fn send_and_clone() { -// const THREADS: usize = 20; - -// let (s, r) = &bounded::<usize>(0); - -// let mut sel = Select::new(); -// let oper1 = sel.recv(&r); -// let oper2 = sel.send(&s); - -// scope(|scope| { -// for i in 0..THREADS { -// let mut sel = sel.clone(); -// scope.spawn(move |_| { -// let oper = sel.select(); -// match oper.index() { -// ix if ix == oper1 => assert_ne!(oper.recv(&r), Ok(i)), -// ix if ix == oper2 => assert!(oper.send(&s, i).is_ok()), -// _ => unreachable!(), -// } -// }); -// } -// }) -// .unwrap(); - -// assert_eq!(r.try_recv(), Err(TryRecvError::Empty)); -// } - -// #[test] -// fn reuse() { -// const COUNT: usize = 10_000; - -// let (s1, r1) = bounded(0); -// let (s2, r2) = bounded(0); -// let (s3, r3) = bounded(100); - -// scope(|scope| { -// scope.spawn(|_| { -// for i in 0..COUNT { -// s1.send(i).unwrap(); -// assert_eq!(r2.recv().unwrap(), i); -// r3.recv().unwrap(); -// } -// }); - -// let mut sel = Select::new(); -// let oper1 = sel.recv(&r1); -// let oper2 = sel.send(&s2); - -// for i in 0..COUNT { -// for _ in 0..2 { -// let oper = sel.select(); -// match oper.index() { -// ix if ix == oper1 => assert_eq!(oper.recv(&r1), Ok(i)), -// ix if ix == oper2 => assert!(oper.send(&s2, i).is_ok()), -// _ => unreachable!(), -// } -// } -// s3.send(()).unwrap(); -// } -// }) -// .unwrap(); -// } diff --git a/vendor/flume/tests/select_macro.rs b/vendor/flume/tests/select_macro.rs deleted file mode 100644 index 367d8dd..0000000 --- a/vendor/flume/tests/select_macro.rs +++ /dev/null @@ -1,1440 +0,0 @@ -// //! Tests for the `select!` macro. - -// #![deny(unsafe_code)] - -// #[macro_use] -// extern crate crossbeam_channel; -// extern crate crossbeam_utils; - -// use std::any::Any; -// use std::cell::Cell; -// use std::ops::Deref; -// use std::thread; -// use std::time::{Duration, Instant}; - -// use crossbeam_channel::{after, bounded, never, tick, unbounded}; -// use crossbeam_channel::{Receiver, RecvError, SendError, Sender, TryRecvError}; -// use crossbeam_utils::thread::scope; - -// fn ms(ms: u64) -> Duration { -// Duration::from_millis(ms) -// } - -// #[test] -// fn smoke1() { -// let (s1, r1) = unbounded::<usize>(); -// let (s2, r2) = unbounded::<usize>(); - -// s1.send(1).unwrap(); - -// select! { -// recv(r1) -> v => assert_eq!(v, Ok(1)), -// recv(r2) -> _ => panic!(), -// } - -// s2.send(2).unwrap(); - -// select! { -// recv(r1) -> _ => panic!(), -// recv(r2) -> v => assert_eq!(v, Ok(2)), -// } -// } - -// #[test] -// fn smoke2() { -// let (_s1, r1) = unbounded::<i32>(); -// let (_s2, r2) = unbounded::<i32>(); -// let (_s3, r3) = unbounded::<i32>(); -// let (_s4, r4) = unbounded::<i32>(); -// let (s5, r5) = unbounded::<i32>(); - -// s5.send(5).unwrap(); - -// select! { -// recv(r1) -> _ => panic!(), -// recv(r2) -> _ => panic!(), -// recv(r3) -> _ => panic!(), -// recv(r4) -> _ => panic!(), -// recv(r5) -> v => assert_eq!(v, Ok(5)), -// } -// } - -// #[test] -// fn disconnected() { -// let (s1, r1) = unbounded::<i32>(); -// let (s2, r2) = unbounded::<i32>(); - -// scope(|scope| { -// scope.spawn(|_| { -// drop(s1); -// thread::sleep(ms(500)); -// s2.send(5).unwrap(); -// }); - -// select! { -// recv(r1) -> v => assert!(v.is_err()), -// recv(r2) -> _ => panic!(), -// default(ms(1000)) => panic!(), -// } - -// r2.recv().unwrap(); -// }) -// .unwrap(); - -// select! { -// recv(r1) -> v => assert!(v.is_err()), -// recv(r2) -> _ => panic!(), -// default(ms(1000)) => panic!(), -// } - -// scope(|scope| { -// scope.spawn(|_| { -// thread::sleep(ms(500)); -// drop(s2); -// }); - -// select! { -// recv(r2) -> v => assert!(v.is_err()), -// default(ms(1000)) => panic!(), -// } -// }) -// .unwrap(); -// } - -// #[test] -// fn default() { -// let (s1, r1) = unbounded::<i32>(); -// let (s2, r2) = unbounded::<i32>(); - -// select! { -// recv(r1) -> _ => panic!(), -// recv(r2) -> _ => panic!(), -// default => {} -// } - -// drop(s1); - -// select! { -// recv(r1) -> v => assert!(v.is_err()), -// recv(r2) -> _ => panic!(), -// default => panic!(), -// } - -// s2.send(2).unwrap(); - -// select! { -// recv(r2) -> v => assert_eq!(v, Ok(2)), -// default => panic!(), -// } - -// select! { -// recv(r2) -> _ => panic!(), -// default => {}, -// } - -// select! { -// default => {}, -// } -// } - -// #[test] -// fn timeout() { -// let (_s1, r1) = unbounded::<i32>(); -// let (s2, r2) = unbounded::<i32>(); - -// scope(|scope| { -// scope.spawn(|_| { -// thread::sleep(ms(1500)); -// s2.send(2).unwrap(); -// }); - -// select! { -// recv(r1) -> _ => panic!(), -// recv(r2) -> _ => panic!(), -// default(ms(1000)) => {}, -// } - -// select! { -// recv(r1) -> _ => panic!(), -// recv(r2) -> v => assert_eq!(v, Ok(2)), -// default(ms(1000)) => panic!(), -// } -// }) -// .unwrap(); - -// scope(|scope| { -// let (s, r) = unbounded::<i32>(); - -// scope.spawn(move |_| { -// thread::sleep(ms(500)); -// drop(s); -// }); - -// select! { -// default(ms(1000)) => { -// select! { -// recv(r) -> v => assert!(v.is_err()), -// default => panic!(), -// } -// } -// } -// }) -// .unwrap(); -// } - -// #[test] -// fn default_when_disconnected() { -// let (_, r) = unbounded::<i32>(); - -// select! { -// recv(r) -> res => assert!(res.is_err()), -// default => panic!(), -// } - -// let (_, r) = unbounded::<i32>(); - -// select! { -// recv(r) -> res => assert!(res.is_err()), -// default(ms(1000)) => panic!(), -// } - -// let (s, _) = bounded::<i32>(0); - -// select! { -// send(s, 0) -> res => assert!(res.is_err()), -// default => panic!(), -// } - -// let (s, _) = bounded::<i32>(0); - -// select! { -// send(s, 0) -> res => assert!(res.is_err()), -// default(ms(1000)) => panic!(), -// } -// } - -// #[test] -// fn default_only() { -// let start = Instant::now(); -// select! { -// default => {} -// } -// let now = Instant::now(); -// assert!(now - start <= ms(50)); - -// let start = Instant::now(); -// select! { -// default(ms(500)) => {} -// } -// let now = Instant::now(); -// assert!(now - start >= ms(450)); -// assert!(now - start <= ms(550)); -// } - -// #[test] -// fn unblocks() { -// let (s1, r1) = bounded::<i32>(0); -// let (s2, r2) = bounded::<i32>(0); - -// scope(|scope| { -// scope.spawn(|_| { -// thread::sleep(ms(500)); -// s2.send(2).unwrap(); -// }); - -// select! { -// recv(r1) -> _ => panic!(), -// recv(r2) -> v => assert_eq!(v, Ok(2)), -// default(ms(1000)) => panic!(), -// } -// }) -// .unwrap(); - -// scope(|scope| { -// scope.spawn(|_| { -// thread::sleep(ms(500)); -// assert_eq!(r1.recv().unwrap(), 1); -// }); - -// select! { -// send(s1, 1) -> _ => {}, -// send(s2, 2) -> _ => panic!(), -// default(ms(1000)) => panic!(), -// } -// }) -// .unwrap(); -// } - -// #[test] -// fn both_ready() { -// let (s1, r1) = bounded(0); -// let (s2, r2) = bounded(0); - -// scope(|scope| { -// scope.spawn(|_| { -// thread::sleep(ms(500)); -// s1.send(1).unwrap(); -// assert_eq!(r2.recv().unwrap(), 2); -// }); - -// for _ in 0..2 { -// select! { -// recv(r1) -> v => assert_eq!(v, Ok(1)), -// send(s2, 2) -> _ => {}, -// } -// } -// }) -// .unwrap(); -// } - -// #[test] -// fn loop_try() { -// const RUNS: usize = 20; - -// for _ in 0..RUNS { -// let (s1, r1) = bounded::<i32>(0); -// let (s2, r2) = bounded::<i32>(0); -// let (s_end, r_end) = bounded::<()>(0); - -// scope(|scope| { -// scope.spawn(|_| loop { -// select! { -// send(s1, 1) -> _ => break, -// default => {} -// } - -// select! { -// recv(r_end) -> _ => break, -// default => {} -// } -// }); - -// scope.spawn(|_| loop { -// if let Ok(x) = r2.try_recv() { -// assert_eq!(x, 2); -// break; -// } - -// select! { -// recv(r_end) -> _ => break, -// default => {} -// } -// }); - -// scope.spawn(|_| { -// thread::sleep(ms(500)); - -// select! { -// recv(r1) -> v => assert_eq!(v, Ok(1)), -// send(s2, 2) -> _ => {}, -// default(ms(500)) => panic!(), -// } - -// drop(s_end); -// }); -// }) -// .unwrap(); -// } -// } - -// #[test] -// fn cloning1() { -// scope(|scope| { -// let (s1, r1) = unbounded::<i32>(); -// let (_s2, r2) = unbounded::<i32>(); -// let (s3, r3) = unbounded::<()>(); - -// scope.spawn(move |_| { -// r3.recv().unwrap(); -// drop(s1.clone()); -// assert_eq!(r3.try_recv(), Err(TryRecvError::Empty)); -// s1.send(1).unwrap(); -// r3.recv().unwrap(); -// }); - -// s3.send(()).unwrap(); - -// select! { -// recv(r1) -> _ => {}, -// recv(r2) -> _ => {}, -// } - -// s3.send(()).unwrap(); -// }) -// .unwrap(); -// } - -// #[test] -// fn cloning2() { -// let (s1, r1) = unbounded::<()>(); -// let (s2, r2) = unbounded::<()>(); -// let (_s3, _r3) = unbounded::<()>(); - -// scope(|scope| { -// scope.spawn(move |_| { -// select! { -// recv(r1) -> _ => panic!(), -// recv(r2) -> _ => {}, -// } -// }); - -// thread::sleep(ms(500)); -// drop(s1.clone()); -// s2.send(()).unwrap(); -// }) -// .unwrap(); -// } - -// #[test] -// fn preflight1() { -// let (s, r) = unbounded(); -// s.send(()).unwrap(); - -// select! { -// recv(r) -> _ => {} -// } -// } - -// #[test] -// fn preflight2() { -// let (s, r) = unbounded(); -// drop(s.clone()); -// s.send(()).unwrap(); -// drop(s); - -// select! { -// recv(r) -> v => assert!(v.is_ok()), -// } -// assert_eq!(r.try_recv(), Err(TryRecvError::Disconnected)); -// } - -// #[test] -// fn preflight3() { -// let (s, r) = unbounded(); -// drop(s.clone()); -// s.send(()).unwrap(); -// drop(s); -// r.recv().unwrap(); - -// select! { -// recv(r) -> v => assert!(v.is_err()) -// } -// } - -// #[test] -// fn duplicate_operations() { -// let (s, r) = unbounded::<i32>(); -// let mut hit = [false; 4]; - -// while hit.iter().any(|hit| !hit) { -// select! { -// recv(r) -> _ => hit[0] = true, -// recv(r) -> _ => hit[1] = true, -// send(s, 0) -> _ => hit[2] = true, -// send(s, 0) -> _ => hit[3] = true, -// } -// } -// } - -// #[test] -// fn nesting() { -// let (s, r) = unbounded::<i32>(); - -// select! { -// send(s, 0) -> _ => { -// select! { -// recv(r) -> v => { -// assert_eq!(v, Ok(0)); -// select! { -// send(s, 1) -> _ => { -// select! { -// recv(r) -> v => { -// assert_eq!(v, Ok(1)); -// } -// } -// } -// } -// } -// } -// } -// } -// } - -// #[test] -// #[should_panic(expected = "send panicked")] -// fn panic_sender() { -// fn get() -> Sender<i32> { -// panic!("send panicked") -// } - -// #[allow(unreachable_code)] -// { -// select! { -// send(get(), panic!()) -> _ => {} -// } -// } -// } - -// #[test] -// #[should_panic(expected = "recv panicked")] -// fn panic_receiver() { -// fn get() -> Receiver<i32> { -// panic!("recv panicked") -// } - -// select! { -// recv(get()) -> _ => {} -// } -// } - -// #[test] -// fn stress_recv() { -// const COUNT: usize = 10_000; - -// let (s1, r1) = unbounded(); -// let (s2, r2) = bounded(5); -// let (s3, r3) = bounded(100); - -// scope(|scope| { -// scope.spawn(|_| { -// for i in 0..COUNT { -// s1.send(i).unwrap(); -// r3.recv().unwrap(); - -// s2.send(i).unwrap(); -// r3.recv().unwrap(); -// } -// }); - -// for i in 0..COUNT { -// for _ in 0..2 { -// select! { -// recv(r1) -> v => assert_eq!(v, Ok(i)), -// recv(r2) -> v => assert_eq!(v, Ok(i)), -// } - -// s3.send(()).unwrap(); -// } -// } -// }) -// .unwrap(); -// } - -// #[test] -// fn stress_send() { -// const COUNT: usize = 10_000; - -// let (s1, r1) = bounded(0); -// let (s2, r2) = bounded(0); -// let (s3, r3) = bounded(100); - -// scope(|scope| { -// scope.spawn(|_| { -// for i in 0..COUNT { -// assert_eq!(r1.recv().unwrap(), i); -// assert_eq!(r2.recv().unwrap(), i); -// r3.recv().unwrap(); -// } -// }); - -// for i in 0..COUNT { -// for _ in 0..2 { -// select! { -// send(s1, i) -> _ => {}, -// send(s2, i) -> _ => {}, -// } -// } -// s3.send(()).unwrap(); -// } -// }) -// .unwrap(); -// } - -// #[test] -// fn stress_mixed() { -// const COUNT: usize = 10_000; - -// let (s1, r1) = bounded(0); -// let (s2, r2) = bounded(0); -// let (s3, r3) = bounded(100); - -// scope(|scope| { -// scope.spawn(|_| { -// for i in 0..COUNT { -// s1.send(i).unwrap(); -// assert_eq!(r2.recv().unwrap(), i); -// r3.recv().unwrap(); -// } -// }); - -// for i in 0..COUNT { -// for _ in 0..2 { -// select! { -// recv(r1) -> v => assert_eq!(v, Ok(i)), -// send(s2, i) -> _ => {}, -// } -// } -// s3.send(()).unwrap(); -// } -// }) -// .unwrap(); -// } - -// #[test] -// fn stress_timeout_two_threads() { -// const COUNT: usize = 20; - -// let (s, r) = bounded(2); - -// scope(|scope| { -// scope.spawn(|_| { -// for i in 0..COUNT { -// if i % 2 == 0 { -// thread::sleep(ms(500)); -// } - -// loop { -// select! { -// send(s, i) -> _ => break, -// default(ms(100)) => {} -// } -// } -// } -// }); - -// scope.spawn(|_| { -// for i in 0..COUNT { -// if i % 2 == 0 { -// thread::sleep(ms(500)); -// } - -// loop { -// select! { -// recv(r) -> v => { -// assert_eq!(v, Ok(i)); -// break; -// } -// default(ms(100)) => {} -// } -// } -// } -// }); -// }) -// .unwrap(); -// } - -// #[test] -// fn send_recv_same_channel() { -// let (s, r) = bounded::<i32>(0); -// select! { -// send(s, 0) -> _ => panic!(), -// recv(r) -> _ => panic!(), -// default(ms(500)) => {} -// } - -// let (s, r) = unbounded::<i32>(); -// select! { -// send(s, 0) -> _ => {}, -// recv(r) -> _ => panic!(), -// default(ms(500)) => panic!(), -// } -// } - -// #[test] -// fn matching() { -// const THREADS: usize = 44; - -// let (s, r) = &bounded::<usize>(0); - -// scope(|scope| { -// for i in 0..THREADS { -// scope.spawn(move |_| { -// select! { -// recv(r) -> v => assert_ne!(v.unwrap(), i), -// send(s, i) -> _ => {}, -// } -// }); -// } -// }) -// .unwrap(); - -// assert_eq!(r.try_recv(), Err(TryRecvError::Empty)); -// } - -// #[test] -// fn matching_with_leftover() { -// const THREADS: usize = 55; - -// let (s, r) = &bounded::<usize>(0); - -// scope(|scope| { -// for i in 0..THREADS { -// scope.spawn(move |_| { -// select! { -// recv(r) -> v => assert_ne!(v.unwrap(), i), -// send(s, i) -> _ => {}, -// } -// }); -// } -// s.send(!0).unwrap(); -// }) -// .unwrap(); - -// assert_eq!(r.try_recv(), Err(TryRecvError::Empty)); -// } - -// #[test] -// fn channel_through_channel() { -// const COUNT: usize = 1000; - -// type T = Box<dyn Any + Send>; - -// for cap in 0..3 { -// let (s, r) = bounded::<T>(cap); - -// scope(|scope| { -// scope.spawn(move |_| { -// let mut s = s; - -// for _ in 0..COUNT { -// let (new_s, new_r) = bounded(cap); -// let new_r: T = Box::new(Some(new_r)); - -// select! { -// send(s, new_r) -> _ => {} -// } - -// s = new_s; -// } -// }); - -// scope.spawn(move |_| { -// let mut r = r; - -// for _ in 0..COUNT { -// r = select! { -// recv(r) -> msg => { -// msg.unwrap() -// .downcast_mut::<Option<Receiver<T>>>() -// .unwrap() -// .take() -// .unwrap() -// } -// } -// } -// }); -// }) -// .unwrap(); -// } -// } - -// #[test] -// fn linearizable_default() { -// const COUNT: usize = 100_000; - -// for step in 0..2 { -// let (start_s, start_r) = bounded::<()>(0); -// let (end_s, end_r) = bounded::<()>(0); - -// let ((s1, r1), (s2, r2)) = if step == 0 { -// (bounded::<i32>(1), bounded::<i32>(1)) -// } else { -// (unbounded::<i32>(), unbounded::<i32>()) -// }; - -// scope(|scope| { -// scope.spawn(|_| { -// for _ in 0..COUNT { -// start_s.send(()).unwrap(); - -// s1.send(1).unwrap(); -// select! { -// recv(r1) -> _ => {} -// recv(r2) -> _ => {} -// default => unreachable!() -// } - -// end_s.send(()).unwrap(); -// let _ = r2.try_recv(); -// } -// }); - -// for _ in 0..COUNT { -// start_r.recv().unwrap(); - -// s2.send(1).unwrap(); -// let _ = r1.try_recv(); - -// end_r.recv().unwrap(); -// } -// }) -// .unwrap(); -// } -// } - -// #[test] -// fn linearizable_timeout() { -// const COUNT: usize = 100_000; - -// for step in 0..2 { -// let (start_s, start_r) = bounded::<()>(0); -// let (end_s, end_r) = bounded::<()>(0); - -// let ((s1, r1), (s2, r2)) = if step == 0 { -// (bounded::<i32>(1), bounded::<i32>(1)) -// } else { -// (unbounded::<i32>(), unbounded::<i32>()) -// }; - -// scope(|scope| { -// scope.spawn(|_| { -// for _ in 0..COUNT { -// start_s.send(()).unwrap(); - -// s1.send(1).unwrap(); -// select! { -// recv(r1) -> _ => {} -// recv(r2) -> _ => {} -// default(ms(0)) => unreachable!() -// } - -// end_s.send(()).unwrap(); -// let _ = r2.try_recv(); -// } -// }); - -// for _ in 0..COUNT { -// start_r.recv().unwrap(); - -// s2.send(1).unwrap(); -// let _ = r1.try_recv(); - -// end_r.recv().unwrap(); -// } -// }) -// .unwrap(); -// } -// } - -// #[test] -// fn fairness1() { -// const COUNT: usize = 10_000; - -// let (s1, r1) = bounded::<()>(COUNT); -// let (s2, r2) = unbounded::<()>(); - -// for _ in 0..COUNT { -// s1.send(()).unwrap(); -// s2.send(()).unwrap(); -// } - -// let mut hits = [0usize; 4]; -// for _ in 0..COUNT { -// select! { -// recv(r1) -> _ => hits[0] += 1, -// recv(r2) -> _ => hits[1] += 1, -// recv(after(ms(0))) -> _ => hits[2] += 1, -// recv(tick(ms(0))) -> _ => hits[3] += 1, -// } -// } -// assert!(hits.iter().all(|x| *x >= COUNT / hits.len() / 2)); -// } - -// #[test] -// fn fairness2() { -// const COUNT: usize = 10_000; - -// let (s1, r1) = unbounded::<()>(); -// let (s2, r2) = bounded::<()>(1); -// let (s3, r3) = bounded::<()>(0); - -// scope(|scope| { -// scope.spawn(|_| { -// let (hole, _r) = bounded(0); - -// for _ in 0..COUNT { -// let s1 = if s1.is_empty() { &s1 } else { &hole }; -// let s2 = if s2.is_empty() { &s2 } else { &hole }; - -// select! { -// send(s1, ()) -> res => assert!(res.is_ok()), -// send(s2, ()) -> res => assert!(res.is_ok()), -// send(s3, ()) -> res => assert!(res.is_ok()), -// } -// } -// }); - -// let hits = vec![Cell::new(0usize); 3]; -// for _ in 0..COUNT { -// select! { -// recv(r1) -> _ => hits[0].set(hits[0].get() + 1), -// recv(r2) -> _ => hits[1].set(hits[1].get() + 1), -// recv(r3) -> _ => hits[2].set(hits[2].get() + 1), -// } -// } -// assert!(hits.iter().all(|x| x.get() >= COUNT / hits.len() / 50)); -// }) -// .unwrap(); -// } - -// #[test] -// fn fairness_recv() { -// const COUNT: usize = 10_000; - -// let (s1, r1) = bounded::<()>(COUNT); -// let (s2, r2) = unbounded::<()>(); - -// for _ in 0..COUNT { -// s1.send(()).unwrap(); -// s2.send(()).unwrap(); -// } - -// let mut hits = [0usize; 2]; -// while hits[0] + hits[1] < COUNT { -// select! { -// recv(r1) -> _ => hits[0] += 1, -// recv(r2) -> _ => hits[1] += 1, -// } -// } -// assert!(hits.iter().all(|x| *x >= COUNT / 4)); -// } - -// #[test] -// fn fairness_send() { -// const COUNT: usize = 10_000; - -// let (s1, _r1) = bounded::<()>(COUNT); -// let (s2, _r2) = unbounded::<()>(); - -// let mut hits = [0usize; 2]; -// for _ in 0..COUNT { -// select! { -// send(s1, ()) -> _ => hits[0] += 1, -// send(s2, ()) -> _ => hits[1] += 1, -// } -// } -// assert!(hits.iter().all(|x| *x >= COUNT / 4)); -// } - -// #[test] -// fn references() { -// let (s, r) = unbounded::<i32>(); -// select! { -// send(s, 0) -> _ => {} -// recv(r) -> _ => {} -// } -// select! { -// send(&&&&s, 0) -> _ => {} -// recv(&&&&r) -> _ => {} -// } -// select! { -// recv(Some(&r).unwrap_or(&never())) -> _ => {}, -// default => {} -// } -// select! { -// recv(Some(r).unwrap_or(never())) -> _ => {}, -// default => {} -// } -// } - -// #[test] -// fn case_blocks() { -// let (s, r) = unbounded::<i32>(); - -// select! { -// recv(r) -> _ => 3.0, -// recv(r) -> _ => loop { -// unreachable!() -// }, -// recv(r) -> _ => match 7 + 3 { -// _ => unreachable!() -// }, -// default => 7. -// }; - -// select! { -// recv(r) -> msg => if msg.is_ok() { -// unreachable!() -// }, -// default => () -// } - -// drop(s); -// } - -// #[test] -// fn move_handles() { -// let (s, r) = unbounded::<i32>(); -// select! { -// recv((move || r)()) -> _ => {} -// send((move || s)(), 0) -> _ => {} -// } -// } - -// #[test] -// fn infer_types() { -// let (s, r) = unbounded(); -// select! { -// recv(r) -> _ => {} -// default => {} -// } -// s.send(()).unwrap(); - -// let (s, r) = unbounded(); -// select! { -// send(s, ()) -> _ => {} -// } -// r.recv().unwrap(); -// } - -// #[test] -// fn default_syntax() { -// let (s, r) = bounded::<i32>(0); - -// select! { -// recv(r) -> _ => panic!(), -// default => {} -// } -// select! { -// send(s, 0) -> _ => panic!(), -// default() => {} -// } -// select! { -// default => {} -// } -// select! { -// default() => {} -// } -// } - -// #[test] -// fn same_variable_name() { -// let (_, r) = unbounded::<i32>(); -// select! { -// recv(r) -> r => assert!(r.is_err()), -// } -// } - -// #[test] -// fn handles_on_heap() { -// let (s, r) = unbounded::<i32>(); -// let (s, r) = (Box::new(s), Box::new(r)); - -// select! { -// send(*s, 0) -> _ => {} -// recv(*r) -> _ => {} -// default => {} -// } - -// drop(s); -// drop(r); -// } - -// #[test] -// fn once_blocks() { -// let (s, r) = unbounded::<i32>(); - -// let once = Box::new(()); -// select! { -// send(s, 0) -> _ => drop(once), -// } - -// let once = Box::new(()); -// select! { -// recv(r) -> _ => drop(once), -// } - -// let once1 = Box::new(()); -// let once2 = Box::new(()); -// select! { -// send(s, 0) -> _ => drop(once1), -// default => drop(once2), -// } - -// let once1 = Box::new(()); -// let once2 = Box::new(()); -// select! { -// recv(r) -> _ => drop(once1), -// default => drop(once2), -// } - -// let once1 = Box::new(()); -// let once2 = Box::new(()); -// select! { -// recv(r) -> _ => drop(once1), -// send(s, 0) -> _ => drop(once2), -// } -// } - -// #[test] -// fn once_receiver() { -// let (_, r) = unbounded::<i32>(); - -// let once = Box::new(()); -// let get = move || { -// drop(once); -// r -// }; - -// select! { -// recv(get()) -> _ => {} -// } -// } - -// #[test] -// fn once_sender() { -// let (s, _) = unbounded::<i32>(); - -// let once = Box::new(()); -// let get = move || { -// drop(once); -// s -// }; - -// select! { -// send(get(), 5) -> _ => {} -// } -// } - -// #[test] -// fn parse_nesting() { -// let (_, r) = unbounded::<i32>(); - -// select! { -// recv(r) -> _ => {} -// recv(r) -> _ => { -// select! { -// recv(r) -> _ => {} -// recv(r) -> _ => { -// select! { -// recv(r) -> _ => {} -// recv(r) -> _ => { -// select! { -// default => {} -// } -// } -// } -// } -// } -// } -// } -// } - -// #[test] -// fn evaluate() { -// let (s, r) = unbounded::<i32>(); - -// let v = select! { -// recv(r) -> _ => "foo".into(), -// send(s, 0) -> _ => "bar".to_owned(), -// default => "baz".to_string(), -// }; -// assert_eq!(v, "bar"); - -// let v = select! { -// recv(r) -> _ => "foo".into(), -// default => "baz".to_string(), -// }; -// assert_eq!(v, "foo"); - -// let v = select! { -// recv(r) -> _ => "foo".into(), -// default => "baz".to_string(), -// }; -// assert_eq!(v, "baz"); -// } - -// #[test] -// fn deref() { -// use crossbeam_channel as cc; - -// struct Sender<T>(cc::Sender<T>); -// struct Receiver<T>(cc::Receiver<T>); - -// impl<T> Deref for Receiver<T> { -// type Target = cc::Receiver<T>; - -// fn deref(&self) -> &Self::Target { -// &self.0 -// } -// } - -// impl<T> Deref for Sender<T> { -// type Target = cc::Sender<T>; - -// fn deref(&self) -> &Self::Target { -// &self.0 -// } -// } - -// let (s, r) = bounded::<i32>(0); -// let (s, r) = (Sender(s), Receiver(r)); - -// select! { -// send(s, 0) -> _ => panic!(), -// recv(r) -> _ => panic!(), -// default => {} -// } -// } - -// #[test] -// fn result_types() { -// let (s, _) = bounded::<i32>(0); -// let (_, r) = bounded::<i32>(0); - -// select! { -// recv(r) -> res => drop::<Result<i32, RecvError>>(res), -// } -// select! { -// recv(r) -> res => drop::<Result<i32, RecvError>>(res), -// default => {} -// } -// select! { -// recv(r) -> res => drop::<Result<i32, RecvError>>(res), -// default(ms(0)) => {} -// } - -// select! { -// send(s, 0) -> res => drop::<Result<(), SendError<i32>>>(res), -// } -// select! { -// send(s, 0) -> res => drop::<Result<(), SendError<i32>>>(res), -// default => {} -// } -// select! { -// send(s, 0) -> res => drop::<Result<(), SendError<i32>>>(res), -// default(ms(0)) => {} -// } - -// select! { -// send(s, 0) -> res => drop::<Result<(), SendError<i32>>>(res), -// recv(r) -> res => drop::<Result<i32, RecvError>>(res), -// } -// } - -// #[test] -// fn try_recv() { -// let (s, r) = bounded(0); - -// scope(|scope| { -// scope.spawn(move |_| { -// select! { -// recv(r) -> _ => panic!(), -// default => {} -// } -// thread::sleep(ms(1500)); -// select! { -// recv(r) -> v => assert_eq!(v, Ok(7)), -// default => panic!(), -// } -// thread::sleep(ms(500)); -// select! { -// recv(r) -> v => assert_eq!(v, Err(RecvError)), -// default => panic!(), -// } -// }); -// scope.spawn(move |_| { -// thread::sleep(ms(1000)); -// select! { -// send(s, 7) -> res => res.unwrap(), -// } -// }); -// }) -// .unwrap(); -// } - -// #[test] -// fn recv() { -// let (s, r) = bounded(0); - -// scope(|scope| { -// scope.spawn(move |_| { -// select! { -// recv(r) -> v => assert_eq!(v, Ok(7)), -// } -// thread::sleep(ms(1000)); -// select! { -// recv(r) -> v => assert_eq!(v, Ok(8)), -// } -// thread::sleep(ms(1000)); -// select! { -// recv(r) -> v => assert_eq!(v, Ok(9)), -// } -// select! { -// recv(r) -> v => assert_eq!(v, Err(RecvError)), -// } -// }); -// scope.spawn(move |_| { -// thread::sleep(ms(1500)); -// select! { -// send(s, 7) -> res => res.unwrap(), -// } -// select! { -// send(s, 8) -> res => res.unwrap(), -// } -// select! { -// send(s, 9) -> res => res.unwrap(), -// } -// }); -// }) -// .unwrap(); -// } - -// #[test] -// fn recv_timeout() { -// let (s, r) = bounded::<i32>(0); - -// scope(|scope| { -// scope.spawn(move |_| { -// select! { -// recv(r) -> _ => panic!(), -// default(ms(1000)) => {} -// } -// select! { -// recv(r) -> v => assert_eq!(v, Ok(7)), -// default(ms(1000)) => panic!(), -// } -// select! { -// recv(r) -> v => assert_eq!(v, Err(RecvError)), -// default(ms(1000)) => panic!(), -// } -// }); -// scope.spawn(move |_| { -// thread::sleep(ms(1500)); -// select! { -// send(s, 7) -> res => res.unwrap(), -// } -// }); -// }) -// .unwrap(); -// } - -// #[test] -// fn try_send() { -// let (s, r) = bounded(0); - -// scope(|scope| { -// scope.spawn(move |_| { -// select! { -// send(s, 7) -> _ => panic!(), -// default => {} -// } -// thread::sleep(ms(1500)); -// select! { -// send(s, 8) -> res => res.unwrap(), -// default => panic!(), -// } -// thread::sleep(ms(500)); -// select! { -// send(s, 8) -> res => assert_eq!(res, Err(SendError(8))), -// default => panic!(), -// } -// }); -// scope.spawn(move |_| { -// thread::sleep(ms(1000)); -// select! { -// recv(r) -> v => assert_eq!(v, Ok(8)), -// } -// }); -// }) -// .unwrap(); -// } - -// #[test] -// fn send() { -// let (s, r) = bounded(0); - -// scope(|scope| { -// scope.spawn(move |_| { -// select! { -// send(s, 7) -> res => res.unwrap(), -// } -// thread::sleep(ms(1000)); -// select! { -// send(s, 8) -> res => res.unwrap(), -// } -// thread::sleep(ms(1000)); -// select! { -// send(s, 9) -> res => res.unwrap(), -// } -// }); -// scope.spawn(move |_| { -// thread::sleep(ms(1500)); -// select! { -// recv(r) -> v => assert_eq!(v, Ok(7)), -// } -// select! { -// recv(r) -> v => assert_eq!(v, Ok(8)), -// } -// select! { -// recv(r) -> v => assert_eq!(v, Ok(9)), -// } -// }); -// }) -// .unwrap(); -// } - -// #[test] -// fn send_timeout() { -// let (s, r) = bounded(0); - -// scope(|scope| { -// scope.spawn(move |_| { -// select! { -// send(s, 7) -> _ => panic!(), -// default(ms(1000)) => {} -// } -// select! { -// send(s, 8) -> res => res.unwrap(), -// default(ms(1000)) => panic!(), -// } -// select! { -// send(s, 9) -> res => assert_eq!(res, Err(SendError(9))), -// default(ms(1000)) => panic!(), -// } -// }); -// scope.spawn(move |_| { -// thread::sleep(ms(1500)); -// select! { -// recv(r) -> v => assert_eq!(v, Ok(8)), -// } -// }); -// }) -// .unwrap(); -// } - -// #[test] -// fn disconnect_wakes_sender() { -// let (s, r) = bounded(0); - -// scope(|scope| { -// scope.spawn(move |_| { -// select! { -// send(s, ()) -> res => assert_eq!(res, Err(SendError(()))), -// } -// }); -// scope.spawn(move |_| { -// thread::sleep(ms(1000)); -// drop(r); -// }); -// }) -// .unwrap(); -// } - -// #[test] -// fn disconnect_wakes_receiver() { -// let (s, r) = bounded::<()>(0); - -// scope(|scope| { -// scope.spawn(move |_| { -// select! { -// recv(r) -> res => assert_eq!(res, Err(RecvError)), -// } -// }); -// scope.spawn(move |_| { -// thread::sleep(ms(1000)); -// drop(s); -// }); -// }) -// .unwrap(); -// } diff --git a/vendor/flume/tests/stream.rs b/vendor/flume/tests/stream.rs deleted file mode 100644 index e3b32cd..0000000 --- a/vendor/flume/tests/stream.rs +++ /dev/null @@ -1,255 +0,0 @@ -#[cfg(feature = "async")] -use { - flume::*, - futures::{stream::FuturesUnordered, StreamExt, TryFutureExt}, - async_std::prelude::FutureExt, - std::time::Duration, -}; -use futures::{stream, Stream}; - -#[cfg(feature = "async")] -#[test] -fn stream_recv() { - let (tx, rx) = unbounded(); - - let t = std::thread::spawn(move || { - std::thread::sleep(std::time::Duration::from_millis(250)); - tx.send(42u32).unwrap(); - println!("sent"); - }); - - async_std::task::block_on(async { - println!("receiving..."); - let x = rx.stream().next().await; - println!("received"); - assert_eq!(x, Some(42)); - }); - - t.join().unwrap(); -} - -#[cfg(feature = "async")] -#[test] -fn stream_recv_disconnect() { - let (tx, rx) = bounded::<i32>(0); - - let t = std::thread::spawn(move || { - tx.send(42); - std::thread::sleep(std::time::Duration::from_millis(250)); - drop(tx) - }); - - async_std::task::block_on(async { - let mut stream = rx.into_stream(); - assert_eq!(stream.next().await, Some(42)); - assert_eq!(stream.next().await, None); - }); - - t.join().unwrap(); -} - -#[cfg(feature = "async")] -#[test] -fn stream_recv_drop_recv() { - let (tx, rx) = bounded::<i32>(10); - - let rx2 = rx.clone(); - let mut stream = rx.into_stream(); - - async_std::task::block_on(async { - let res = async_std::future::timeout( - std::time::Duration::from_millis(500), - stream.next() - ).await; - - assert!(res.is_err()); - }); - - let t = std::thread::spawn(move || { - async_std::task::block_on(async { - rx2.stream().next().await - }) - }); - - std::thread::sleep(std::time::Duration::from_millis(500)); - - tx.send(42).unwrap(); - - drop(stream); - - assert_eq!(t.join().unwrap(), Some(42)) -} - -#[cfg(feature = "async")] -#[test] -fn r#stream_drop_send_disconnect() { - let (tx, rx) = bounded::<i32>(1); - - let t = std::thread::spawn(move || { - std::thread::sleep(std::time::Duration::from_millis(250)); - drop(tx); - }); - - async_std::task::block_on(async { - let mut stream = rx.into_stream(); - assert_eq!(stream.next().await, None); - }); - - t.join().unwrap(); -} - -#[cfg(feature = "async")] -#[async_std::test] -async fn stream_send_1_million_no_drop_or_reorder() { - #[derive(Debug)] - enum Message { - Increment { - old: u64, - }, - ReturnCount, - } - - let (tx, rx) = unbounded(); - - let t = async_std::task::spawn(async move { - let mut count = 0u64; - let mut stream = rx.into_stream(); - - while let Some(Message::Increment { old }) = stream.next().await { - assert_eq!(old, count); - count += 1; - } - - count - }); - - for next in 0..1_000_000 { - tx.send(Message::Increment { old: next }).unwrap(); - } - - tx.send(Message::ReturnCount).unwrap(); - - let count = t.await; - assert_eq!(count, 1_000_000) -} - -#[cfg(feature = "async")] -#[async_std::test] -async fn parallel_streams_and_async_recv() { - let (tx, rx) = flume::unbounded(); - let rx = ℞ - let send_fut = async move { - let n_sends: usize = 100000; - for _ in 0..n_sends { - tx.send_async(()).await.unwrap(); - } - }; - - async_std::task::spawn( - send_fut - .timeout(Duration::from_secs(5)) - .map_err(|_| panic!("Send timed out!")) - ); - - let mut futures_unordered = (0..250) - .map(|n| async move { - if n % 2 == 0 { - let mut stream = rx.stream(); - while let Some(()) = stream.next().await {} - } else { - while let Ok(()) = rx.recv_async().await {} - } - - }) - .collect::<FuturesUnordered<_>>(); - - let recv_fut = async { - while futures_unordered.next().await.is_some() {} - }; - - recv_fut - .timeout(Duration::from_secs(5)) - .map_err(|_| panic!("Receive timed out!")) - .await - .unwrap(); -} - -#[cfg(feature = "async")] -#[test] -fn stream_no_double_wake() { - use std::sync::atomic::{AtomicUsize, Ordering}; - use std::sync::Arc; - use std::pin::Pin; - use std::task::Context; - use futures::task::{waker, ArcWake}; - use futures::Stream; - - let count = Arc::new(AtomicUsize::new(0)); - - // all this waker does is count how many times it is called - struct CounterWaker { - count: Arc<AtomicUsize>, - } - - impl ArcWake for CounterWaker { - fn wake_by_ref(arc_self: &Arc<Self>) { - arc_self.count.fetch_add(1, Ordering::SeqCst); - } - } - - // create waker and context - let w = CounterWaker { - count: count.clone(), - }; - let w = waker(Arc::new(w)); - let cx = &mut Context::from_waker(&w); - - // create unbounded channel - let (tx, rx) = unbounded::<()>(); - let mut stream = rx.stream(); - - // register waker with stream - let _ = Pin::new(&mut stream).poll_next(cx); - - // send multiple items - tx.send(()).unwrap(); - tx.send(()).unwrap(); - tx.send(()).unwrap(); - - // verify that stream is only woken up once. - assert_eq!(count.load(Ordering::SeqCst), 1); -} - -#[cfg(feature = "async")] -#[async_std::test] -async fn stream_forward_issue_55() { // https://github.com/zesterer/flume/issues/55 - fn dummy_stream() -> impl Stream<Item = usize> { - stream::unfold(0, |count| async move { - if count < 1000 { - Some((count, count + 1)) - } else { - None - } - }) - } - - let (send_task, recv_task) = { - use futures::SinkExt; - let (tx, rx) = flume::bounded(100); - - let send_task = dummy_stream() - .map(|i| Ok(i)) - .forward(tx.into_sink().sink_map_err(|e| { - panic!("send error:{:#?}", e) - })); - - let recv_task = rx - .into_stream() - .for_each(|item| async move {}); - (send_task, recv_task) - }; - - let jh = async_std::task::spawn(send_task); - async_std::task::block_on(recv_task); - jh.await.unwrap(); -} diff --git a/vendor/flume/tests/thread_locals.rs b/vendor/flume/tests/thread_locals.rs deleted file mode 100644 index acde751..0000000 --- a/vendor/flume/tests/thread_locals.rs +++ /dev/null @@ -1,53 +0,0 @@ -// //! Tests that make sure accessing thread-locals while exiting the thread doesn't cause panics. - -// extern crate crossbeam_utils; - -// use std::thread; -// use std::time::Duration; - -// use flume::unbounded; -// use crossbeam_utils::thread::scope; - -// fn ms(ms: u64) -> Duration { -// Duration::from_millis(ms) -// } - -// #[test] -// #[cfg_attr(target_os = "macos", ignore = "TLS is destroyed too early on macOS")] -// fn use_while_exiting() { -// struct Foo; - -// impl Drop for Foo { -// fn drop(&mut self) { -// // A blocking operation after the thread-locals have been dropped. This will attempt to -// // use the thread-locals and must not panic. -// let (_s, r) = unbounded::<()>(); -// select! { -// recv(r) -> _ => {} -// default(ms(100)) => {} -// } -// } -// } - -// thread_local! { -// static FOO: Foo = Foo; -// } - -// let (s, r) = unbounded::<()>(); - -// scope(|scope| { -// scope.spawn(|_| { -// // First initialize `FOO`, then the thread-locals related to crossbeam-channel. -// FOO.with(|_| ()); -// r.recv().unwrap(); -// // At thread exit, thread-locals related to crossbeam-channel get dropped first and -// // `FOO` is dropped last. -// }); - -// scope.spawn(|_| { -// thread::sleep(ms(100)); -// s.send(()).unwrap(); -// }); -// }) -// .unwrap(); -// } diff --git a/vendor/flume/tests/tick.rs b/vendor/flume/tests/tick.rs deleted file mode 100644 index b0fcd44..0000000 --- a/vendor/flume/tests/tick.rs +++ /dev/null @@ -1,353 +0,0 @@ -// //! Tests for the tick channel flavor. - -// #[macro_use] -// extern crate crossbeam_channel; -// extern crate crossbeam_utils; -// extern crate rand; - -// use std::sync::atomic::AtomicUsize; -// use std::sync::atomic::Ordering; -// use std::thread; -// use std::time::{Duration, Instant}; - -// use crossbeam_channel::{after, tick, Select, TryRecvError}; -// use crossbeam_utils::thread::scope; - -// fn ms(ms: u64) -> Duration { -// Duration::from_millis(ms) -// } - -// #[test] -// fn fire() { -// let start = Instant::now(); -// let r = tick(ms(50)); - -// assert_eq!(r.try_recv(), Err(TryRecvError::Empty)); -// thread::sleep(ms(100)); - -// let fired = r.try_recv().unwrap(); -// assert!(start < fired); -// assert!(fired - start >= ms(50)); - -// let now = Instant::now(); -// assert!(fired < now); -// assert!(now - fired >= ms(50)); - -// assert_eq!(r.try_recv(), Err(TryRecvError::Empty)); - -// select! { -// recv(r) -> _ => panic!(), -// default => {} -// } - -// select! { -// recv(r) -> _ => {} -// recv(tick(ms(200))) -> _ => panic!(), -// } -// } - -// #[test] -// fn intervals() { -// let start = Instant::now(); -// let r = tick(ms(50)); - -// let t1 = r.recv().unwrap(); -// assert!(start + ms(50) <= t1); -// assert!(start + ms(100) > t1); - -// thread::sleep(ms(300)); -// let t2 = r.try_recv().unwrap(); -// assert!(start + ms(100) <= t2); -// assert!(start + ms(150) > t2); - -// assert_eq!(r.try_recv(), Err(TryRecvError::Empty)); -// let t3 = r.recv().unwrap(); -// assert!(start + ms(400) <= t3); -// assert!(start + ms(450) > t3); - -// assert_eq!(r.try_recv(), Err(TryRecvError::Empty)); -// } - -// #[test] -// fn capacity() { -// const COUNT: usize = 10; - -// for i in 0..COUNT { -// let r = tick(ms(i as u64)); -// assert_eq!(r.capacity(), Some(1)); -// } -// } - -// #[test] -// fn len_empty_full() { -// let r = tick(ms(50)); - -// assert_eq!(r.len(), 0); -// assert_eq!(r.is_empty(), true); -// assert_eq!(r.is_full(), false); - -// thread::sleep(ms(100)); - -// assert_eq!(r.len(), 1); -// assert_eq!(r.is_empty(), false); -// assert_eq!(r.is_full(), true); - -// r.try_recv().unwrap(); - -// assert_eq!(r.len(), 0); -// assert_eq!(r.is_empty(), true); -// assert_eq!(r.is_full(), false); -// } - -// #[test] -// fn try_recv() { -// let r = tick(ms(200)); -// assert!(r.try_recv().is_err()); - -// thread::sleep(ms(100)); -// assert!(r.try_recv().is_err()); - -// thread::sleep(ms(200)); -// assert!(r.try_recv().is_ok()); -// assert!(r.try_recv().is_err()); - -// thread::sleep(ms(200)); -// assert!(r.try_recv().is_ok()); -// assert!(r.try_recv().is_err()); -// } - -// #[test] -// fn recv() { -// let start = Instant::now(); -// let r = tick(ms(50)); - -// let fired = r.recv().unwrap(); -// assert!(start < fired); -// assert!(fired - start >= ms(50)); - -// let now = Instant::now(); -// assert!(fired < now); -// assert!(now - fired < fired - start); - -// assert_eq!(r.try_recv(), Err(TryRecvError::Empty)); -// } - -// #[test] -// fn recv_timeout() { -// let start = Instant::now(); -// let r = tick(ms(200)); - -// assert!(r.recv_timeout(ms(100)).is_err()); -// let now = Instant::now(); -// assert!(now - start >= ms(100)); -// assert!(now - start <= ms(150)); - -// let fired = r.recv_timeout(ms(200)).unwrap(); -// assert!(fired - start >= ms(200)); -// assert!(fired - start <= ms(250)); - -// assert!(r.recv_timeout(ms(100)).is_err()); -// let now = Instant::now(); -// assert!(now - start >= ms(300)); -// assert!(now - start <= ms(350)); - -// let fired = r.recv_timeout(ms(200)).unwrap(); -// assert!(fired - start >= ms(400)); -// assert!(fired - start <= ms(450)); -// } - -// #[test] -// fn recv_two() { -// let r1 = tick(ms(50)); -// let r2 = tick(ms(50)); - -// scope(|scope| { -// scope.spawn(|_| { -// for _ in 0..10 { -// select! { -// recv(r1) -> _ => {} -// recv(r2) -> _ => {} -// } -// } -// }); -// scope.spawn(|_| { -// for _ in 0..10 { -// select! { -// recv(r1) -> _ => {} -// recv(r2) -> _ => {} -// } -// } -// }); -// }) -// .unwrap(); -// } - -// #[test] -// fn recv_race() { -// select! { -// recv(tick(ms(50))) -> _ => {} -// recv(tick(ms(100))) -> _ => panic!(), -// } - -// select! { -// recv(tick(ms(100))) -> _ => panic!(), -// recv(tick(ms(50))) -> _ => {} -// } -// } - -// #[test] -// fn stress_default() { -// const COUNT: usize = 10; - -// for _ in 0..COUNT { -// select! { -// recv(tick(ms(0))) -> _ => {} -// default => panic!(), -// } -// } - -// for _ in 0..COUNT { -// select! { -// recv(tick(ms(100))) -> _ => panic!(), -// default => {} -// } -// } -// } - -// #[test] -// fn select() { -// const THREADS: usize = 4; - -// let hits = AtomicUsize::new(0); -// let r1 = tick(ms(200)); -// let r2 = tick(ms(300)); - -// scope(|scope| { -// for _ in 0..THREADS { -// scope.spawn(|_| { -// let timeout = after(ms(1100)); -// loop { -// let mut sel = Select::new(); -// let oper1 = sel.recv(&r1); -// let oper2 = sel.recv(&r2); -// let oper3 = sel.recv(&timeout); -// let oper = sel.select(); -// match oper.index() { -// i if i == oper1 => { -// oper.recv(&r1).unwrap(); -// hits.fetch_add(1, Ordering::SeqCst); -// } -// i if i == oper2 => { -// oper.recv(&r2).unwrap(); -// hits.fetch_add(1, Ordering::SeqCst); -// } -// i if i == oper3 => { -// oper.recv(&timeout).unwrap(); -// break; -// } -// _ => unreachable!(), -// } -// } -// }); -// } -// }) -// .unwrap(); - -// assert_eq!(hits.load(Ordering::SeqCst), 8); -// } - -// #[test] -// fn ready() { -// const THREADS: usize = 4; - -// let hits = AtomicUsize::new(0); -// let r1 = tick(ms(200)); -// let r2 = tick(ms(300)); - -// scope(|scope| { -// for _ in 0..THREADS { -// scope.spawn(|_| { -// let timeout = after(ms(1100)); -// 'outer: loop { -// let mut sel = Select::new(); -// sel.recv(&r1); -// sel.recv(&r2); -// sel.recv(&timeout); -// loop { -// match sel.ready() { -// 0 => { -// if r1.try_recv().is_ok() { -// hits.fetch_add(1, Ordering::SeqCst); -// break; -// } -// } -// 1 => { -// if r2.try_recv().is_ok() { -// hits.fetch_add(1, Ordering::SeqCst); -// break; -// } -// } -// 2 => { -// if timeout.try_recv().is_ok() { -// break 'outer; -// } -// } -// _ => unreachable!(), -// } -// } -// } -// }); -// } -// }) -// .unwrap(); - -// assert_eq!(hits.load(Ordering::SeqCst), 8); -// } - -// #[test] -// fn fairness() { -// const COUNT: usize = 30; - -// for &dur in &[0, 1] { -// let mut hits = [0usize; 2]; - -// for _ in 0..COUNT { -// let r1 = tick(ms(dur)); -// let r2 = tick(ms(dur)); - -// for _ in 0..COUNT { -// select! { -// recv(r1) -> _ => hits[0] += 1, -// recv(r2) -> _ => hits[1] += 1, -// } -// } -// } - -// assert!(hits.iter().all(|x| *x >= COUNT / hits.len() / 2)); -// } -// } - -// #[test] -// fn fairness_duplicates() { -// const COUNT: usize = 30; - -// for &dur in &[0, 1] { -// let mut hits = [0usize; 5]; - -// for _ in 0..COUNT { -// let r = tick(ms(dur)); - -// for _ in 0..COUNT { -// select! { -// recv(r) -> _ => hits[0] += 1, -// recv(r) -> _ => hits[1] += 1, -// recv(r) -> _ => hits[2] += 1, -// recv(r) -> _ => hits[3] += 1, -// recv(r) -> _ => hits[4] += 1, -// } -// } -// } - -// assert!(hits.iter().all(|x| *x >= COUNT / hits.len() / 2)); -// } -// } diff --git a/vendor/flume/tests/zero.rs b/vendor/flume/tests/zero.rs deleted file mode 100644 index be4239d..0000000 --- a/vendor/flume/tests/zero.rs +++ /dev/null @@ -1,557 +0,0 @@ -//! Tests for the zero channel flavor. - -extern crate crossbeam_utils; -extern crate rand; - -use std::any::Any; -use std::sync::atomic::AtomicUsize; -use std::sync::atomic::Ordering; -use std::thread; -use std::time::Duration; - -use flume::{bounded, Receiver}; -use flume::{RecvError, RecvTimeoutError, TryRecvError}; -use flume::{SendError, SendTimeoutError, TrySendError}; -use crossbeam_utils::thread::scope; -use rand::{thread_rng, Rng}; - -fn ms(ms: u64) -> Duration { - Duration::from_millis(ms) -} - -#[test] -fn smoke() { - let (s, r) = bounded(0); - assert_eq!(s.try_send(7), Err(TrySendError::Full(7))); - assert_eq!(r.try_recv(), Err(TryRecvError::Empty)); -} - -#[test] -fn capacity() { - let (s, r) = bounded::<()>(0); - assert_eq!(s.capacity(), Some(0)); - assert_eq!(r.capacity(), Some(0)); -} - -#[test] -fn len_empty_full() { - let (s, r) = bounded(0); - - assert_eq!(s.len(), 0); - assert_eq!(s.is_empty(), true); - assert_eq!(s.is_full(), true); - assert_eq!(r.len(), 0); - assert_eq!(r.is_empty(), true); - assert_eq!(r.is_full(), true); - - scope(|scope| { - scope.spawn(|_| s.send(0).unwrap()); - scope.spawn(|_| r.recv().unwrap()); - }) - .unwrap(); - - assert_eq!(s.len(), 0); - assert_eq!(s.is_empty(), true); - assert_eq!(s.is_full(), true); - assert_eq!(r.len(), 0); - assert_eq!(r.is_empty(), true); - assert_eq!(r.is_full(), true); -} - -#[test] -fn try_recv() { - let (s, r) = bounded(0); - - scope(|scope| { - scope.spawn(move |_| { - assert_eq!(r.try_recv(), Err(TryRecvError::Empty)); - thread::sleep(ms(1500)); - assert_eq!(r.try_recv(), Ok(7)); - thread::sleep(ms(500)); - assert_eq!(r.try_recv(), Err(TryRecvError::Disconnected)); - }); - scope.spawn(move |_| { - thread::sleep(ms(1000)); - s.send(7).unwrap(); - }); - }) - .unwrap(); -} - -#[test] -fn recv() { - let (s, r) = bounded(0); - - scope(|scope| { - scope.spawn(move |_| { - assert_eq!(r.recv(), Ok(7)); - thread::sleep(ms(1000)); - assert_eq!(r.recv(), Ok(8)); - thread::sleep(ms(1000)); - assert_eq!(r.recv(), Ok(9)); - assert!(r.recv().is_err()); - }); - scope.spawn(move |_| { - thread::sleep(ms(1500)); - s.send(7).unwrap(); - s.send(8).unwrap(); - s.send(9).unwrap(); - }); - }) - .unwrap(); -} - -#[test] -fn recv_timeout() { - let (s, r) = bounded::<i32>(0); - - scope(|scope| { - scope.spawn(move |_| { - assert_eq!(r.recv_timeout(ms(1000)), Err(RecvTimeoutError::Timeout)); - assert_eq!(r.recv_timeout(ms(1000)), Ok(7)); - assert_eq!( - r.recv_timeout(ms(1000)), - Err(RecvTimeoutError::Disconnected) - ); - }); - scope.spawn(move |_| { - thread::sleep(ms(1500)); - s.send(7).unwrap(); - }); - }) - .unwrap(); -} - -#[test] -fn try_send() { - let (s, r) = bounded(0); - - scope(|scope| { - scope.spawn(move |_| { - assert_eq!(s.try_send(7), Err(TrySendError::Full(7))); - thread::sleep(ms(1500)); - assert_eq!(s.try_send(8), Ok(())); - thread::sleep(ms(500)); - assert_eq!(s.try_send(9), Err(TrySendError::Disconnected(9))); - }); - scope.spawn(move |_| { - thread::sleep(ms(1000)); - assert_eq!(r.recv(), Ok(8)); - }); - }) - .unwrap(); -} - -#[test] -fn send() { - let (s, r) = bounded(0); - - scope(|scope| { - scope.spawn(move |_| { - s.send(7).unwrap(); - thread::sleep(ms(1000)); - s.send(8).unwrap(); - thread::sleep(ms(1000)); - s.send(9).unwrap(); - }); - scope.spawn(move |_| { - thread::sleep(ms(1500)); - assert_eq!(r.recv(), Ok(7)); - assert_eq!(r.recv(), Ok(8)); - assert_eq!(r.recv(), Ok(9)); - }); - }) - .unwrap(); -} - -#[test] -fn send_timeout() { - let (s, r) = bounded(0); - - scope(|scope| { - scope.spawn(move |_| { - assert_eq!( - s.send_timeout(7, ms(1000)), - Err(SendTimeoutError::Timeout(7)) - ); - assert_eq!(s.send_timeout(8, ms(1000)), Ok(())); - assert_eq!( - s.send_timeout(9, ms(1000)), - Err(SendTimeoutError::Disconnected(9)) - ); - }); - scope.spawn(move |_| { - thread::sleep(ms(1500)); - assert_eq!(r.recv(), Ok(8)); - }); - }) - .unwrap(); -} - -#[test] -fn len() { - const COUNT: usize = 25_000; - - let (s, r) = bounded(0); - - assert_eq!(s.len(), 0); - assert_eq!(r.len(), 0); - - scope(|scope| { - scope.spawn(|_| { - for i in 0..COUNT { - assert_eq!(r.recv(), Ok(i)); - assert_eq!(r.len(), 0); - } - }); - - scope.spawn(|_| { - for i in 0..COUNT { - s.send(i).unwrap(); - assert_eq!(s.len(), 0); - } - }); - }) - .unwrap(); - - assert_eq!(s.len(), 0); - assert_eq!(r.len(), 0); -} - -#[test] -fn disconnect_wakes_sender() { - let (s, r) = bounded(0); - - scope(|scope| { - scope.spawn(move |_| { - assert_eq!(s.send(()), Err(SendError(()))); - }); - scope.spawn(move |_| { - thread::sleep(ms(1000)); - drop(r); - }); - }) - .unwrap(); -} - -#[test] -fn disconnect_wakes_receiver() { - let (s, r) = bounded::<()>(0); - - scope(|scope| { - scope.spawn(move |_| { - assert!(r.recv().is_err()); - }); - scope.spawn(move |_| { - thread::sleep(ms(1000)); - drop(s); - }); - }) - .unwrap(); -} - -#[test] -fn spsc() { - const COUNT: usize = 100_000; - - let (s, r) = bounded(0); - - scope(|scope| { - scope.spawn(move |_| { - for i in 0..COUNT { - assert_eq!(r.recv(), Ok(i)); - } - assert!(r.recv().is_err()); - }); - scope.spawn(move |_| { - for i in 0..COUNT { - s.send(i).unwrap(); - } - }); - }) - .unwrap(); -} - -#[test] -fn mpmc() { - const COUNT: usize = 25_000; - const THREADS: usize = 4; - - let (s, r) = bounded::<usize>(0); - let v = (0..COUNT).map(|_| AtomicUsize::new(0)).collect::<Vec<_>>(); - - scope(|scope| { - for _ in 0..THREADS { - scope.spawn(|_| { - for _ in 0..COUNT { - let n = r.recv().unwrap(); - v[n].fetch_add(1, Ordering::SeqCst); - } - }); - } - for _ in 0..THREADS { - scope.spawn(|_| { - for i in 0..COUNT { - s.send(i).unwrap(); - } - }); - } - }) - .unwrap(); - - for c in v { - assert_eq!(c.load(Ordering::SeqCst), THREADS); - } -} - -#[test] -fn stress_oneshot() { - const COUNT: usize = 10_000; - - for _ in 0..COUNT { - let (s, r) = bounded(1); - - scope(|scope| { - scope.spawn(|_| r.recv().unwrap()); - scope.spawn(|_| s.send(0).unwrap()); - }) - .unwrap(); - } -} - -#[test] -fn stress_iter() { - const COUNT: usize = 1000; - - let (request_s, request_r) = bounded(0); - let (response_s, response_r) = bounded(0); - - scope(|scope| { - scope.spawn(move |_| { - let mut count = 0; - loop { - for x in response_r.try_iter() { - count += x; - if count == COUNT { - return; - } - } - let _ = request_s.try_send(()); - } - }); - - for _ in request_r.iter() { - if response_s.send(1).is_err() { - break; - } - } - }) - .unwrap(); -} - -#[test] -fn stress_timeout_two_threads() { - const COUNT: usize = 100; - - let (s, r) = bounded(0); - - scope(|scope| { - scope.spawn(|_| { - for i in 0..COUNT { - if i % 2 == 0 { - thread::sleep(ms(50)); - } - loop { - if let Ok(()) = s.send_timeout(i, ms(10)) { - break; - } - } - } - }); - - scope.spawn(|_| { - for i in 0..COUNT { - if i % 2 == 0 { - thread::sleep(ms(50)); - } - loop { - if let Ok(x) = r.recv_timeout(ms(10)) { - assert_eq!(x, i); - break; - } - } - } - }); - }) - .unwrap(); -} - -#[test] -fn drops() { - static DROPS: AtomicUsize = AtomicUsize::new(0); - - #[derive(Debug, PartialEq)] - struct DropCounter; - - impl Drop for DropCounter { - fn drop(&mut self) { - DROPS.fetch_add(1, Ordering::SeqCst); - } - } - - let mut rng = thread_rng(); - - for _ in 0..100 { - let steps = rng.gen_range(0..3_000); - - DROPS.store(0, Ordering::SeqCst); - let (s, r) = bounded::<DropCounter>(0); - - scope(|scope| { - scope.spawn(|_| { - for _ in 0..steps { - r.recv().unwrap(); - } - }); - - scope.spawn(|_| { - for _ in 0..steps { - s.send(DropCounter).unwrap(); - } - }); - }) - .unwrap(); - - assert_eq!(DROPS.load(Ordering::SeqCst), steps); - drop(s); - drop(r); - assert_eq!(DROPS.load(Ordering::SeqCst), steps); - } -} - -// #[test] -// fn fairness() { -// const COUNT: usize = 10_000; - -// let (s1, r1) = bounded::<()>(0); -// let (s2, r2) = bounded::<()>(0); - -// scope(|scope| { -// scope.spawn(|_| { -// let mut hits = [0usize; 2]; -// for _ in 0..COUNT { -// select! { -// recv(r1) -> _ => hits[0] += 1, -// recv(r2) -> _ => hits[1] += 1, -// } -// } -// assert!(hits.iter().all(|x| *x >= COUNT / hits.len() / 2)); -// }); - -// let mut hits = [0usize; 2]; -// for _ in 0..COUNT { -// select! { -// send(s1, ()) -> _ => hits[0] += 1, -// send(s2, ()) -> _ => hits[1] += 1, -// } -// } -// assert!(hits.iter().all(|x| *x >= COUNT / hits.len() / 2)); -// }) -// .unwrap(); -// } - -// #[test] -// fn fairness_duplicates() { -// const COUNT: usize = 10_000; - -// let (s, r) = bounded::<()>(0); - -// scope(|scope| { -// scope.spawn(|_| { -// let mut hits = [0usize; 5]; -// for _ in 0..COUNT { -// select! { -// recv(r) -> _ => hits[0] += 1, -// recv(r) -> _ => hits[1] += 1, -// recv(r) -> _ => hits[2] += 1, -// recv(r) -> _ => hits[3] += 1, -// recv(r) -> _ => hits[4] += 1, -// } -// } -// assert!(hits.iter().all(|x| *x >= COUNT / hits.len() / 2)); -// }); - -// let mut hits = [0usize; 5]; -// for _ in 0..COUNT { -// select! { -// send(s, ()) -> _ => hits[0] += 1, -// send(s, ()) -> _ => hits[1] += 1, -// send(s, ()) -> _ => hits[2] += 1, -// send(s, ()) -> _ => hits[3] += 1, -// send(s, ()) -> _ => hits[4] += 1, -// } -// } -// assert!(hits.iter().all(|x| *x >= COUNT / hits.len() / 2)); -// }) -// .unwrap(); -// } - -// #[test] -// fn recv_in_send() { -// let (s, r) = bounded(0); - -// scope(|scope| { -// scope.spawn(|_| { -// thread::sleep(ms(100)); -// r.recv() -// }); - -// scope.spawn(|_| { -// thread::sleep(ms(500)); -// s.send(()).unwrap(); -// }); - -// select! { -// send(s, r.recv().unwrap()) -> _ => {} -// } -// }) -// .unwrap(); -// } - -#[test] -fn channel_through_channel() { - const COUNT: usize = 1000; - - type T = Box<dyn Any + Send>; - - let (s, r) = bounded::<T>(0); - - scope(|scope| { - scope.spawn(move |_| { - let mut s = s; - - for _ in 0..COUNT { - let (new_s, new_r) = bounded(0); - let new_r: T = Box::new(Some(new_r)); - - s.send(new_r).unwrap(); - s = new_s; - } - }); - - scope.spawn(move |_| { - let mut r = r; - - for _ in 0..COUNT { - r = r - .recv() - .unwrap() - .downcast_mut::<Option<Receiver<T>>>() - .unwrap() - .take() - .unwrap() - } - }); - }) - .unwrap(); -} |