diff options
Diffstat (limited to 'vendor/flume/tests')
-rw-r--r-- | vendor/flume/tests/after.rs | 339 | ||||
-rw-r--r-- | vendor/flume/tests/array.rs | 657 | ||||
-rw-r--r-- | vendor/flume/tests/async.rs | 276 | ||||
-rw-r--r-- | vendor/flume/tests/basic.rs | 428 | ||||
-rw-r--r-- | vendor/flume/tests/check_same_channel.rs | 57 | ||||
-rw-r--r-- | vendor/flume/tests/golang.rs | 1445 | ||||
-rw-r--r-- | vendor/flume/tests/iter.rs | 112 | ||||
-rw-r--r-- | vendor/flume/tests/list.rs | 536 | ||||
-rw-r--r-- | vendor/flume/tests/method_sharing.rs | 39 | ||||
-rw-r--r-- | vendor/flume/tests/mpsc.rs | 2095 | ||||
-rw-r--r-- | vendor/flume/tests/never.rs | 99 | ||||
-rw-r--r-- | vendor/flume/tests/ready.rs | 837 | ||||
-rw-r--r-- | vendor/flume/tests/same_channel.rs | 114 | ||||
-rw-r--r-- | vendor/flume/tests/select.rs | 1304 | ||||
-rw-r--r-- | vendor/flume/tests/select_macro.rs | 1440 | ||||
-rw-r--r-- | vendor/flume/tests/stream.rs | 255 | ||||
-rw-r--r-- | vendor/flume/tests/thread_locals.rs | 53 | ||||
-rw-r--r-- | vendor/flume/tests/tick.rs | 353 | ||||
-rw-r--r-- | vendor/flume/tests/zero.rs | 557 |
19 files changed, 10996 insertions, 0 deletions
diff --git a/vendor/flume/tests/after.rs b/vendor/flume/tests/after.rs new file mode 100644 index 0000000..6d25108 --- /dev/null +++ b/vendor/flume/tests/after.rs @@ -0,0 +1,339 @@ +// //! Tests for the after channel flavor. + +// #[macro_use] +// extern crate crossbeam_channel; +// extern crate crossbeam_utils; +// extern crate rand; + +// use std::sync::atomic::AtomicUsize; +// use std::sync::atomic::Ordering; +// use std::thread; +// use std::time::{Duration, Instant}; + +// use crossbeam_channel::{after, Select, TryRecvError}; +// use crossbeam_utils::thread::scope; + +// fn ms(ms: u64) -> Duration { +// Duration::from_millis(ms) +// } + +// #[test] +// fn fire() { +// let start = Instant::now(); +// let r = after(ms(50)); + +// assert_eq!(r.try_recv(), Err(TryRecvError::Empty)); +// thread::sleep(ms(100)); + +// let fired = r.try_recv().unwrap(); +// assert!(start < fired); +// assert!(fired - start >= ms(50)); + +// let now = Instant::now(); +// assert!(fired < now); +// assert!(now - fired >= ms(50)); + +// assert_eq!(r.try_recv(), Err(TryRecvError::Empty)); + +// select! { +// recv(r) -> _ => panic!(), +// default => {} +// } + +// select! { +// recv(r) -> _ => panic!(), +// recv(after(ms(200))) -> _ => {} +// } +// } + +// #[test] +// fn capacity() { +// const COUNT: usize = 10; + +// for i in 0..COUNT { +// let r = after(ms(i as u64)); +// assert_eq!(r.capacity(), Some(1)); +// } +// } + +// #[test] +// fn len_empty_full() { +// let r = after(ms(50)); + +// assert_eq!(r.len(), 0); +// assert_eq!(r.is_empty(), true); +// assert_eq!(r.is_full(), false); + +// thread::sleep(ms(100)); + +// assert_eq!(r.len(), 1); +// assert_eq!(r.is_empty(), false); +// assert_eq!(r.is_full(), true); + +// r.try_recv().unwrap(); + +// assert_eq!(r.len(), 0); +// assert_eq!(r.is_empty(), true); +// assert_eq!(r.is_full(), false); +// } + +// #[test] +// fn try_recv() { +// let r = after(ms(200)); +// assert!(r.try_recv().is_err()); + +// thread::sleep(ms(100)); +// assert!(r.try_recv().is_err()); + +// thread::sleep(ms(200)); +// assert!(r.try_recv().is_ok()); +// assert!(r.try_recv().is_err()); + +// thread::sleep(ms(200)); +// assert!(r.try_recv().is_err()); +// } + +// #[test] +// fn recv() { +// let start = Instant::now(); +// let r = after(ms(50)); + +// let fired = r.recv().unwrap(); +// assert!(start < fired); +// assert!(fired - start >= ms(50)); + +// let now = Instant::now(); +// assert!(fired < now); +// assert!(now - fired < fired - start); + +// assert_eq!(r.try_recv(), Err(TryRecvError::Empty)); +// } + +// #[test] +// fn recv_timeout() { +// let start = Instant::now(); +// let r = after(ms(200)); + +// assert!(r.recv_timeout(ms(100)).is_err()); +// let now = Instant::now(); +// assert!(now - start >= ms(100)); +// assert!(now - start <= ms(150)); + +// let fired = r.recv_timeout(ms(200)).unwrap(); +// assert!(fired - start >= ms(200)); +// assert!(fired - start <= ms(250)); + +// assert!(r.recv_timeout(ms(200)).is_err()); +// let now = Instant::now(); +// assert!(now - start >= ms(400)); +// assert!(now - start <= ms(450)); + +// assert_eq!(r.try_recv(), Err(TryRecvError::Empty)); +// } + +// #[test] +// fn recv_two() { +// let r1 = after(ms(50)); +// let r2 = after(ms(50)); + +// scope(|scope| { +// scope.spawn(|_| { +// select! { +// recv(r1) -> _ => {} +// recv(r2) -> _ => {} +// } +// }); +// scope.spawn(|_| { +// select! { +// recv(r1) -> _ => {} +// recv(r2) -> _ => {} +// } +// }); +// }) +// .unwrap(); +// } + +// #[test] +// fn recv_race() { +// select! { +// recv(after(ms(50))) -> _ => {} +// recv(after(ms(100))) -> _ => panic!(), +// } + +// select! { +// recv(after(ms(100))) -> _ => panic!(), +// recv(after(ms(50))) -> _ => {} +// } +// } + +// #[test] +// fn stress_default() { +// const COUNT: usize = 10; + +// for _ in 0..COUNT { +// select! { +// recv(after(ms(0))) -> _ => {} +// default => panic!(), +// } +// } + +// for _ in 0..COUNT { +// select! { +// recv(after(ms(100))) -> _ => panic!(), +// default => {} +// } +// } +// } + +// #[test] +// fn select() { +// const THREADS: usize = 4; +// const COUNT: usize = 1000; +// const TIMEOUT_MS: u64 = 100; + +// let v = (0..COUNT) +// .map(|i| after(ms(i as u64 / TIMEOUT_MS / 2))) +// .collect::<Vec<_>>(); +// let hits = AtomicUsize::new(0); + +// scope(|scope| { +// for _ in 0..THREADS { +// scope.spawn(|_| { +// let v: Vec<&_> = v.iter().collect(); + +// loop { +// let timeout = after(ms(TIMEOUT_MS)); +// let mut sel = Select::new(); +// for r in &v { +// sel.recv(r); +// } +// let oper_timeout = sel.recv(&timeout); + +// let oper = sel.select(); +// match oper.index() { +// i if i == oper_timeout => { +// oper.recv(&timeout).unwrap(); +// break; +// } +// i => { +// oper.recv(&v[i]).unwrap(); +// hits.fetch_add(1, Ordering::SeqCst); +// } +// } +// } +// }); +// } +// }) +// .unwrap(); + +// assert_eq!(hits.load(Ordering::SeqCst), COUNT); +// } + +// #[test] +// fn ready() { +// const THREADS: usize = 4; +// const COUNT: usize = 1000; +// const TIMEOUT_MS: u64 = 100; + +// let v = (0..COUNT) +// .map(|i| after(ms(i as u64 / TIMEOUT_MS / 2))) +// .collect::<Vec<_>>(); +// let hits = AtomicUsize::new(0); + +// scope(|scope| { +// for _ in 0..THREADS { +// scope.spawn(|_| { +// let v: Vec<&_> = v.iter().collect(); + +// loop { +// let timeout = after(ms(TIMEOUT_MS)); +// let mut sel = Select::new(); +// for r in &v { +// sel.recv(r); +// } +// let oper_timeout = sel.recv(&timeout); + +// loop { +// let i = sel.ready(); +// if i == oper_timeout { +// timeout.try_recv().unwrap(); +// return; +// } else if v[i].try_recv().is_ok() { +// hits.fetch_add(1, Ordering::SeqCst); +// break; +// } +// } +// } +// }); +// } +// }) +// .unwrap(); + +// assert_eq!(hits.load(Ordering::SeqCst), COUNT); +// } + +// #[test] +// fn stress_clone() { +// const RUNS: usize = 1000; +// const THREADS: usize = 10; +// const COUNT: usize = 50; + +// for i in 0..RUNS { +// let r = after(ms(i as u64)); + +// scope(|scope| { +// for _ in 0..THREADS { +// scope.spawn(|_| { +// let r = r.clone(); +// let _ = r.try_recv(); + +// for _ in 0..COUNT { +// drop(r.clone()); +// thread::yield_now(); +// } +// }); +// } +// }) +// .unwrap(); +// } +// } + +// #[test] +// fn fairness() { +// const COUNT: usize = 1000; + +// for &dur in &[0, 1] { +// let mut hits = [0usize; 2]; + +// for _ in 0..COUNT { +// select! { +// recv(after(ms(dur))) -> _ => hits[0] += 1, +// recv(after(ms(dur))) -> _ => hits[1] += 1, +// } +// } + +// assert!(hits.iter().all(|x| *x >= COUNT / hits.len() / 2)); +// } +// } + +// #[test] +// fn fairness_duplicates() { +// const COUNT: usize = 1000; + +// for &dur in &[0, 1] { +// let mut hits = [0usize; 5]; + +// for _ in 0..COUNT { +// let r = after(ms(dur)); +// select! { +// recv(r) -> _ => hits[0] += 1, +// recv(r) -> _ => hits[1] += 1, +// recv(r) -> _ => hits[2] += 1, +// recv(r) -> _ => hits[3] += 1, +// recv(r) -> _ => hits[4] += 1, +// } +// } + +// assert!(hits.iter().all(|x| *x >= COUNT / hits.len() / 2)); +// } +// } diff --git a/vendor/flume/tests/array.rs b/vendor/flume/tests/array.rs new file mode 100644 index 0000000..a72bbe3 --- /dev/null +++ b/vendor/flume/tests/array.rs @@ -0,0 +1,657 @@ +//! Tests for the array channel flavor. + +extern crate crossbeam_utils; +extern crate rand; + +use std::any::Any; +use std::sync::atomic::AtomicUsize; +use std::sync::atomic::Ordering; +use std::thread; +use std::time::Duration; + +use flume::{bounded, Receiver}; +use flume::{RecvError, RecvTimeoutError, TryRecvError}; +use flume::{SendError, SendTimeoutError, TrySendError}; +use crossbeam_utils::thread::scope; +use rand::{thread_rng, Rng}; + +fn ms(ms: u64) -> Duration { + Duration::from_millis(ms) +} + +#[test] +fn smoke() { + let (s, r) = bounded(1); + s.send(7).unwrap(); + assert_eq!(r.try_recv(), Ok(7)); + + s.send(8).unwrap(); + assert_eq!(r.recv(), Ok(8)); + + assert_eq!(r.try_recv(), Err(TryRecvError::Empty)); + assert_eq!(r.recv_timeout(ms(1000)), Err(RecvTimeoutError::Timeout)); +} + +#[test] +fn capacity() { + for i in 1..10 { + let (s, r) = bounded::<()>(i); + assert_eq!(s.capacity(), Some(i)); + assert_eq!(r.capacity(), Some(i)); + } +} + +#[test] +fn len_empty_full() { + let (s, r) = bounded(2); + + assert_eq!(s.len(), 0); + assert_eq!(s.is_empty(), true); + assert_eq!(s.is_full(), false); + assert_eq!(r.len(), 0); + assert_eq!(r.is_empty(), true); + assert_eq!(r.is_full(), false); + + s.send(()).unwrap(); + + assert_eq!(s.len(), 1); + assert_eq!(s.is_empty(), false); + assert_eq!(s.is_full(), false); + assert_eq!(r.len(), 1); + assert_eq!(r.is_empty(), false); + assert_eq!(r.is_full(), false); + + s.send(()).unwrap(); + + assert_eq!(s.len(), 2); + assert_eq!(s.is_empty(), false); + assert_eq!(s.is_full(), true); + assert_eq!(r.len(), 2); + assert_eq!(r.is_empty(), false); + assert_eq!(r.is_full(), true); + + r.recv().unwrap(); + + assert_eq!(s.len(), 1); + assert_eq!(s.is_empty(), false); + assert_eq!(s.is_full(), false); + assert_eq!(r.len(), 1); + assert_eq!(r.is_empty(), false); + assert_eq!(r.is_full(), false); +} + +#[test] +fn try_recv() { + let (s, r) = bounded(100); + + scope(|scope| { + scope.spawn(move |_| { + assert_eq!(r.try_recv(), Err(TryRecvError::Empty)); + thread::sleep(ms(1500)); + assert_eq!(r.try_recv(), Ok(7)); + thread::sleep(ms(500)); + assert_eq!(r.try_recv(), Err(TryRecvError::Disconnected)); + }); + scope.spawn(move |_| { + thread::sleep(ms(1000)); + s.send(7).unwrap(); + }); + }) + .unwrap(); +} + +#[test] +fn recv() { + let (s, r) = bounded(100); + + scope(|scope| { + scope.spawn(move |_| { + assert_eq!(r.recv(), Ok(7)); + thread::sleep(ms(1000)); + assert_eq!(r.recv(), Ok(8)); + thread::sleep(ms(1000)); + assert_eq!(r.recv(), Ok(9)); + assert!(r.recv().is_err()); + }); + scope.spawn(move |_| { + thread::sleep(ms(1500)); + s.send(7).unwrap(); + s.send(8).unwrap(); + s.send(9).unwrap(); + }); + }) + .unwrap(); +} + +#[test] +fn recv_timeout() { + let (s, r) = bounded::<i32>(100); + + scope(|scope| { + scope.spawn(move |_| { + assert_eq!(r.recv_timeout(ms(1000)), Err(RecvTimeoutError::Timeout)); + assert_eq!(r.recv_timeout(ms(1000)), Ok(7)); + assert_eq!( + r.recv_timeout(ms(1000)), + Err(RecvTimeoutError::Disconnected) + ); + }); + scope.spawn(move |_| { + thread::sleep(ms(1500)); + s.send(7).unwrap(); + }); + }) + .unwrap(); +} + +#[test] +fn try_send() { + let (s, r) = bounded(1); + + scope(|scope| { + scope.spawn(move |_| { + assert_eq!(s.try_send(1), Ok(())); + assert_eq!(s.try_send(2), Err(TrySendError::Full(2))); + thread::sleep(ms(1500)); + assert_eq!(s.try_send(3), Ok(())); + thread::sleep(ms(500)); + assert_eq!(s.try_send(4), Err(TrySendError::Disconnected(4))); + }); + scope.spawn(move |_| { + thread::sleep(ms(1000)); + assert_eq!(r.try_recv(), Ok(1)); + assert_eq!(r.try_recv(), Err(TryRecvError::Empty)); + assert_eq!(r.recv(), Ok(3)); + }); + }) + .unwrap(); +} + +#[test] +fn send() { + let (s, r) = bounded(1); + + scope(|scope| { + scope.spawn(|_| { + s.send(7).unwrap(); + thread::sleep(ms(1000)); + s.send(8).unwrap(); + thread::sleep(ms(1000)); + s.send(9).unwrap(); + thread::sleep(ms(1000)); + s.send(10).unwrap(); + }); + scope.spawn(|_| { + thread::sleep(ms(1500)); + assert_eq!(r.recv(), Ok(7)); + assert_eq!(r.recv(), Ok(8)); + assert_eq!(r.recv(), Ok(9)); + }); + }) + .unwrap(); +} + +#[test] +fn send_timeout() { + let (s, r) = bounded(2); + + scope(|scope| { + scope.spawn(move |_| { + assert_eq!(s.send_timeout(1, ms(1000)), Ok(())); + assert_eq!(s.send_timeout(2, ms(1000)), Ok(())); + assert_eq!( + s.send_timeout(3, ms(500)), + Err(SendTimeoutError::Timeout(3)) + ); + thread::sleep(ms(1000)); + assert_eq!(s.send_timeout(4, ms(1000)), Ok(())); + thread::sleep(ms(1000)); + assert_eq!(s.send(5), Err(SendError(5))); + }); + scope.spawn(move |_| { + thread::sleep(ms(1000)); + assert_eq!(r.recv(), Ok(1)); + thread::sleep(ms(1000)); + assert_eq!(r.recv(), Ok(2)); + assert_eq!(r.recv(), Ok(4)); + }); + }) + .unwrap(); +} + +#[test] +fn send_after_disconnect() { + let (s, r) = bounded(100); + + s.send(1).unwrap(); + s.send(2).unwrap(); + s.send(3).unwrap(); + + drop(r); + + assert_eq!(s.send(4), Err(SendError(4))); + assert_eq!(s.try_send(5), Err(TrySendError::Disconnected(5))); + assert_eq!( + s.send_timeout(6, ms(500)), + Err(SendTimeoutError::Disconnected(6)) + ); +} + +#[test] +fn recv_after_disconnect() { + let (s, r) = bounded(100); + + s.send(1).unwrap(); + s.send(2).unwrap(); + s.send(3).unwrap(); + + drop(s); + + assert_eq!(r.recv(), Ok(1)); + assert_eq!(r.recv(), Ok(2)); + assert_eq!(r.recv(), Ok(3)); + assert!(r.recv().is_err()); +} + +#[test] +fn len() { + const COUNT: usize = 25_000; + const CAP: usize = 1000; + + let (s, r) = bounded(CAP); + + assert_eq!(s.len(), 0); + assert_eq!(r.len(), 0); + + for _ in 0..CAP / 10 { + for i in 0..50 { + s.send(i).unwrap(); + assert_eq!(s.len(), i + 1); + } + + for i in 0..50 { + r.recv().unwrap(); + assert_eq!(r.len(), 50 - i - 1); + } + } + + assert_eq!(s.len(), 0); + assert_eq!(r.len(), 0); + + for i in 0..CAP { + s.send(i).unwrap(); + assert_eq!(s.len(), i + 1); + } + + for _ in 0..CAP { + r.recv().unwrap(); + } + + assert_eq!(s.len(), 0); + assert_eq!(r.len(), 0); + + scope(|scope| { + scope.spawn(|_| { + for i in 0..COUNT { + assert_eq!(r.recv(), Ok(i)); + let len = r.len(); + assert!(len <= CAP); + } + }); + + scope.spawn(|_| { + for i in 0..COUNT { + s.send(i).unwrap(); + let len = s.len(); + assert!(len <= CAP); + } + }); + }) + .unwrap(); + + assert_eq!(s.len(), 0); + assert_eq!(r.len(), 0); +} + +#[test] +fn disconnect_wakes_sender() { + let (s, r) = bounded(1); + + scope(|scope| { + scope.spawn(move |_| { + assert_eq!(s.send(()), Ok(())); + assert_eq!(s.send(()), Err(SendError(()))); + }); + scope.spawn(move |_| { + thread::sleep(ms(1000)); + drop(r); + }); + }) + .unwrap(); +} + +#[test] +fn disconnect_wakes_receiver() { + let (s, r) = bounded::<()>(1); + + scope(|scope| { + scope.spawn(move |_| { + assert!(r.recv().is_err()); + }); + scope.spawn(move |_| { + thread::sleep(ms(1000)); + drop(s); + }); + }) + .unwrap(); +} + +#[test] +fn spsc() { + const COUNT: usize = 100_000; + + let (s, r) = bounded(3); + + scope(|scope| { + scope.spawn(move |_| { + for i in 0..COUNT { + assert_eq!(r.recv(), Ok(i)); + } + assert!(r.recv().is_err()); + }); + scope.spawn(move |_| { + for i in 0..COUNT { + s.send(i).unwrap(); + } + }); + }) + .unwrap(); +} + +#[test] +fn mpmc() { + const COUNT: usize = 25_000; + const THREADS: usize = 4; + + let (s, r) = bounded::<usize>(3); + let v = (0..COUNT).map(|_| AtomicUsize::new(0)).collect::<Vec<_>>(); + + scope(|scope| { + for _ in 0..THREADS { + scope.spawn(|_| { + for _ in 0..COUNT { + let n = r.recv().unwrap(); + v[n].fetch_add(1, Ordering::SeqCst); + } + }); + } + for _ in 0..THREADS { + scope.spawn(|_| { + for i in 0..COUNT { + s.send(i).unwrap(); + } + }); + } + }) + .unwrap(); + + for c in v { + assert_eq!(c.load(Ordering::SeqCst), THREADS); + } +} + +#[test] +fn stress_oneshot() { + const COUNT: usize = 10_000; + + for _ in 0..COUNT { + let (s, r) = bounded(1); + + scope(|scope| { + scope.spawn(|_| r.recv().unwrap()); + scope.spawn(|_| s.send(0).unwrap()); + }) + .unwrap(); + } +} + +#[test] +fn stress_iter() { + const COUNT: usize = 100_000; + + let (request_s, request_r) = bounded(1); + let (response_s, response_r) = bounded(1); + + scope(|scope| { + scope.spawn(move |_| { + let mut count = 0; + loop { + for x in response_r.try_iter() { + count += x; + if count == COUNT { + return; + } + } + request_s.send(()).unwrap(); + } + }); + + for _ in request_r.iter() { + if response_s.send(1).is_err() { + break; + } + } + }) + .unwrap(); +} + +#[test] +fn stress_timeout_two_threads() { + const COUNT: usize = 100; + + let (s, r) = bounded(2); + + scope(|scope| { + scope.spawn(|_| { + for i in 0..COUNT { + if i % 2 == 0 { + thread::sleep(ms(50)); + } + loop { + if let Ok(()) = s.send_timeout(i, ms(10)) { + break; + } + } + } + }); + + scope.spawn(|_| { + for i in 0..COUNT { + if i % 2 == 0 { + thread::sleep(ms(50)); + } + loop { + if let Ok(x) = r.recv_timeout(ms(10)) { + assert_eq!(x, i); + break; + } + } + } + }); + }) + .unwrap(); +} + +#[test] +fn drops() { + const RUNS: usize = 100; + + static DROPS: AtomicUsize = AtomicUsize::new(0); + + #[derive(Debug, PartialEq)] + struct DropCounter; + + impl Drop for DropCounter { + fn drop(&mut self) { + DROPS.fetch_add(1, Ordering::SeqCst); + } + } + + let mut rng = thread_rng(); + + for _ in 0..RUNS { + let steps = rng.gen_range(0..10_000); + let additional = rng.gen_range(0..50); + + DROPS.store(0, Ordering::SeqCst); + let (s, r) = bounded::<DropCounter>(50); + + scope(|scope| { + scope.spawn(|_| { + for _ in 0..steps { + r.recv().unwrap(); + } + }); + + scope.spawn(|_| { + for _ in 0..steps { + s.send(DropCounter).unwrap(); + } + }); + }) + .unwrap(); + + for _ in 0..additional { + s.send(DropCounter).unwrap(); + } + + assert_eq!(DROPS.load(Ordering::SeqCst), steps); + drop(s); + drop(r); + assert_eq!(DROPS.load(Ordering::SeqCst), steps + additional); + } +} + +#[test] +fn linearizable() { + const COUNT: usize = 25_000; + const THREADS: usize = 4; + + let (s, r) = bounded(THREADS); + + scope(|scope| { + for _ in 0..THREADS { + scope.spawn(|_| { + for _ in 0..COUNT { + s.send(0).unwrap(); + r.try_recv().unwrap(); + } + }); + } + }) + .unwrap(); +} + +// #[test] +// fn fairness() { +// const COUNT: usize = 10_000; + +// let (s1, r1) = bounded::<()>(COUNT); +// let (s2, r2) = bounded::<()>(COUNT); + +// for _ in 0..COUNT { +// s1.send(()).unwrap(); +// s2.send(()).unwrap(); +// } + +// let mut hits = [0usize; 2]; +// for _ in 0..COUNT { +// select! { +// recv(r1) -> _ => hits[0] += 1, +// recv(r2) -> _ => hits[1] += 1, +// } +// } +// assert!(hits.iter().all(|x| *x >= COUNT / hits.len() / 2)); +// } + +// #[test] +// fn fairness_duplicates() { +// const COUNT: usize = 10_000; + +// let (s, r) = bounded::<()>(COUNT); + +// for _ in 0..COUNT { +// s.send(()).unwrap(); +// } + +// let mut hits = [0usize; 5]; +// for _ in 0..COUNT { +// select! { +// recv(r) -> _ => hits[0] += 1, +// recv(r) -> _ => hits[1] += 1, +// recv(r) -> _ => hits[2] += 1, +// recv(r) -> _ => hits[3] += 1, +// recv(r) -> _ => hits[4] += 1, +// } +// } +// assert!(hits.iter().all(|x| *x >= COUNT / hits.len() / 2)); +// } + +// #[test] +// fn recv_in_send() { +// let (s, _r) = bounded(1); +// s.send(()).unwrap(); + +// #[allow(unreachable_code)] +// { +// select! { +// send(s, panic!()) -> _ => panic!(), +// default => {} +// } +// } + +// let (s, r) = bounded(2); +// s.send(()).unwrap(); + +// select! { +// send(s, assert_eq!(r.recv(), Ok(()))) -> _ => {} +// } +// } + +#[test] +fn channel_through_channel() { + const COUNT: usize = 1000; + + type T = Box<dyn Any + Send>; + + let (s, r) = bounded::<T>(1); + + scope(|scope| { + scope.spawn(move |_| { + let mut s = s; + + for _ in 0..COUNT { + let (new_s, new_r) = bounded(1); + let new_r: T = Box::new(Some(new_r)); + + s.send(new_r).unwrap(); + s = new_s; + } + }); + + scope.spawn(move |_| { + let mut r = r; + + for _ in 0..COUNT { + r = r + .recv() + .unwrap() + .downcast_mut::<Option<Receiver<T>>>() + .unwrap() + .take() + .unwrap() + } + }); + }) + .unwrap(); +} diff --git a/vendor/flume/tests/async.rs b/vendor/flume/tests/async.rs new file mode 100644 index 0000000..6c2c7f2 --- /dev/null +++ b/vendor/flume/tests/async.rs @@ -0,0 +1,276 @@ +#[cfg(feature = "async")] +use { + flume::*, + futures::{stream::FuturesUnordered, StreamExt, TryFutureExt, Future}, + futures::task::{Context, Waker, Poll}, + async_std::prelude::FutureExt, + std::{time::Duration, sync::{atomic::{AtomicUsize, Ordering}, Arc}}, +}; + +#[cfg(feature = "async")] +#[test] +fn r#async_recv() { + let (tx, rx) = unbounded(); + + let t = std::thread::spawn(move || { + std::thread::sleep(std::time::Duration::from_millis(250)); + tx.send(42u32).unwrap(); + }); + + async_std::task::block_on(async { + assert_eq!(rx.recv_async().await.unwrap(), 42); + }); + + t.join().unwrap(); +} + +#[cfg(feature = "async")] +#[test] +fn r#async_send() { + let (tx, rx) = bounded(1); + + let t = std::thread::spawn(move || { + std::thread::sleep(std::time::Duration::from_millis(250)); + assert_eq!(rx.recv(), Ok(42)); + }); + + async_std::task::block_on(async { + tx.send_async(42u32).await.unwrap(); + }); + + t.join().unwrap(); +} + +#[cfg(feature = "async")] +#[test] +fn r#async_recv_disconnect() { + let (tx, rx) = bounded::<i32>(0); + + let t = std::thread::spawn(move || { + std::thread::sleep(std::time::Duration::from_millis(250)); + drop(tx) + }); + + async_std::task::block_on(async { + assert_eq!(rx.recv_async().await, Err(RecvError::Disconnected)); + }); + + t.join().unwrap(); +} + +#[cfg(feature = "async")] +#[test] +fn r#async_send_disconnect() { + let (tx, rx) = bounded(0); + + let t = std::thread::spawn(move || { + std::thread::sleep(std::time::Duration::from_millis(250)); + drop(rx) + }); + + async_std::task::block_on(async { + assert_eq!(tx.send_async(42u32).await, Err(SendError(42))); + }); + + t.join().unwrap(); +} + +#[cfg(feature = "async")] +#[test] +fn r#async_recv_drop_recv() { + let (tx, rx) = bounded::<i32>(10); + + let recv_fut = rx.recv_async(); + + async_std::task::block_on(async { + let res = async_std::future::timeout(std::time::Duration::from_millis(500), rx.recv_async()).await; + assert!(res.is_err()); + }); + + let rx2 = rx.clone(); + let t = std::thread::spawn(move || { + async_std::task::block_on(async { + rx2.recv_async().await + }) + }); + + std::thread::sleep(std::time::Duration::from_millis(500)); + + tx.send(42).unwrap(); + + drop(recv_fut); + + assert_eq!(t.join().unwrap(), Ok(42)) +} + +#[cfg(feature = "async")] +#[async_std::test] +async fn r#async_send_1_million_no_drop_or_reorder() { + #[derive(Debug)] + enum Message { + Increment { + old: u64, + }, + ReturnCount, + } + + let (tx, rx) = unbounded(); + + let t = async_std::task::spawn(async move { + let mut count = 0u64; + + while let Ok(Message::Increment { old }) = rx.recv_async().await { + assert_eq!(old, count); + count += 1; + } + + count + }); + + for next in 0..1_000_000 { + tx.send(Message::Increment { old: next }).unwrap(); + } + + tx.send(Message::ReturnCount).unwrap(); + + let count = t.await; + assert_eq!(count, 1_000_000) +} + +#[cfg(feature = "async")] +#[async_std::test] +async fn parallel_async_receivers() { + let (tx, rx) = flume::unbounded(); + let send_fut = async move { + let n_sends: usize = 100000; + for _ in 0..n_sends { + tx.send_async(()).await.unwrap(); + } + }; + + async_std::task::spawn( + send_fut + .timeout(Duration::from_secs(5)) + .map_err(|_| panic!("Send timed out!")) + ); + + let mut futures_unordered = (0..250) + .map(|_| async { + while let Ok(()) = rx.recv_async().await + /* rx.recv() is OK */ + {} + }) + .collect::<FuturesUnordered<_>>(); + + let recv_fut = async { + while futures_unordered.next().await.is_some() {} + }; + + recv_fut + .timeout(Duration::from_secs(5)) + .map_err(|_| panic!("Receive timed out!")) + .await + .unwrap(); + + println!("recv end"); +} + +#[cfg(feature = "async")] +#[test] +fn change_waker() { + let (tx, rx) = flume::bounded(1); + tx.send(()).unwrap(); + + struct DebugWaker(Arc<AtomicUsize>, Waker); + + impl DebugWaker { + fn new() -> Self { + let woken = Arc::new(AtomicUsize::new(0)); + let woken_cloned = woken.clone(); + let waker = waker_fn::waker_fn(move || { + woken.fetch_add(1, Ordering::SeqCst); + }); + DebugWaker(woken_cloned, waker) + } + + fn woken(&self) -> usize { + self.0.load(Ordering::SeqCst) + } + + fn ctx(&self) -> Context { + Context::from_waker(&self.1) + } + } + + // Check that the waker is correctly updated when sending tasks change their wakers + { + let send_fut = tx.send_async(()); + futures::pin_mut!(send_fut); + + let (waker1, waker2) = (DebugWaker::new(), DebugWaker::new()); + + // Set the waker to waker1 + assert_eq!(send_fut.as_mut().poll(&mut waker1.ctx()), Poll::Pending); + + // Change the waker to waker2 + assert_eq!(send_fut.poll(&mut waker2.ctx()), Poll::Pending); + + // Wake the future + rx.recv().unwrap(); + + // Check that waker2 was woken and waker1 was not + assert_eq!(waker1.woken(), 0); + assert_eq!(waker2.woken(), 1); + } + + // Check that the waker is correctly updated when receiving tasks change their wakers + { + rx.recv().unwrap(); + let recv_fut = rx.recv_async(); + futures::pin_mut!(recv_fut); + + let (waker1, waker2) = (DebugWaker::new(), DebugWaker::new()); + + // Set the waker to waker1 + assert_eq!(recv_fut.as_mut().poll(&mut waker1.ctx()), Poll::Pending); + + // Change the waker to waker2 + assert_eq!(recv_fut.poll(&mut waker2.ctx()), Poll::Pending); + + // Wake the future + tx.send(()).unwrap(); + + // Check that waker2 was woken and waker1 was not + assert_eq!(waker1.woken(), 0); + assert_eq!(waker2.woken(), 1); + } +} + +#[cfg(feature = "async")] +#[test] +fn spsc_single_threaded_value_ordering() { + async fn test() { + let (tx, rx) = flume::bounded(4); + tokio::select! { + _ = producer(tx) => {}, + _ = consumer(rx) => {}, + } + } + + async fn producer(tx: flume::Sender<usize>) { + for i in 0..100 { + tx.send_async(i).await.unwrap(); + } + } + + async fn consumer(rx: flume::Receiver<usize>) { + let mut expected = 0; + while let Ok(value) = rx.recv_async().await { + assert_eq!(value, expected); + expected += 1; + } + } + + let rt = tokio::runtime::Builder::new_current_thread().build().unwrap(); + rt.block_on(test()); +} diff --git a/vendor/flume/tests/basic.rs b/vendor/flume/tests/basic.rs new file mode 100644 index 0000000..b937436 --- /dev/null +++ b/vendor/flume/tests/basic.rs @@ -0,0 +1,428 @@ +use std::time::{Instant, Duration}; +use flume::*; + +#[test] +fn send_recv() { + let (tx, rx) = unbounded(); + for i in 0..1000 { tx.send(i).unwrap(); } + for i in 0..1000 { assert_eq!(rx.try_recv().unwrap(), i); } + assert!(rx.try_recv().is_err()); +} + +#[test] +fn iter() { + let (tx, rx) = unbounded(); + for i in 0..1000 { tx.send(i).unwrap(); } + drop(tx); + assert_eq!(rx.iter().sum::<u32>(), (0..1000).sum()); +} + +#[test] +fn try_iter() { + let (tx, rx) = unbounded(); + for i in 0..1000 { tx.send(i).unwrap(); } + assert_eq!(rx.try_iter().sum::<u32>(), (0..1000).sum()); +} + +#[test] +fn iter_threaded() { + let (tx, rx) = unbounded(); + for i in 0..1000 { + let tx = tx.clone(); + std::thread::spawn(move || tx.send(i).unwrap()); + } + drop(tx); + assert_eq!(rx.iter().sum::<u32>(), (0..1000).sum()); +} + +#[cfg_attr(any(target_os = "macos", windows), ignore)] // FIXME #41 +#[test] +fn send_timeout() { + let dur = Duration::from_millis(350); + let max_error = Duration::from_millis(5); + let dur_min = dur.checked_sub(max_error).unwrap(); + let dur_max = dur.checked_add(max_error).unwrap(); + + let (tx, rx) = bounded(1); + + assert!(tx.send_timeout(42, dur).is_ok()); + + let then = Instant::now(); + assert!(tx.send_timeout(43, dur).is_err()); + let now = Instant::now(); + + let this = now.duration_since(then); + if !(dur_min < this && this < dur_max) { + panic!("timeout exceeded: {:?}", this); + } + + assert_eq!(rx.drain().count(), 1); + + drop(rx); + + assert!(tx.send_timeout(42, Duration::from_millis(350)).is_err()); +} + +#[cfg_attr(any(target_os = "macos", windows), ignore)] // FIXME #41 +#[test] +fn recv_timeout() { + let dur = Duration::from_millis(350); + let max_error = Duration::from_millis(5); + let dur_min = dur.checked_sub(max_error).unwrap(); + let dur_max = dur.checked_add(max_error).unwrap(); + + let (tx, rx) = unbounded(); + let then = Instant::now(); + assert!(rx.recv_timeout(dur).is_err()); + let now = Instant::now(); + + let this = now.duration_since(then); + if !(dur_min < this && this < dur_max) { + panic!("timeout exceeded: {:?}", this); + } + + tx.send(42).unwrap(); + assert_eq!(rx.recv_timeout(dur), Ok(42)); + assert!(Instant::now().duration_since(now) < max_error); +} + +#[cfg_attr(any(target_os = "macos", windows), ignore)] // FIXME #41 +#[test] +fn recv_deadline() { + let dur = Duration::from_millis(350); + let max_error = Duration::from_millis(5); + let dur_min = dur.checked_sub(max_error).unwrap(); + let dur_max = dur.checked_add(max_error).unwrap(); + + let (tx, rx) = unbounded(); + let then = Instant::now(); + assert!(rx.recv_deadline(then.checked_add(dur).unwrap()).is_err()); + let now = Instant::now(); + + let this = now.duration_since(then); + if !(dur_min < this && this < dur_max) { + panic!("timeout exceeded: {:?}", this); + } + + tx.send(42).unwrap(); + assert_eq!(rx.recv_deadline(now.checked_add(dur).unwrap()), Ok(42)); + assert!(Instant::now().duration_since(now) < max_error); +} + +#[test] +fn recv_timeout_missed_send() { + let (tx, rx) = bounded(10); + + assert!(rx.recv_timeout(Duration::from_millis(100)).is_err()); + + tx.send(42).unwrap(); + + assert_eq!(rx.recv(), Ok(42)); +} + +#[test] +fn disconnect_tx() { + let (tx, rx) = unbounded::<()>(); + drop(tx); + assert!(rx.recv().is_err()); +} + +#[test] +fn disconnect_rx() { + let (tx, rx) = unbounded(); + drop(rx); + assert!(tx.send(0).is_err()); +} + +#[test] +fn drain() { + let (tx, rx) = unbounded(); + + for i in 0..100 { + tx.send(i).unwrap(); + } + + assert_eq!(rx.drain().sum::<u32>(), (0..100).sum()); + + for i in 0..100 { + tx.send(i).unwrap(); + } + + for i in 0..100 { + tx.send(i).unwrap(); + } + + rx.recv().unwrap(); + + (1u32..100).chain(0..100).zip(rx).for_each(|(l, r)| assert_eq!(l, r)); +} + +#[test] +fn try_send() { + let (tx, rx) = bounded(5); + + for i in 0..5 { + tx.try_send(i).unwrap(); + } + + assert!(tx.try_send(42).is_err()); + + assert_eq!(rx.recv(), Ok(0)); + + assert_eq!(tx.try_send(42), Ok(())); + + assert_eq!(rx.recv(), Ok(1)); + drop(rx); + + assert!(tx.try_send(42).is_err()); +} + +#[test] +fn send_bounded() { + let (tx, rx) = bounded(5); + + for _ in 0..5 { + tx.send(42).unwrap(); + } + + let _ = rx.recv().unwrap(); + + tx.send(42).unwrap(); + + assert!(tx.try_send(42).is_err()); + + rx.drain(); + + let mut ts = Vec::new(); + for _ in 0..100 { + let tx = tx.clone(); + ts.push(std::thread::spawn(move || { + for i in 0..10000 { + tx.send(i).unwrap(); + } + })); + } + + drop(tx); + + assert_eq!(rx.iter().sum::<u64>(), (0..10000).sum::<u64>() * 100); + + for t in ts { + t.join().unwrap(); + } + + assert!(rx.recv().is_err()); +} + +#[test] +fn rendezvous() { + let (tx, rx) = bounded(0); + + for i in 0..5 { + let tx = tx.clone(); + let t = std::thread::spawn(move || { + assert!(tx.try_send(()).is_err()); + + let then = Instant::now(); + tx.send(()).unwrap(); + let now = Instant::now(); + + assert!(now.duration_since(then) > Duration::from_millis(100), "iter = {}", i); + }); + + std::thread::sleep(Duration::from_millis(1000)); + rx.recv().unwrap(); + + t.join().unwrap(); + } +} + +#[test] +fn hydra() { + let thread_num = 32; + let msg_num = 1000; + + let (main_tx, main_rx) = unbounded::<()>(); + + let mut txs = Vec::new(); + for _ in 0..thread_num { + let main_tx = main_tx.clone(); + let (tx, rx) = unbounded(); + txs.push(tx); + + std::thread::spawn(move || { + for msg in rx.iter() { + main_tx.send(msg).unwrap(); + } + }); + } + + drop(main_tx); + + for _ in 0..10 { + for tx in &txs { + for _ in 0..msg_num { + tx.send(Default::default()).unwrap(); + } + } + + for _ in 0..thread_num { + for _ in 0..msg_num { + main_rx.recv().unwrap(); + } + } + } + + drop(txs); + assert!(main_rx.recv().is_err()); +} + +#[test] +fn robin() { + let thread_num = 32; + let msg_num = 10; + + let (mut main_tx, main_rx) = bounded::<()>(1); + + for _ in 0..thread_num { + let (mut tx, rx) = bounded(100); + std::mem::swap(&mut tx, &mut main_tx); + + std::thread::spawn(move || { + for msg in rx.iter() { + tx.send(msg).unwrap(); + } + }); + } + + for _ in 0..10 { + let main_tx = main_tx.clone(); + std::thread::spawn(move || { + for _ in 0..msg_num { + main_tx.send(Default::default()).unwrap(); + } + }); + + for _ in 0..msg_num { + main_rx.recv().unwrap(); + } + } +} + +#[cfg(feature = "select")] +#[test] +fn select_general() { + #[derive(Debug, PartialEq)] + struct Foo(usize); + + let (tx0, rx0) = bounded(1); + let (tx1, rx1) = unbounded(); + + for (i, t) in vec![tx0.clone(), tx1].into_iter().enumerate() { + std::thread::spawn(move || { + std::thread::sleep(std::time::Duration::from_millis(250)); + let _ = t.send(Foo(i)); + }); + } + + let x = Selector::new() + .recv(&rx0, |x| x) + .recv(&rx1, |x| x) + .wait() + .unwrap(); + + if x == Foo(0) { + assert!(rx1.recv().unwrap() == Foo(1)); + } else { + assert!(rx0.recv().unwrap() == Foo(0)); + } + + tx0.send(Foo(42)).unwrap(); + + let t = std::thread::spawn(move || { + std::thread::sleep(std::time::Duration::from_millis(100)); + assert_eq!(rx0.recv().unwrap(), Foo(42)); + assert_eq!(rx0.recv().unwrap(), Foo(43)); + + }); + + Selector::new() + .send(&tx0, Foo(43), |x| x) + .wait() + .unwrap(); + + t.join().unwrap(); +} + +struct MessageWithoutDebug(u32); + +#[test] +// This is a 'does it build' test, to make sure that the error types can turn +// into a std::error::Error without requiring the payload (which is not used +// there) to impl Debug. +fn std_error_without_debug() { + let (tx, rx) = unbounded::<MessageWithoutDebug>(); + + match tx.send(MessageWithoutDebug(1)) { + Ok(_) => {} + Err(e) => { + let _std_err: &dyn std::error::Error = &e; + } + } + + match rx.recv() { + Ok(_) => {} + Err(e) => { + let _std_err: &dyn std::error::Error = &e; + } + } + + match tx.try_send(MessageWithoutDebug(2)) { + Ok(_) => {} + Err(e) => { + let _std_err: &dyn std::error::Error = &e; + } + } + + match rx.try_recv() { + Ok(_) => {} + Err(e) => { + let _std_err: &dyn std::error::Error = &e; + } + } + + match tx.send_timeout(MessageWithoutDebug(3), Duration::from_secs(1000000)) { + Ok(_) => {} + Err(e) => { + let _std_err: &dyn std::error::Error = &e; + } + } + + match rx.recv_timeout(Duration::from_secs(10000000)) { + Ok(_) => {} + Err(e) => { + let _std_err: &dyn std::error::Error = &e; + } + } +} + +#[test] +fn weak_close() { + let (tx, rx) = unbounded::<()>(); + let weak = tx.downgrade(); + drop(tx); + assert!(weak.upgrade().is_none()); + assert!(rx.is_disconnected()); + assert!(rx.try_recv().is_err()); +} + +#[test] +fn weak_upgrade() { + let (tx, rx) = unbounded(); + let weak = tx.downgrade(); + let tx2 = weak.upgrade().unwrap(); + drop(tx); + assert!(!rx.is_disconnected()); + tx2.send(()).unwrap(); + assert!(rx.try_recv().is_ok()); +} diff --git a/vendor/flume/tests/check_same_channel.rs b/vendor/flume/tests/check_same_channel.rs new file mode 100644 index 0000000..edb82c3 --- /dev/null +++ b/vendor/flume/tests/check_same_channel.rs @@ -0,0 +1,57 @@ +#[test] +fn same_sender() { + let (tx1, _rx) = flume::unbounded::<()>(); + let tx2 = tx1.clone(); + + assert!(tx1.same_channel(&tx2)); + + let (tx3, _rx) = flume::unbounded::<()>(); + + assert!(!tx1.same_channel(&tx3)); + assert!(!tx2.same_channel(&tx3)); +} + +#[test] +fn same_receiver() { + let (_tx, rx1) = flume::unbounded::<()>(); + let rx2 = rx1.clone(); + + assert!(rx1.same_channel(&rx2)); + + let (_tx, rx3) = flume::unbounded::<()>(); + + assert!(!rx1.same_channel(&rx3)); + assert!(!rx2.same_channel(&rx3)); +} + +#[cfg(feature = "async")] +#[test] +fn same_send_sink() { + let (tx1, _rx) = flume::unbounded::<()>(); + let tx1 = tx1.into_sink(); + let tx2 = tx1.clone(); + + assert!(tx1.same_channel(&tx2)); + + let (tx3, _rx) = flume::unbounded::<()>(); + let tx3 = tx3.into_sink(); + + assert!(!tx1.same_channel(&tx3)); + assert!(!tx2.same_channel(&tx3)); +} + +#[cfg(feature = "async")] +#[test] +fn same_recv_stream() { + let (_tx, rx1) = flume::unbounded::<()>(); + let rx1 = rx1.into_stream(); + let rx2 = rx1.clone(); + + assert!(rx1.same_channel(&rx2)); + + let (_tx, rx3) = flume::unbounded::<()>(); + let rx3 = rx3.into_stream(); + + assert!(!rx1.same_channel(&rx3)); + assert!(!rx2.same_channel(&rx3)); +} diff --git a/vendor/flume/tests/golang.rs b/vendor/flume/tests/golang.rs new file mode 100644 index 0000000..ca00840 --- /dev/null +++ b/vendor/flume/tests/golang.rs @@ -0,0 +1,1445 @@ +// //! Tests copied from Go and manually rewritten in Rust. +// //! +// //! Source: +// //! - https://github.com/golang/go +// //! +// //! Copyright & License: +// //! - Copyright (c) 2009 The Go Authors +// //! - https://golang.org/AUTHORS +// //! - https://golang.org/LICENSE +// //! - https://golang.org/PATENTS + +// use std::any::Any; +// use std::cell::Cell; +// use std::collections::HashMap; +// use std::sync::{Arc, Condvar, Mutex}; +// use std::thread; +// use std::time::Duration; + +// use flume::{bounded, tick, Receiver, Select, Sender}; + +// fn ms(ms: u64) -> Duration { +// Duration::from_millis(ms) +// } + +// struct Chan<T> { +// inner: Arc<Mutex<ChanInner<T>>>, +// } + +// struct ChanInner<T> { +// s: Option<Sender<T>>, +// r: Receiver<T>, +// } + +// impl<T> Clone for Chan<T> { +// fn clone(&self) -> Chan<T> { +// Chan { +// inner: self.inner.clone(), +// } +// } +// } + +// impl<T> Chan<T> { +// fn send(&self, msg: T) { +// let s = self +// .inner +// .lock() +// .unwrap() +// .s +// .as_ref() +// .expect("sending into closed channel") +// .clone(); +// let _ = s.send(msg); +// } + +// fn try_recv(&self) -> Option<T> { +// let r = self.inner.lock().unwrap().r.clone(); +// r.try_recv().ok() +// } + +// fn recv(&self) -> Option<T> { +// let r = self.inner.lock().unwrap().r.clone(); +// r.recv().ok() +// } + +// fn close(&self) { +// self.inner +// .lock() +// .unwrap() +// .s +// .take() +// .expect("channel already closed"); +// } + +// fn rx(&self) -> Receiver<T> { +// self.inner.lock().unwrap().r.clone() +// } + +// fn tx(&self) -> Sender<T> { +// match self.inner.lock().unwrap().s.as_ref() { +// None => { +// let (s, r) = bounded(0); +// std::mem::forget(r); +// s +// } +// Some(s) => s.clone(), +// } +// } +// } + +// impl<T> Iterator for Chan<T> { +// type Item = T; + +// fn next(&mut self) -> Option<Self::Item> { +// self.recv() +// } +// } + +// impl<'a, T> IntoIterator for &'a Chan<T> { +// type Item = T; +// type IntoIter = Chan<T>; + +// fn into_iter(self) -> Self::IntoIter { +// self.clone() +// } +// } + +// fn make<T>(cap: usize) -> Chan<T> { +// let (s, r) = bounded(cap); +// Chan { +// inner: Arc::new(Mutex::new(ChanInner { s: Some(s), r })), +// } +// } + +// #[derive(Clone)] +// struct WaitGroup(Arc<WaitGroupInner>); + +// struct WaitGroupInner { +// cond: Condvar, +// count: Mutex<i32>, +// } + +// impl WaitGroup { +// fn new() -> WaitGroup { +// WaitGroup(Arc::new(WaitGroupInner { +// cond: Condvar::new(), +// count: Mutex::new(0), +// })) +// } + +// fn add(&self, delta: i32) { +// let mut count = self.0.count.lock().unwrap(); +// *count += delta; +// assert!(*count >= 0); +// self.0.cond.notify_all(); +// } + +// fn done(&self) { +// self.add(-1); +// } + +// fn wait(&self) { +// let mut count = self.0.count.lock().unwrap(); +// while *count > 0 { +// count = self.0.cond.wait(count).unwrap(); +// } +// } +// } + +// struct Defer<F: FnOnce()> { +// f: Option<Box<F>>, +// } + +// impl<F: FnOnce()> Drop for Defer<F> { +// fn drop(&mut self) { +// let f = self.f.take().unwrap(); +// let mut f = Some(f); +// let mut f = move || f.take().unwrap()(); +// f(); +// } +// } + +// macro_rules! defer { +// ($body:expr) => { +// let _defer = Defer { +// f: Some(Box::new(|| $body)), +// }; +// }; +// } + +// macro_rules! go { +// (@parse ref $v:ident, $($tail:tt)*) => {{ +// let ref $v = $v; +// go!(@parse $($tail)*) +// }}; +// (@parse move $v:ident, $($tail:tt)*) => {{ +// let $v = $v; +// go!(@parse $($tail)*) +// }}; +// (@parse $v:ident, $($tail:tt)*) => {{ +// let $v = $v.clone(); +// go!(@parse $($tail)*) +// }}; +// (@parse $body:expr) => { +// ::std::thread::spawn(move || { +// let res = ::std::panic::catch_unwind(::std::panic::AssertUnwindSafe(|| { +// $body +// })); +// if res.is_err() { +// eprintln!("goroutine panicked: {:?}", res); +// ::std::process::abort(); +// } +// }) +// }; +// (@parse $($tail:tt)*) => { +// compile_error!("invalid `go!` syntax") +// }; +// ($($tail:tt)*) => {{ +// go!(@parse $($tail)*) +// }}; +// } + +// // https://github.com/golang/go/blob/master/test/chan/doubleselect.go +// mod doubleselect { +// use super::*; + +// const ITERATIONS: i32 = 10_000; + +// fn sender(n: i32, c1: Chan<i32>, c2: Chan<i32>, c3: Chan<i32>, c4: Chan<i32>) { +// defer! { c1.close() } +// defer! { c2.close() } +// defer! { c3.close() } +// defer! { c4.close() } + +// for i in 0..n { +// select! { +// send(c1.tx(), i) -> _ => {} +// send(c2.tx(), i) -> _ => {} +// send(c3.tx(), i) -> _ => {} +// send(c4.tx(), i) -> _ => {} +// } +// } +// } + +// fn mux(out: Chan<i32>, inp: Chan<i32>, done: Chan<bool>) { +// for v in inp { +// out.send(v); +// } +// done.send(true); +// } + +// fn recver(inp: Chan<i32>) { +// let mut seen = HashMap::new(); + +// for v in &inp { +// if seen.contains_key(&v) { +// panic!("got duplicate value for {}", v); +// } +// seen.insert(v, true); +// } +// } + +// #[test] +// fn main() { +// let c1 = make::<i32>(0); +// let c2 = make::<i32>(0); +// let c3 = make::<i32>(0); +// let c4 = make::<i32>(0); +// let done = make::<bool>(0); +// let cmux = make::<i32>(0); + +// go!(c1, c2, c3, c4, sender(ITERATIONS, c1, c2, c3, c4)); +// go!(cmux, c1, done, mux(cmux, c1, done)); +// go!(cmux, c2, done, mux(cmux, c2, done)); +// go!(cmux, c3, done, mux(cmux, c3, done)); +// go!(cmux, c4, done, mux(cmux, c4, done)); +// go!(done, cmux, { +// done.recv(); +// done.recv(); +// done.recv(); +// done.recv(); +// cmux.close(); +// }); +// recver(cmux); +// } +// } + +// // https://github.com/golang/go/blob/master/test/chan/fifo.go +// mod fifo { +// use super::*; + +// const N: i32 = 10; + +// #[test] +// fn asynch_fifo() { +// let ch = make::<i32>(N as usize); +// for i in 0..N { +// ch.send(i); +// } +// for i in 0..N { +// if ch.recv() != Some(i) { +// panic!("bad receive"); +// } +// } +// } + +// fn chain(ch: Chan<i32>, val: i32, inp: Chan<i32>, out: Chan<i32>) { +// inp.recv(); +// if ch.recv() != Some(val) { +// panic!(val); +// } +// out.send(1); +// } + +// #[test] +// fn synch_fifo() { +// let ch = make::<i32>(0); +// let mut inp = make::<i32>(0); +// let start = inp.clone(); + +// for i in 0..N { +// let out = make::<i32>(0); +// go!(ch, i, inp, out, chain(ch, i, inp, out)); +// inp = out; +// } + +// start.send(0); +// for i in 0..N { +// ch.send(i); +// } +// inp.recv(); +// } +// } + +// // https://github.com/golang/go/blob/master/test/chan/goroutines.go +// mod goroutines { +// use super::*; + +// fn f(left: Chan<i32>, right: Chan<i32>) { +// left.send(right.recv().unwrap()); +// } + +// #[test] +// fn main() { +// let n = 100i32; + +// let leftmost = make::<i32>(0); +// let mut right = leftmost.clone(); +// let mut left = leftmost.clone(); + +// for _ in 0..n { +// right = make::<i32>(0); +// go!(left, right, f(left, right)); +// left = right.clone(); +// } + +// go!(right, right.send(1)); +// leftmost.recv().unwrap(); +// } +// } + +// // https://github.com/golang/go/blob/master/test/chan/nonblock.go +// mod nonblock { +// use super::*; + +// fn i32receiver(c: Chan<i32>, strobe: Chan<bool>) { +// if c.recv().unwrap() != 123 { +// panic!("i32 value"); +// } +// strobe.send(true); +// } + +// fn i32sender(c: Chan<i32>, strobe: Chan<bool>) { +// c.send(234); +// strobe.send(true); +// } + +// fn i64receiver(c: Chan<i64>, strobe: Chan<bool>) { +// if c.recv().unwrap() != 123456 { +// panic!("i64 value"); +// } +// strobe.send(true); +// } + +// fn i64sender(c: Chan<i64>, strobe: Chan<bool>) { +// c.send(234567); +// strobe.send(true); +// } + +// fn breceiver(c: Chan<bool>, strobe: Chan<bool>) { +// if !c.recv().unwrap() { +// panic!("b value"); +// } +// strobe.send(true); +// } + +// fn bsender(c: Chan<bool>, strobe: Chan<bool>) { +// c.send(true); +// strobe.send(true); +// } + +// fn sreceiver(c: Chan<String>, strobe: Chan<bool>) { +// if c.recv().unwrap() != "hello" { +// panic!("x value"); +// } +// strobe.send(true); +// } + +// fn ssender(c: Chan<String>, strobe: Chan<bool>) { +// c.send("hello again".to_string()); +// strobe.send(true); +// } + +// const MAX_TRIES: usize = 10000; // Up to 100ms per test. + +// #[test] +// fn main() { +// let ticker = tick(Duration::new(0, 10_000)); // 10 us +// let sleep = || { +// ticker.recv().unwrap(); +// ticker.recv().unwrap(); +// thread::yield_now(); +// thread::yield_now(); +// thread::yield_now(); +// }; + +// let sync = make::<bool>(0); + +// for buffer in 0..2 { +// let c32 = make::<i32>(buffer); +// let c64 = make::<i64>(buffer); +// let cb = make::<bool>(buffer); +// let cs = make::<String>(buffer); + +// select! { +// recv(c32.rx()) -> _ => panic!("blocked i32sender"), +// default => {} +// } + +// select! { +// recv(c64.rx()) -> _ => panic!("blocked i64sender"), +// default => {} +// } + +// select! { +// recv(cb.rx()) -> _ => panic!("blocked bsender"), +// default => {} +// } + +// select! { +// recv(cs.rx()) -> _ => panic!("blocked ssender"), +// default => {} +// } + +// go!(c32, sync, i32receiver(c32, sync)); +// let mut try = 0; +// loop { +// select! { +// send(c32.tx(), 123) -> _ => break, +// default => { +// try += 1; +// if try > MAX_TRIES { +// println!("i32receiver buffer={}", buffer); +// panic!("fail") +// } +// sleep(); +// } +// } +// } +// sync.recv(); +// go!(c32, sync, i32sender(c32, sync)); +// if buffer > 0 { +// sync.recv(); +// } +// let mut try = 0; +// loop { +// select! { +// recv(c32.rx()) -> v => { +// if v != Ok(234) { +// panic!("i32sender value"); +// } +// break; +// } +// default => { +// try += 1; +// if try > MAX_TRIES { +// println!("i32sender buffer={}", buffer); +// panic!("fail"); +// } +// sleep(); +// } +// } +// } +// if buffer == 0 { +// sync.recv(); +// } + +// go!(c64, sync, i64receiver(c64, sync)); +// let mut try = 0; +// loop { +// select! { +// send(c64.tx(), 123456) -> _ => break, +// default => { +// try += 1; +// if try > MAX_TRIES { +// println!("i64receiver buffer={}", buffer); +// panic!("fail") +// } +// sleep(); +// } +// } +// } +// sync.recv(); +// go!(c64, sync, i64sender(c64, sync)); +// if buffer > 0 { +// sync.recv(); +// } +// let mut try = 0; +// loop { +// select! { +// recv(c64.rx()) -> v => { +// if v != Ok(234567) { +// panic!("i64sender value"); +// } +// break; +// } +// default => { +// try += 1; +// if try > MAX_TRIES { +// println!("i64sender buffer={}", buffer); +// panic!("fail"); +// } +// sleep(); +// } +// } +// } +// if buffer == 0 { +// sync.recv(); +// } + +// go!(cb, sync, breceiver(cb, sync)); +// let mut try = 0; +// loop { +// select! { +// send(cb.tx(), true) -> _ => break, +// default => { +// try += 1; +// if try > MAX_TRIES { +// println!("breceiver buffer={}", buffer); +// panic!("fail") +// } +// sleep(); +// } +// } +// } +// sync.recv(); +// go!(cb, sync, bsender(cb, sync)); +// if buffer > 0 { +// sync.recv(); +// } +// let mut try = 0; +// loop { +// select! { +// recv(cb.rx()) -> v => { +// if v != Ok(true) { +// panic!("bsender value"); +// } +// break; +// } +// default => { +// try += 1; +// if try > MAX_TRIES { +// println!("bsender buffer={}", buffer); +// panic!("fail"); +// } +// sleep(); +// } +// } +// } +// if buffer == 0 { +// sync.recv(); +// } + +// go!(cs, sync, sreceiver(cs, sync)); +// let mut try = 0; +// loop { +// select! { +// send(cs.tx(), "hello".to_string()) -> _ => break, +// default => { +// try += 1; +// if try > MAX_TRIES { +// println!("sreceiver buffer={}", buffer); +// panic!("fail") +// } +// sleep(); +// } +// } +// } +// sync.recv(); +// go!(cs, sync, ssender(cs, sync)); +// if buffer > 0 { +// sync.recv(); +// } +// let mut try = 0; +// loop { +// select! { +// recv(cs.rx()) -> v => { +// if v != Ok("hello again".to_string()) { +// panic!("ssender value"); +// } +// break; +// } +// default => { +// try += 1; +// if try > MAX_TRIES { +// println!("ssender buffer={}", buffer); +// panic!("fail"); +// } +// sleep(); +// } +// } +// } +// if buffer == 0 { +// sync.recv(); +// } +// } +// } +// } + +// // https://github.com/golang/go/blob/master/test/chan/select.go +// mod select { +// use super::*; + +// #[test] +// fn main() { +// let shift = Cell::new(0); +// let counter = Cell::new(0); + +// let get_value = || { +// counter.set(counter.get() + 1); +// 1 << shift.get() +// }; + +// let send = |mut a: Option<&Chan<u32>>, mut b: Option<&Chan<u32>>| { +// let mut i = 0; +// let never = make::<u32>(0); +// loop { +// let nil1 = never.tx(); +// let nil2 = never.tx(); +// let v1 = get_value(); +// let v2 = get_value(); +// select! { +// send(a.map(|c| c.tx()).unwrap_or(nil1), v1) -> _ => { +// i += 1; +// a = None; +// } +// send(b.map(|c| c.tx()).unwrap_or(nil2), v2) -> _ => { +// i += 1; +// b = None; +// } +// default => break, +// } +// shift.set(shift.get() + 1); +// } +// i +// }; + +// let a = make::<u32>(1); +// let b = make::<u32>(1); + +// assert_eq!(send(Some(&a), Some(&b)), 2); + +// let av = a.recv().unwrap(); +// let bv = b.recv().unwrap(); +// assert_eq!(av | bv, 3); + +// assert_eq!(send(Some(&a), None), 1); +// assert_eq!(counter.get(), 10); +// } +// } + +// // https://github.com/golang/go/blob/master/test/chan/select2.go +// mod select2 { +// // TODO +// } + +// // https://github.com/golang/go/blob/master/test/chan/select3.go +// mod select3 { +// // TODO +// } + +// // https://github.com/golang/go/blob/master/test/chan/select4.go +// mod select4 { +// use super::*; + +// #[test] +// fn main() { +// let c = make::<i32>(1); +// let c1 = make::<i32>(0); +// c.send(42); +// select! { +// recv(c1.rx()) -> _ => panic!("BUG"), +// recv(c.rx()) -> v => assert_eq!(v, Ok(42)), +// } +// } +// } + +// // https://github.com/golang/go/blob/master/test/chan/select6.go +// mod select6 { +// use super::*; + +// #[test] +// fn main() { +// let c1 = make::<bool>(0); +// let c2 = make::<bool>(0); +// let c3 = make::<bool>(0); + +// go!(c1, c1.recv()); +// go!(c1, c2, c3, { +// select! { +// recv(c1.rx()) -> _ => panic!("dummy"), +// recv(c2.rx()) -> _ => c3.send(true), +// } +// c1.recv(); +// }); +// go!(c2, c2.send(true)); + +// c3.recv(); +// c1.send(true); +// c1.send(true); +// } +// } + +// // https://github.com/golang/go/blob/master/test/chan/select7.go +// mod select7 { +// use super::*; + +// fn recv1(c: Chan<i32>) { +// c.recv().unwrap(); +// } + +// fn recv2(c: Chan<i32>) { +// select! { +// recv(c.rx()) -> _ => () +// } +// } + +// fn recv3(c: Chan<i32>) { +// let c2 = make::<i32>(1); +// select! { +// recv(c.rx()) -> _ => (), +// recv(c2.rx()) -> _ => () +// } +// } + +// fn send1(recv: fn(Chan<i32>)) { +// let c = make::<i32>(1); +// go!(c, recv(c)); +// thread::yield_now(); +// c.send(1); +// } + +// fn send2(recv: fn(Chan<i32>)) { +// let c = make::<i32>(1); +// go!(c, recv(c)); +// thread::yield_now(); +// select! { +// send(c.tx(), 1) -> _ => () +// } +// } + +// fn send3(recv: fn(Chan<i32>)) { +// let c = make::<i32>(1); +// go!(c, recv(c)); +// thread::yield_now(); +// let c2 = make::<i32>(1); +// select! { +// send(c.tx(), 1) -> _ => (), +// send(c2.tx(), 1) -> _ => () +// } +// } + +// #[test] +// fn main() { +// send1(recv1); +// send2(recv1); +// send3(recv1); +// send1(recv2); +// send2(recv2); +// send3(recv2); +// send1(recv3); +// send2(recv3); +// send3(recv3); +// } +// } + +// // https://github.com/golang/go/blob/master/test/chan/sieve1.go +// mod sieve1 { +// use super::*; + +// fn generate(ch: Chan<i32>) { +// let mut i = 2; +// loop { +// ch.send(i); +// i += 1; +// } +// } + +// fn filter(in_ch: Chan<i32>, out_ch: Chan<i32>, prime: i32) { +// for i in in_ch { +// if i % prime != 0 { +// out_ch.send(i); +// } +// } +// } + +// fn sieve(primes: Chan<i32>) { +// let mut ch = make::<i32>(1); +// go!(ch, generate(ch)); +// loop { +// let prime = ch.recv().unwrap(); +// primes.send(prime); + +// let ch1 = make::<i32>(1); +// go!(ch, ch1, prime, filter(ch, ch1, prime)); +// ch = ch1; +// } +// } + +// #[test] +// fn main() { +// let primes = make::<i32>(1); +// go!(primes, sieve(primes)); + +// let a = [ +// 2, 3, 5, 7, 11, 13, 17, 19, 23, 29, 31, 37, 41, 43, 47, 53, 59, 61, 67, 71, 73, 79, 83, +// 89, 97, +// ]; +// for item in a.iter() { +// let x = primes.recv().unwrap(); +// if x != *item { +// println!("{} != {}", x, item); +// panic!("fail"); +// } +// } +// } +// } + +// // https://github.com/golang/go/blob/master/test/chan/zerosize.go +// mod zerosize { +// use super::*; + +// #[test] +// fn zero_size_struct() { +// struct ZeroSize; +// let _ = make::<ZeroSize>(0); +// } + +// #[test] +// fn zero_size_array() { +// let _ = make::<[u8; 0]>(0); +// } +// } + +// // https://github.com/golang/go/blob/master/src/runtime/chan_test.go +// mod chan_test { +// use super::*; + +// #[test] +// fn test_chan() { +// const N: i32 = 200; + +// for cap in 0..N { +// { +// // Ensure that receive from empty chan blocks. +// let c = make::<i32>(cap as usize); + +// let recv1 = Arc::new(Mutex::new(false)); +// go!(c, recv1, { +// c.recv(); +// *recv1.lock().unwrap() = true; +// }); + +// let recv2 = Arc::new(Mutex::new(false)); +// go!(c, recv2, { +// c.recv(); +// *recv2.lock().unwrap() = true; +// }); + +// thread::sleep(ms(1)); + +// if *recv1.lock().unwrap() || *recv2.lock().unwrap() { +// panic!(); +// } + +// // Ensure that non-blocking receive does not block. +// select! { +// recv(c.rx()) -> _ => panic!(), +// default => {} +// } +// select! { +// recv(c.rx()) -> _ => panic!(), +// default => {} +// } + +// c.send(0); +// c.send(0); +// } + +// { +// // Ensure that send to full chan blocks. +// let c = make::<i32>(cap as usize); +// for i in 0..cap { +// c.send(i); +// } + +// let sent = Arc::new(Mutex::new(0)); +// go!(sent, c, { +// c.send(0); +// *sent.lock().unwrap() = 1; +// }); + +// thread::sleep(ms(1)); + +// if *sent.lock().unwrap() != 0 { +// panic!(); +// } + +// // Ensure that non-blocking send does not block. +// select! { +// send(c.tx(), 0) -> _ => panic!(), +// default => {} +// } +// c.recv(); +// } + +// { +// // Ensure that we receive 0 from closed chan. +// let c = make::<i32>(cap as usize); +// for i in 0..cap { +// c.send(i); +// } +// c.close(); + +// for i in 0..cap { +// let v = c.recv(); +// if v != Some(i) { +// panic!(); +// } +// } + +// if c.recv() != None { +// panic!(); +// } +// if c.try_recv() != None { +// panic!(); +// } +// } + +// { +// // Ensure that close unblocks receive. +// let c = make::<i32>(cap as usize); +// let done = make::<bool>(0); + +// go!(c, done, { +// let v = c.try_recv(); +// done.send(v.is_some()); +// }); + +// thread::sleep(ms(1)); +// c.close(); + +// if !done.recv().unwrap() { +// // panic!(); +// } +// } + +// { +// // Send 100 integers, +// // ensure that we receive them non-corrupted in FIFO order. +// let c = make::<i32>(cap as usize); +// go!(c, { +// for i in 0..100 { +// c.send(i); +// } +// }); +// for i in 0..100 { +// if c.recv() != Some(i) { +// panic!(); +// } +// } + +// // Same, but using recv2. +// go!(c, { +// for i in 0..100 { +// c.send(i); +// } +// }); +// for i in 0..100 { +// if c.recv() != Some(i) { +// panic!(); +// } +// } +// } +// } +// } + +// #[test] +// fn test_nonblock_recv_race() { +// const N: usize = 1000; + +// for _ in 0..N { +// let c = make::<i32>(1); +// c.send(1); + +// let t = go!(c, { +// select! { +// recv(c.rx()) -> _ => {} +// default => panic!("chan is not ready"), +// } +// }); + +// c.close(); +// c.recv(); +// t.join().unwrap(); +// } +// } + +// #[test] +// fn test_nonblock_select_race() { +// const N: usize = 1000; + +// let done = make::<bool>(1); +// for _ in 0..N { +// let c1 = make::<i32>(1); +// let c2 = make::<i32>(1); +// c1.send(1); + +// go!(c1, c2, done, { +// select! { +// recv(c1.rx()) -> _ => {} +// recv(c2.rx()) -> _ => {} +// default => { +// done.send(false); +// return; +// } +// } +// done.send(true); +// }); + +// c2.send(1); +// select! { +// recv(c1.rx()) -> _ => {} +// default => {} +// } +// if !done.recv().unwrap() { +// panic!("no chan is ready"); +// } +// } +// } + +// #[test] +// fn test_nonblock_select_race2() { +// const N: usize = 1000; + +// let done = make::<bool>(1); +// for _ in 0..N { +// let c1 = make::<i32>(1); +// let c2 = make::<i32>(0); +// c1.send(1); + +// go!(c1, c2, done, { +// select! { +// recv(c1.rx()) -> _ => {} +// recv(c2.rx()) -> _ => {} +// default => { +// done.send(false); +// return; +// } +// } +// done.send(true); +// }); + +// c2.close(); +// select! { +// recv(c1.rx()) -> _ => {} +// default => {} +// } +// if !done.recv().unwrap() { +// panic!("no chan is ready"); +// } +// } +// } + +// #[test] +// fn test_self_select() { +// // Ensure that send/recv on the same chan in select +// // does not crash nor deadlock. + +// for &cap in &[0, 10] { +// let wg = WaitGroup::new(); +// wg.add(2); +// let c = make::<i32>(cap); + +// for p in 0..2 { +// let p = p; +// go!(wg, p, c, { +// defer! { wg.done() } +// for i in 0..1000 { +// if p == 0 || i % 2 == 0 { +// select! { +// send(c.tx(), p) -> _ => {} +// recv(c.rx()) -> v => { +// if cap == 0 && v.ok() == Some(p) { +// panic!("self receive"); +// } +// } +// } +// } else { +// select! { +// recv(c.rx()) -> v => { +// if cap == 0 && v.ok() == Some(p) { +// panic!("self receive"); +// } +// } +// send(c.tx(), p) -> _ => {} +// } +// } +// } +// }); +// } +// wg.wait(); +// } +// } + +// #[test] +// fn test_select_stress() { +// let c = vec![ +// make::<i32>(0), +// make::<i32>(0), +// make::<i32>(2), +// make::<i32>(3), +// ]; + +// const N: usize = 10000; + +// // There are 4 goroutines that send N values on each of the chans, +// // + 4 goroutines that receive N values on each of the chans, +// // + 1 goroutine that sends N values on each of the chans in a single select, +// // + 1 goroutine that receives N values on each of the chans in a single select. +// // All these sends, receives and selects interact chaotically at runtime, +// // but we are careful that this whole construct does not deadlock. +// let wg = WaitGroup::new(); +// wg.add(10); + +// for k in 0..4 { +// go!(k, c, wg, { +// for _ in 0..N { +// c[k].send(0); +// } +// wg.done(); +// }); +// go!(k, c, wg, { +// for _ in 0..N { +// c[k].recv(); +// } +// wg.done(); +// }); +// } + +// go!(c, wg, { +// let mut n = [0; 4]; +// let mut c1 = c.iter().map(|c| Some(c.rx().clone())).collect::<Vec<_>>(); + +// for _ in 0..4 * N { +// let index = { +// let mut sel = Select::new(); +// let mut opers = [!0; 4]; +// for &i in &[3, 2, 0, 1] { +// if let Some(c) = &c1[i] { +// opers[i] = sel.recv(c); +// } +// } + +// let oper = sel.select(); +// let mut index = !0; +// for i in 0..4 { +// if opers[i] == oper.index() { +// index = i; +// let _ = oper.recv(c1[i].as_ref().unwrap()); +// break; +// } +// } +// index +// }; + +// n[index] += 1; +// if n[index] == N { +// c1[index] = None; +// } +// } +// wg.done(); +// }); + +// go!(c, wg, { +// let mut n = [0; 4]; +// let mut c1 = c.iter().map(|c| Some(c.tx().clone())).collect::<Vec<_>>(); + +// for _ in 0..4 * N { +// let index = { +// let mut sel = Select::new(); +// let mut opers = [!0; 4]; +// for &i in &[0, 1, 2, 3] { +// if let Some(c) = &c1[i] { +// opers[i] = sel.send(c); +// } +// } + +// let oper = sel.select(); +// let mut index = !0; +// for i in 0..4 { +// if opers[i] == oper.index() { +// index = i; +// let _ = oper.send(c1[i].as_ref().unwrap(), 0); +// break; +// } +// } +// index +// }; + +// n[index] += 1; +// if n[index] == N { +// c1[index] = None; +// } +// } +// wg.done(); +// }); + +// wg.wait(); +// } + +// #[test] +// fn test_select_fairness() { +// const TRIALS: usize = 10000; + +// let c1 = make::<u8>(TRIALS + 1); +// let c2 = make::<u8>(TRIALS + 1); + +// for _ in 0..TRIALS + 1 { +// c1.send(1); +// c2.send(2); +// } + +// let c3 = make::<u8>(0); +// let c4 = make::<u8>(0); +// let out = make::<u8>(0); +// let done = make::<u8>(0); +// let wg = WaitGroup::new(); + +// wg.add(1); +// go!(wg, c1, c2, c3, c4, out, done, { +// defer! { wg.done() }; +// loop { +// let b; +// select! { +// recv(c3.rx()) -> m => b = m.unwrap(), +// recv(c4.rx()) -> m => b = m.unwrap(), +// recv(c1.rx()) -> m => b = m.unwrap(), +// recv(c2.rx()) -> m => b = m.unwrap(), +// } +// select! { +// send(out.tx(), b) -> _ => {} +// recv(done.rx()) -> _ => return, +// } +// } +// }); + +// let (mut cnt1, mut cnt2) = (0, 0); +// for _ in 0..TRIALS { +// match out.recv() { +// Some(1) => cnt1 += 1, +// Some(2) => cnt2 += 1, +// b => panic!("unexpected value {:?} on channel", b), +// } +// } + +// // If the select in the goroutine is fair, +// // cnt1 and cnt2 should be about the same value. +// // With 10,000 trials, the expected margin of error at +// // a confidence level of five nines is 4.4172 / (2 * Sqrt(10000)). + +// let r = cnt1 as f64 / TRIALS as f64; +// let e = (r - 0.5).abs(); + +// if e > 4.4172 / (2.0 * (TRIALS as f64).sqrt()) { +// panic!( +// "unfair select: in {} trials, results were {}, {}", +// TRIALS, cnt1, cnt2, +// ); +// } + +// done.close(); +// wg.wait(); +// } + +// #[test] +// fn test_chan_send_interface() { +// struct Mt; + +// let c = make::<Box<dyn Any>>(1); +// c.send(Box::new(Mt)); + +// select! { +// send(c.tx(), Box::new(Mt)) -> _ => {} +// default => {} +// } + +// select! { +// send(c.tx(), Box::new(Mt)) -> _ => {} +// send(c.tx(), Box::new(Mt)) -> _ => {} +// default => {} +// } +// } + +// #[test] +// fn test_pseudo_random_send() { +// const N: usize = 100; + +// for cap in 0..N { +// let c = make::<i32>(cap); +// let l = Arc::new(Mutex::new(vec![0i32; N])); +// let done = make::<bool>(0); + +// go!(c, done, l, { +// let mut l = l.lock().unwrap(); +// for i in 0..N { +// thread::yield_now(); +// l[i] = c.recv().unwrap(); +// } +// done.send(true); +// }); + +// for _ in 0..N { +// select! { +// send(c.tx(), 1) -> _ => {} +// send(c.tx(), 0) -> _ => {} +// } +// } +// done.recv(); + +// let mut n0 = 0; +// let mut n1 = 0; +// for &i in l.lock().unwrap().iter() { +// n0 += (i + 1) % 2; +// n1 += i; +// } + +// if n0 <= N as i32 / 10 || n1 <= N as i32 / 10 { +// panic!( +// "Want pseudorandom, got {} zeros and {} ones (chan cap {})", +// n0, n1, cap, +// ); +// } +// } +// } + +// #[test] +// fn test_multi_consumer() { +// const NWORK: usize = 23; +// const NITER: usize = 271828; + +// let pn = [2, 3, 7, 11, 13, 17, 19, 23, 27, 31]; + +// let q = make::<i32>(NWORK * 3); +// let r = make::<i32>(NWORK * 3); + +// let wg = WaitGroup::new(); +// for i in 0..NWORK { +// wg.add(1); +// let w = i; +// go!(q, r, wg, pn, { +// for v in &q { +// if pn[w % pn.len()] == v { +// thread::yield_now(); +// } +// r.send(v); +// } +// wg.done(); +// }); +// } + +// let expect = Arc::new(Mutex::new(0)); +// go!(q, r, expect, wg, pn, { +// for i in 0..NITER { +// let v = pn[i % pn.len()]; +// *expect.lock().unwrap() += v; +// q.send(v); +// } +// q.close(); +// wg.wait(); +// r.close(); +// }); + +// let mut n = 0; +// let mut s = 0; +// for v in &r { +// n += 1; +// s += v; +// } + +// if n != NITER || s != *expect.lock().unwrap() { +// panic!(); +// } +// } + +// #[test] +// fn test_select_duplicate_channel() { +// // This test makes sure we can queue a G on +// // the same channel multiple times. +// let c = make::<i32>(0); +// let d = make::<i32>(0); +// let e = make::<i32>(0); + +// go!(c, d, e, { +// select! { +// recv(c.rx()) -> _ => {} +// recv(d.rx()) -> _ => {} +// recv(e.rx()) -> _ => {} +// } +// e.send(9); +// }); +// thread::sleep(ms(1)); + +// go!(c, c.recv()); +// thread::sleep(ms(1)); + +// d.send(7); +// e.recv(); +// c.send(8); +// } +// } + +// // https://github.com/golang/go/blob/master/test/closedchan.go +// mod closedchan { +// // TODO +// } + +// // https://github.com/golang/go/blob/master/src/runtime/chanbarrier_test.go +// mod chanbarrier_test { +// // TODO +// } + +// // https://github.com/golang/go/blob/master/src/runtime/race/testdata/chan_test.go +// mod race_chan_test { +// // TODO +// } + +// // https://github.com/golang/go/blob/master/test/ken/chan.go +// mod chan { +// // TODO +// } + +// // https://github.com/golang/go/blob/master/test/ken/chan1.go +// mod chan1 { +// // TODO +// } diff --git a/vendor/flume/tests/iter.rs b/vendor/flume/tests/iter.rs new file mode 100644 index 0000000..4d69adb --- /dev/null +++ b/vendor/flume/tests/iter.rs @@ -0,0 +1,112 @@ +//! Tests for iteration over receivers. + +extern crate crossbeam_utils; + +use flume::unbounded; +use crossbeam_utils::thread::scope; + +#[test] +fn nested_recv_iter() { + let (s, r) = unbounded::<i32>(); + let (total_s, total_r) = unbounded::<i32>(); + + scope(|scope| { + scope.spawn(move |_| { + let mut acc = 0; + for x in r.iter() { + acc += x; + } + total_s.send(acc).unwrap(); + }); + + s.send(3).unwrap(); + s.send(1).unwrap(); + s.send(2).unwrap(); + drop(s); + assert_eq!(total_r.recv().unwrap(), 6); + }) + .unwrap(); +} + +#[test] +fn recv_iter_break() { + let (s, r) = unbounded::<i32>(); + let (count_s, count_r) = unbounded(); + + scope(|scope| { + scope.spawn(move |_| { + let mut count = 0; + for x in r.iter() { + if count >= 3 { + break; + } else { + count += x; + } + } + count_s.send(count).unwrap(); + }); + + s.send(2).unwrap(); + s.send(2).unwrap(); + s.send(2).unwrap(); + let _ = s.send(2); + drop(s); + assert_eq!(count_r.recv().unwrap(), 4); + }) + .unwrap(); +} + +#[test] +fn recv_try_iter() { + let (request_s, request_r) = unbounded(); + let (response_s, response_r) = unbounded(); + + scope(|scope| { + scope.spawn(move |_| { + let mut count = 0; + loop { + for x in response_r.try_iter() { + count += x; + if count == 6 { + return; + } + } + request_s.send(()).unwrap(); + } + }); + + for _ in request_r.iter() { + if response_s.send(2).is_err() { + break; + } + } + }) + .unwrap(); +} + +#[test] +fn recv_into_iter_owned() { + let mut iter = { + let (s, r) = unbounded::<i32>(); + s.send(1).unwrap(); + s.send(2).unwrap(); + r.into_iter() + }; + + assert_eq!(iter.next().unwrap(), 1); + assert_eq!(iter.next().unwrap(), 2); + assert_eq!(iter.next().is_none(), true); +} + +#[test] +fn recv_into_iter_borrowed() { + let (s, r) = unbounded::<i32>(); + s.send(1).unwrap(); + s.send(2).unwrap(); + drop(s); + + let mut iter = (&r).into_iter(); + assert_eq!(iter.next().unwrap(), 1); + assert_eq!(iter.next().unwrap(), 2); + assert_eq!(iter.next().is_none(), true); +} diff --git a/vendor/flume/tests/list.rs b/vendor/flume/tests/list.rs new file mode 100644 index 0000000..851af88 --- /dev/null +++ b/vendor/flume/tests/list.rs @@ -0,0 +1,536 @@ +//! Tests for the list channel flavor. + +extern crate crossbeam_utils; +extern crate rand; + +use std::any::Any; +use std::sync::atomic::AtomicUsize; +use std::sync::atomic::Ordering; +use std::thread; +use std::time::Duration; + +use flume::{unbounded, Receiver}; +use flume::{RecvError, RecvTimeoutError, TryRecvError}; +use flume::{SendError, SendTimeoutError, TrySendError}; +use crossbeam_utils::thread::scope; +use rand::{thread_rng, Rng}; + +fn ms(ms: u64) -> Duration { + Duration::from_millis(ms) +} + +#[test] +fn smoke() { + let (s, r) = unbounded(); + s.try_send(7).unwrap(); + assert_eq!(r.try_recv(), Ok(7)); + + s.send(8).unwrap(); + assert_eq!(r.recv(), Ok(8)); + + assert_eq!(r.try_recv(), Err(TryRecvError::Empty)); + assert_eq!(r.recv_timeout(ms(1000)), Err(RecvTimeoutError::Timeout)); +} + +#[test] +fn capacity() { + let (s, r) = unbounded::<()>(); + assert_eq!(s.capacity(), None); + assert_eq!(r.capacity(), None); +} + +#[test] +fn len_empty_full() { + let (s, r) = unbounded(); + + assert_eq!(s.len(), 0); + assert_eq!(s.is_empty(), true); + assert_eq!(s.is_full(), false); + assert_eq!(r.len(), 0); + assert_eq!(r.is_empty(), true); + assert_eq!(r.is_full(), false); + + s.send(()).unwrap(); + + assert_eq!(s.len(), 1); + assert_eq!(s.is_empty(), false); + assert_eq!(s.is_full(), false); + assert_eq!(r.len(), 1); + assert_eq!(r.is_empty(), false); + assert_eq!(r.is_full(), false); + + r.recv().unwrap(); + + assert_eq!(s.len(), 0); + assert_eq!(s.is_empty(), true); + assert_eq!(s.is_full(), false); + assert_eq!(r.len(), 0); + assert_eq!(r.is_empty(), true); + assert_eq!(r.is_full(), false); +} + +#[test] +fn try_recv() { + let (s, r) = unbounded(); + + scope(|scope| { + scope.spawn(move |_| { + assert_eq!(r.try_recv(), Err(TryRecvError::Empty)); + thread::sleep(ms(1500)); + assert_eq!(r.try_recv(), Ok(7)); + thread::sleep(ms(500)); + assert_eq!(r.try_recv(), Err(TryRecvError::Disconnected)); + }); + scope.spawn(move |_| { + thread::sleep(ms(1000)); + s.send(7).unwrap(); + }); + }) + .unwrap(); +} + +#[test] +fn recv() { + let (s, r) = unbounded(); + + scope(|scope| { + scope.spawn(move |_| { + assert_eq!(r.recv(), Ok(7)); + thread::sleep(ms(1000)); + assert_eq!(r.recv(), Ok(8)); + thread::sleep(ms(1000)); + assert_eq!(r.recv(), Ok(9)); + assert!(r.recv().is_err()); + }); + scope.spawn(move |_| { + thread::sleep(ms(1500)); + s.send(7).unwrap(); + s.send(8).unwrap(); + s.send(9).unwrap(); + }); + }) + .unwrap(); +} + +#[test] +fn recv_timeout() { + let (s, r) = unbounded::<i32>(); + + scope(|scope| { + scope.spawn(move |_| { + assert_eq!(r.recv_timeout(ms(1000)), Err(RecvTimeoutError::Timeout)); + assert_eq!(r.recv_timeout(ms(1000)), Ok(7)); + assert_eq!( + r.recv_timeout(ms(1000)), + Err(RecvTimeoutError::Disconnected) + ); + }); + scope.spawn(move |_| { + thread::sleep(ms(1500)); + s.send(7).unwrap(); + }); + }) + .unwrap(); +} + +#[test] +fn try_send() { + let (s, r) = unbounded(); + for i in 0..1000 { + assert_eq!(s.try_send(i), Ok(())); + } + + drop(r); + assert_eq!(s.try_send(777), Err(TrySendError::Disconnected(777))); +} + +#[test] +fn send() { + let (s, r) = unbounded(); + for i in 0..1000 { + assert_eq!(s.send(i), Ok(())); + } + + drop(r); + assert_eq!(s.send(777), Err(SendError(777))); +} + +#[test] +fn send_timeout() { + let (s, r) = unbounded(); + for i in 0..1000 { + assert_eq!(s.send_timeout(i, ms(i as u64)), Ok(())); + } + + drop(r); + assert_eq!( + s.send_timeout(777, ms(0)), + Err(SendTimeoutError::Disconnected(777)) + ); +} + +#[test] +fn send_after_disconnect() { + let (s, r) = unbounded(); + + s.send(1).unwrap(); + s.send(2).unwrap(); + s.send(3).unwrap(); + + drop(r); + + assert_eq!(s.send(4), Err(SendError(4))); + assert_eq!(s.try_send(5), Err(TrySendError::Disconnected(5))); + assert_eq!( + s.send_timeout(6, ms(0)), + Err(SendTimeoutError::Disconnected(6)) + ); +} + +#[test] +fn recv_after_disconnect() { + let (s, r) = unbounded(); + + s.send(1).unwrap(); + s.send(2).unwrap(); + s.send(3).unwrap(); + + drop(s); + + assert_eq!(r.recv(), Ok(1)); + assert_eq!(r.recv(), Ok(2)); + assert_eq!(r.recv(), Ok(3)); + assert!(r.recv().is_err()); +} + +#[test] +fn len() { + let (s, r) = unbounded(); + + assert_eq!(s.len(), 0); + assert_eq!(r.len(), 0); + + for i in 0..50 { + s.send(i).unwrap(); + assert_eq!(s.len(), i + 1); + } + + for i in 0..50 { + r.recv().unwrap(); + assert_eq!(r.len(), 50 - i - 1); + } + + assert_eq!(s.len(), 0); + assert_eq!(r.len(), 0); +} + +#[test] +fn disconnect_wakes_receiver() { + let (s, r) = unbounded::<()>(); + + scope(|scope| { + scope.spawn(move |_| { + assert!(r.recv().is_err()); + }); + scope.spawn(move |_| { + thread::sleep(ms(1000)); + drop(s); + }); + }) + .unwrap(); +} + +#[test] +fn spsc() { + const COUNT: usize = 100_000; + + let (s, r) = unbounded(); + + scope(|scope| { + scope.spawn(move |_| { + for i in 0..COUNT { + assert_eq!(r.recv(), Ok(i)); + } + assert!(r.recv().is_err()); + }); + scope.spawn(move |_| { + for i in 0..COUNT { + s.send(i).unwrap(); + } + }); + }) + .unwrap(); +} + +#[test] +fn mpmc() { + const COUNT: usize = 25_000; + const THREADS: usize = 4; + + let (s, r) = unbounded::<usize>(); + let v = (0..COUNT).map(|_| AtomicUsize::new(0)).collect::<Vec<_>>(); + + scope(|scope| { + for _ in 0..THREADS { + scope.spawn(|_| { + for _ in 0..COUNT { + let n = r.recv().unwrap(); + v[n].fetch_add(1, Ordering::SeqCst); + } + }); + } + for _ in 0..THREADS { + scope.spawn(|_| { + for i in 0..COUNT { + s.send(i).unwrap(); + } + }); + } + }) + .unwrap(); + + assert_eq!(r.try_recv(), Err(TryRecvError::Empty)); + + for c in v { + assert_eq!(c.load(Ordering::SeqCst), THREADS); + } +} + +#[test] +fn stress_oneshot() { + const COUNT: usize = 10_000; + + for _ in 0..COUNT { + let (s, r) = unbounded(); + + scope(|scope| { + scope.spawn(|_| r.recv().unwrap()); + scope.spawn(|_| s.send(0).unwrap()); + }) + .unwrap(); + } +} + +#[test] +fn stress_iter() { + const COUNT: usize = 100_000; + + let (request_s, request_r) = unbounded(); + let (response_s, response_r) = unbounded(); + + scope(|scope| { + scope.spawn(move |_| { + let mut count = 0; + loop { + for x in response_r.try_iter() { + count += x; + if count == COUNT { + return; + } + } + request_s.send(()).unwrap(); + } + }); + + for _ in request_r.iter() { + if response_s.send(1).is_err() { + break; + } + } + }) + .unwrap(); +} + +#[test] +fn stress_timeout_two_threads() { + const COUNT: usize = 100; + + let (s, r) = unbounded(); + + scope(|scope| { + scope.spawn(|_| { + for i in 0..COUNT { + if i % 2 == 0 { + thread::sleep(ms(50)); + } + s.send(i).unwrap(); + } + }); + + scope.spawn(|_| { + for i in 0..COUNT { + if i % 2 == 0 { + thread::sleep(ms(50)); + } + loop { + if let Ok(x) = r.recv_timeout(ms(10)) { + assert_eq!(x, i); + break; + } + } + } + }); + }) + .unwrap(); +} + +#[test] +fn drops() { + static DROPS: AtomicUsize = AtomicUsize::new(0); + + #[derive(Debug, PartialEq)] + struct DropCounter; + + impl Drop for DropCounter { + fn drop(&mut self) { + DROPS.fetch_add(1, Ordering::SeqCst); + } + } + + let mut rng = thread_rng(); + + for _ in 0..100 { + let steps = rng.gen_range(0..10_000); + let additional = rng.gen_range(0..1000); + + DROPS.store(0, Ordering::SeqCst); + let (s, r) = unbounded::<DropCounter>(); + + scope(|scope| { + scope.spawn(|_| { + for _ in 0..steps { + r.recv().unwrap(); + } + }); + + scope.spawn(|_| { + for _ in 0..steps { + s.send(DropCounter).unwrap(); + } + }); + }) + .unwrap(); + + for _ in 0..additional { + s.try_send(DropCounter).unwrap(); + } + + assert_eq!(DROPS.load(Ordering::SeqCst), steps); + drop(s); + drop(r); + assert_eq!(DROPS.load(Ordering::SeqCst), steps + additional); + } +} + +#[test] +fn linearizable() { + const COUNT: usize = 25_000; + const THREADS: usize = 4; + + let (s, r) = unbounded(); + + scope(|scope| { + for _ in 0..THREADS { + scope.spawn(|_| { + for _ in 0..COUNT { + s.send(0).unwrap(); + r.try_recv().unwrap(); + } + }); + } + }) + .unwrap(); +} + +// #[test] +// fn fairness() { +// const COUNT: usize = 10_000; + +// let (s1, r1) = unbounded::<()>(); +// let (s2, r2) = unbounded::<()>(); + +// for _ in 0..COUNT { +// s1.send(()).unwrap(); +// s2.send(()).unwrap(); +// } + +// let mut hits = [0usize; 2]; +// for _ in 0..COUNT { +// select! { +// recv(r1) -> _ => hits[0] += 1, +// recv(r2) -> _ => hits[1] += 1, +// } +// } +// assert!(hits.iter().all(|x| *x >= COUNT / hits.len() / 2)); +// } + +// #[test] +// fn fairness_duplicates() { +// const COUNT: usize = 10_000; + +// let (s, r) = unbounded(); + +// for _ in 0..COUNT { +// s.send(()).unwrap(); +// } + +// let mut hits = [0usize; 5]; +// for _ in 0..COUNT { +// select! { +// recv(r) -> _ => hits[0] += 1, +// recv(r) -> _ => hits[1] += 1, +// recv(r) -> _ => hits[2] += 1, +// recv(r) -> _ => hits[3] += 1, +// recv(r) -> _ => hits[4] += 1, +// } +// } +// assert!(hits.iter().all(|x| *x >= COUNT / hits.len() / 2)); +// } + +// #[test] +// fn recv_in_send() { +// let (s, r) = unbounded(); +// s.send(()).unwrap(); + +// select! { +// send(s, assert_eq!(r.recv(), Ok(()))) -> _ => {} +// } +// } + +#[test] +fn channel_through_channel() { + const COUNT: usize = 1000; + + type T = Box<dyn Any + Send>; + + let (s, r) = unbounded::<T>(); + + scope(|scope| { + scope.spawn(move |_| { + let mut s = s; + + for _ in 0..COUNT { + let (new_s, new_r) = unbounded(); + let new_r: T = Box::new(Some(new_r)); + + s.send(new_r).unwrap(); + s = new_s; + } + }); + + scope.spawn(move |_| { + let mut r = r; + + for _ in 0..COUNT { + r = r + .recv() + .unwrap() + .downcast_mut::<Option<Receiver<T>>>() + .unwrap() + .take() + .unwrap() + } + }); + }) + .unwrap(); +} diff --git a/vendor/flume/tests/method_sharing.rs b/vendor/flume/tests/method_sharing.rs new file mode 100644 index 0000000..24173ea --- /dev/null +++ b/vendor/flume/tests/method_sharing.rs @@ -0,0 +1,39 @@ +#[cfg(feature = "async")] +use flume::*; + +#[cfg(feature = "async")] +#[async_std::test] +async fn sender() { + let (sender, receiver) = bounded(1); + + let sender_fut = sender.send_async(()); + assert_eq!(sender.is_disconnected(), sender_fut.is_disconnected()); + assert_eq!(sender.is_empty(), sender_fut.is_empty()); + assert_eq!(sender.is_full(), sender_fut.is_full()); + assert_eq!(sender.len(), sender_fut.len()); + assert_eq!(sender.capacity(), sender_fut.capacity()); + + let sender_sink = sender.sink(); + assert_eq!(sender.is_disconnected(), sender_sink.is_disconnected()); + assert_eq!(sender.is_empty(), sender_sink.is_empty()); + assert_eq!(sender.is_full(), sender_sink.is_full()); + assert_eq!(sender.len(), sender_sink.len()); + assert_eq!(sender.capacity(), sender_sink.capacity()); + + let receiver_fut = receiver.recv_async(); + assert_eq!(receiver.is_disconnected(), receiver_fut.is_disconnected()); + assert_eq!(receiver.is_empty(), receiver_fut.is_empty()); + assert_eq!(receiver.is_full(), receiver_fut.is_full()); + assert_eq!(receiver.len(), receiver_fut.len()); + assert_eq!(receiver.capacity(), receiver_fut.capacity()); + + let receiver_stream = receiver.stream(); + assert_eq!( + receiver.is_disconnected(), + receiver_stream.is_disconnected() + ); + assert_eq!(receiver.is_empty(), receiver_stream.is_empty()); + assert_eq!(receiver.is_full(), receiver_stream.is_full()); + assert_eq!(receiver.len(), receiver_stream.len()); + assert_eq!(receiver.capacity(), receiver_stream.capacity()); +} diff --git a/vendor/flume/tests/mpsc.rs b/vendor/flume/tests/mpsc.rs new file mode 100644 index 0000000..213b9d8 --- /dev/null +++ b/vendor/flume/tests/mpsc.rs @@ -0,0 +1,2095 @@ +//! Tests copied from `std::sync::mpsc`. +//! +//! This is a copy of tests for the `std::sync::mpsc` channels from the standard library, but +//! modified to work with `crossbeam-channel` instead. +//! +//! Minor tweaks were needed to make the tests compile: +//! +//! - Replace `box` syntax with `Box::new`. +//! - Replace all uses of `Select` with `select!`. +//! - Change the imports. +//! - Join all spawned threads. +//! - Removed assertion from oneshot_multi_thread_send_close_stress tests. +//! +//! Source: +//! - https://github.com/rust-lang/rust/tree/master/src/libstd/sync/mpsc +//! +//! Copyright & License: +//! - Copyright 2013-2014 The Rust Project Developers +//! - Apache License, Version 2.0 or MIT license, at your option +//! - https://github.com/rust-lang/rust/blob/master/COPYRIGHT +//! - https://www.rust-lang.org/en-US/legal.html + +#[macro_use] +extern crate crossbeam_channel as cc; + +use std::sync::mpsc::{RecvError, RecvTimeoutError, TryRecvError}; +use std::sync::mpsc::{SendError, TrySendError}; +use std::thread::JoinHandle; +use std::time::Duration; + +pub struct Sender<T> { + pub inner: cc::Sender<T>, +} + +impl<T> Sender<T> { + pub fn send(&self, t: T) -> Result<(), SendError<T>> { + self.inner.send(t).map_err(|cc::SendError(m)| SendError(m)) + } +} + +impl<T> Clone for Sender<T> { + fn clone(&self) -> Sender<T> { + Sender { + inner: self.inner.clone(), + } + } +} + +pub struct SyncSender<T> { + pub inner: cc::Sender<T>, +} + +impl<T> SyncSender<T> { + pub fn send(&self, t: T) -> Result<(), SendError<T>> { + self.inner.send(t).map_err(|cc::SendError(m)| SendError(m)) + } + + pub fn try_send(&self, t: T) -> Result<(), TrySendError<T>> { + self.inner.try_send(t).map_err(|err| match err { + cc::TrySendError::Full(m) => TrySendError::Full(m), + cc::TrySendError::Disconnected(m) => TrySendError::Disconnected(m), + }) + } +} + +impl<T> Clone for SyncSender<T> { + fn clone(&self) -> SyncSender<T> { + SyncSender { + inner: self.inner.clone(), + } + } +} + +pub struct Receiver<T> { + pub inner: cc::Receiver<T>, +} + +impl<T> Receiver<T> { + pub fn try_recv(&self) -> Result<T, TryRecvError> { + self.inner.try_recv().map_err(|err| match err { + cc::TryRecvError::Empty => TryRecvError::Empty, + cc::TryRecvError::Disconnected => TryRecvError::Disconnected, + }) + } + + pub fn recv(&self) -> Result<T, RecvError> { + self.inner.recv().map_err(|_| RecvError) + } + + pub fn recv_timeout(&self, timeout: Duration) -> Result<T, RecvTimeoutError> { + self.inner.recv_timeout(timeout).map_err(|err| match err { + cc::RecvTimeoutError::Timeout => RecvTimeoutError::Timeout, + cc::RecvTimeoutError::Disconnected => RecvTimeoutError::Disconnected, + }) + } + + pub fn iter(&self) -> Iter<T> { + Iter { inner: self } + } + + pub fn try_iter(&self) -> TryIter<T> { + TryIter { inner: self } + } +} + +impl<'a, T> IntoIterator for &'a Receiver<T> { + type Item = T; + type IntoIter = Iter<'a, T>; + + fn into_iter(self) -> Iter<'a, T> { + self.iter() + } +} + +impl<T> IntoIterator for Receiver<T> { + type Item = T; + type IntoIter = IntoIter<T>; + + fn into_iter(self) -> IntoIter<T> { + IntoIter { inner: self } + } +} + +pub struct TryIter<'a, T: 'a> { + inner: &'a Receiver<T>, +} + +impl<'a, T> Iterator for TryIter<'a, T> { + type Item = T; + + fn next(&mut self) -> Option<T> { + self.inner.try_recv().ok() + } +} + +pub struct Iter<'a, T: 'a> { + inner: &'a Receiver<T>, +} + +impl<'a, T> Iterator for Iter<'a, T> { + type Item = T; + + fn next(&mut self) -> Option<T> { + self.inner.recv().ok() + } +} + +pub struct IntoIter<T> { + inner: Receiver<T>, +} + +impl<T> Iterator for IntoIter<T> { + type Item = T; + + fn next(&mut self) -> Option<T> { + self.inner.recv().ok() + } +} + +pub fn channel<T>() -> (Sender<T>, Receiver<T>) { + let (s, r) = cc::unbounded(); + let s = Sender { inner: s }; + let r = Receiver { inner: r }; + (s, r) +} + +pub fn sync_channel<T>(bound: usize) -> (SyncSender<T>, Receiver<T>) { + let (s, r) = cc::bounded(bound); + let s = SyncSender { inner: s }; + let r = Receiver { inner: r }; + (s, r) +} + +macro_rules! select { + ( + $($name:pat = $rx:ident.$meth:ident() => $code:expr),+ + ) => ({ + crossbeam_channel_internal! { + $( + recv(($rx).inner) -> res => { + let $name = res.map_err(|_| ::std::sync::mpsc::RecvError); + $code + } + )+ + } + }) +} + +// Source: https://github.com/rust-lang/rust/blob/master/src/libstd/sync/mpsc/mod.rs +mod channel_tests { + use super::*; + + use std::env; + use std::thread; + use std::time::{Duration, Instant}; + + pub fn stress_factor() -> usize { + match env::var("RUST_TEST_STRESS") { + Ok(val) => val.parse().unwrap(), + Err(..) => 1, + } + } + + #[test] + fn smoke() { + let (tx, rx) = channel::<i32>(); + tx.send(1).unwrap(); + assert_eq!(rx.recv().unwrap(), 1); + } + + #[test] + fn drop_full() { + let (tx, _rx) = channel::<Box<isize>>(); + tx.send(Box::new(1)).unwrap(); + } + + #[test] + fn drop_full_shared() { + let (tx, _rx) = channel::<Box<isize>>(); + drop(tx.clone()); + drop(tx.clone()); + tx.send(Box::new(1)).unwrap(); + } + + #[test] + fn smoke_shared() { + let (tx, rx) = channel::<i32>(); + tx.send(1).unwrap(); + assert_eq!(rx.recv().unwrap(), 1); + let tx = tx.clone(); + tx.send(1).unwrap(); + assert_eq!(rx.recv().unwrap(), 1); + } + + #[test] + fn smoke_threads() { + let (tx, rx) = channel::<i32>(); + let t = thread::spawn(move || { + tx.send(1).unwrap(); + }); + assert_eq!(rx.recv().unwrap(), 1); + t.join().unwrap(); + } + + #[test] + fn smoke_port_gone() { + let (tx, rx) = channel::<i32>(); + drop(rx); + assert!(tx.send(1).is_err()); + } + + #[test] + fn smoke_shared_port_gone() { + let (tx, rx) = channel::<i32>(); + drop(rx); + assert!(tx.send(1).is_err()) + } + + #[test] + fn smoke_shared_port_gone2() { + let (tx, rx) = channel::<i32>(); + drop(rx); + let tx2 = tx.clone(); + drop(tx); + assert!(tx2.send(1).is_err()); + } + + #[test] + fn port_gone_concurrent() { + let (tx, rx) = channel::<i32>(); + let t = thread::spawn(move || { + rx.recv().unwrap(); + }); + while tx.send(1).is_ok() {} + t.join().unwrap(); + } + + #[test] + fn port_gone_concurrent_shared() { + let (tx, rx) = channel::<i32>(); + let tx2 = tx.clone(); + let t = thread::spawn(move || { + rx.recv().unwrap(); + }); + while tx.send(1).is_ok() && tx2.send(1).is_ok() {} + t.join().unwrap(); + } + + #[test] + fn smoke_chan_gone() { + let (tx, rx) = channel::<i32>(); + drop(tx); + assert!(rx.recv().is_err()); + } + + #[test] + fn smoke_chan_gone_shared() { + let (tx, rx) = channel::<()>(); + let tx2 = tx.clone(); + drop(tx); + drop(tx2); + assert!(rx.recv().is_err()); + } + + #[test] + fn chan_gone_concurrent() { + let (tx, rx) = channel::<i32>(); + let t = thread::spawn(move || { + tx.send(1).unwrap(); + tx.send(1).unwrap(); + }); + while rx.recv().is_ok() {} + t.join().unwrap(); + } + + #[test] + fn stress() { + let (tx, rx) = channel::<i32>(); + let t = thread::spawn(move || { + for _ in 0..10000 { + tx.send(1).unwrap(); + } + }); + for _ in 0..10000 { + assert_eq!(rx.recv().unwrap(), 1); + } + t.join().ok().unwrap(); + } + + #[test] + fn stress_shared() { + const AMT: u32 = 10000; + const NTHREADS: u32 = 8; + let (tx, rx) = channel::<i32>(); + + let t = thread::spawn(move || { + for _ in 0..AMT * NTHREADS { + assert_eq!(rx.recv().unwrap(), 1); + } + match rx.try_recv() { + Ok(..) => panic!(), + _ => {} + } + }); + + let mut ts = Vec::with_capacity(NTHREADS as usize); + for _ in 0..NTHREADS { + let tx = tx.clone(); + let t = thread::spawn(move || { + for _ in 0..AMT { + tx.send(1).unwrap(); + } + }); + ts.push(t); + } + drop(tx); + t.join().ok().unwrap(); + for t in ts { + t.join().unwrap(); + } + } + + #[test] + fn send_from_outside_runtime() { + let (tx1, rx1) = channel::<()>(); + let (tx2, rx2) = channel::<i32>(); + let t1 = thread::spawn(move || { + tx1.send(()).unwrap(); + for _ in 0..40 { + assert_eq!(rx2.recv().unwrap(), 1); + } + }); + rx1.recv().unwrap(); + let t2 = thread::spawn(move || { + for _ in 0..40 { + tx2.send(1).unwrap(); + } + }); + t1.join().ok().unwrap(); + t2.join().ok().unwrap(); + } + + #[test] + fn recv_from_outside_runtime() { + let (tx, rx) = channel::<i32>(); + let t = thread::spawn(move || { + for _ in 0..40 { + assert_eq!(rx.recv().unwrap(), 1); + } + }); + for _ in 0..40 { + tx.send(1).unwrap(); + } + t.join().ok().unwrap(); + } + + #[test] + fn no_runtime() { + let (tx1, rx1) = channel::<i32>(); + let (tx2, rx2) = channel::<i32>(); + let t1 = thread::spawn(move || { + assert_eq!(rx1.recv().unwrap(), 1); + tx2.send(2).unwrap(); + }); + let t2 = thread::spawn(move || { + tx1.send(1).unwrap(); + assert_eq!(rx2.recv().unwrap(), 2); + }); + t1.join().ok().unwrap(); + t2.join().ok().unwrap(); + } + + #[test] + fn oneshot_single_thread_close_port_first() { + // Simple test of closing without sending + let (_tx, rx) = channel::<i32>(); + drop(rx); + } + + #[test] + fn oneshot_single_thread_close_chan_first() { + // Simple test of closing without sending + let (tx, _rx) = channel::<i32>(); + drop(tx); + } + + #[test] + fn oneshot_single_thread_send_port_close() { + // Testing that the sender cleans up the payload if receiver is closed + let (tx, rx) = channel::<Box<i32>>(); + drop(rx); + assert!(tx.send(Box::new(0)).is_err()); + } + + #[test] + fn oneshot_single_thread_recv_chan_close() { + let (tx, rx) = channel::<i32>(); + drop(tx); + assert_eq!(rx.recv(), Err(RecvError)); + } + + #[test] + fn oneshot_single_thread_send_then_recv() { + let (tx, rx) = channel::<Box<i32>>(); + tx.send(Box::new(10)).unwrap(); + assert!(*rx.recv().unwrap() == 10); + } + + #[test] + fn oneshot_single_thread_try_send_open() { + let (tx, rx) = channel::<i32>(); + assert!(tx.send(10).is_ok()); + assert!(rx.recv().unwrap() == 10); + } + + #[test] + fn oneshot_single_thread_try_send_closed() { + let (tx, rx) = channel::<i32>(); + drop(rx); + assert!(tx.send(10).is_err()); + } + + #[test] + fn oneshot_single_thread_try_recv_open() { + let (tx, rx) = channel::<i32>(); + tx.send(10).unwrap(); + assert!(rx.recv() == Ok(10)); + } + + #[test] + fn oneshot_single_thread_try_recv_closed() { + let (tx, rx) = channel::<i32>(); + drop(tx); + assert!(rx.recv().is_err()); + } + + #[test] + fn oneshot_single_thread_peek_data() { + let (tx, rx) = channel::<i32>(); + assert_eq!(rx.try_recv(), Err(TryRecvError::Empty)); + tx.send(10).unwrap(); + assert_eq!(rx.try_recv(), Ok(10)); + } + + #[test] + fn oneshot_single_thread_peek_close() { + let (tx, rx) = channel::<i32>(); + drop(tx); + assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected)); + assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected)); + } + + #[test] + fn oneshot_single_thread_peek_open() { + let (_tx, rx) = channel::<i32>(); + assert_eq!(rx.try_recv(), Err(TryRecvError::Empty)); + } + + #[test] + fn oneshot_multi_task_recv_then_send() { + let (tx, rx) = channel::<Box<i32>>(); + let t = thread::spawn(move || { + assert!(*rx.recv().unwrap() == 10); + }); + + tx.send(Box::new(10)).unwrap(); + t.join().unwrap(); + } + + #[test] + fn oneshot_multi_task_recv_then_close() { + let (tx, rx) = channel::<Box<i32>>(); + let t = thread::spawn(move || { + drop(tx); + }); + thread::spawn(move || { + assert_eq!(rx.recv(), Err(RecvError)); + }) + .join() + .unwrap(); + t.join().unwrap(); + } + + #[test] + fn oneshot_multi_thread_close_stress() { + let stress_factor = stress_factor(); + let mut ts = Vec::with_capacity(stress_factor); + for _ in 0..stress_factor { + let (tx, rx) = channel::<i32>(); + let t = thread::spawn(move || { + drop(rx); + }); + ts.push(t); + drop(tx); + } + for t in ts { + t.join().unwrap(); + } + } + + #[test] + fn oneshot_multi_thread_send_close_stress() { + let stress_factor = stress_factor(); + let mut ts = Vec::with_capacity(2 * stress_factor); + for _ in 0..stress_factor { + let (tx, rx) = channel::<i32>(); + let t = thread::spawn(move || { + drop(rx); + }); + ts.push(t); + thread::spawn(move || { + let _ = tx.send(1); + }) + .join() + .unwrap(); + } + for t in ts { + t.join().unwrap(); + } + } + + #[test] + fn oneshot_multi_thread_recv_close_stress() { + let stress_factor = stress_factor(); + let mut ts = Vec::with_capacity(2 * stress_factor); + for _ in 0..stress_factor { + let (tx, rx) = channel::<i32>(); + let t = thread::spawn(move || { + thread::spawn(move || { + assert_eq!(rx.recv(), Err(RecvError)); + }) + .join() + .unwrap(); + }); + ts.push(t); + let t2 = thread::spawn(move || { + let t = thread::spawn(move || { + drop(tx); + }); + t.join().unwrap(); + }); + ts.push(t2); + } + for t in ts { + t.join().unwrap(); + } + } + + #[test] + fn oneshot_multi_thread_send_recv_stress() { + let stress_factor = stress_factor(); + let mut ts = Vec::with_capacity(stress_factor); + for _ in 0..stress_factor { + let (tx, rx) = channel::<Box<isize>>(); + let t = thread::spawn(move || { + tx.send(Box::new(10)).unwrap(); + }); + ts.push(t); + assert!(*rx.recv().unwrap() == 10); + } + for t in ts { + t.join().unwrap(); + } + } + + #[test] + fn stream_send_recv_stress() { + let stress_factor = stress_factor(); + let mut ts = Vec::with_capacity(2 * stress_factor); + for _ in 0..stress_factor { + let (tx, rx) = channel(); + + if let Some(t) = send(tx, 0) { + ts.push(t); + } + if let Some(t2) = recv(rx, 0) { + ts.push(t2); + } + + fn send(tx: Sender<Box<i32>>, i: i32) -> Option<JoinHandle<()>> { + if i == 10 { + return None; + } + + Some(thread::spawn(move || { + tx.send(Box::new(i)).unwrap(); + send(tx, i + 1); + })) + } + + fn recv(rx: Receiver<Box<i32>>, i: i32) -> Option<JoinHandle<()>> { + if i == 10 { + return None; + } + + Some(thread::spawn(move || { + assert!(*rx.recv().unwrap() == i); + recv(rx, i + 1); + })) + } + } + for t in ts { + t.join().unwrap(); + } + } + + #[test] + fn oneshot_single_thread_recv_timeout() { + let (tx, rx) = channel(); + tx.send(()).unwrap(); + assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Ok(())); + assert_eq!( + rx.recv_timeout(Duration::from_millis(1)), + Err(RecvTimeoutError::Timeout) + ); + tx.send(()).unwrap(); + assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Ok(())); + } + + #[test] + fn stress_recv_timeout_two_threads() { + let (tx, rx) = channel(); + let stress = stress_factor() + 100; + let timeout = Duration::from_millis(100); + + let t = thread::spawn(move || { + for i in 0..stress { + if i % 2 == 0 { + thread::sleep(timeout * 2); + } + tx.send(1usize).unwrap(); + } + }); + + let mut recv_count = 0; + loop { + match rx.recv_timeout(timeout) { + Ok(n) => { + assert_eq!(n, 1usize); + recv_count += 1; + } + Err(RecvTimeoutError::Timeout) => continue, + Err(RecvTimeoutError::Disconnected) => break, + } + } + + assert_eq!(recv_count, stress); + t.join().unwrap() + } + + #[test] + fn recv_timeout_upgrade() { + let (tx, rx) = channel::<()>(); + let timeout = Duration::from_millis(1); + let _tx_clone = tx.clone(); + + let start = Instant::now(); + assert_eq!(rx.recv_timeout(timeout), Err(RecvTimeoutError::Timeout)); + assert!(Instant::now() >= start + timeout); + } + + #[test] + fn stress_recv_timeout_shared() { + let (tx, rx) = channel(); + let stress = stress_factor() + 100; + + let mut ts = Vec::with_capacity(stress); + for i in 0..stress { + let tx = tx.clone(); + let t = thread::spawn(move || { + thread::sleep(Duration::from_millis(i as u64 * 10)); + tx.send(1usize).unwrap(); + }); + ts.push(t); + } + + drop(tx); + + let mut recv_count = 0; + loop { + match rx.recv_timeout(Duration::from_millis(10)) { + Ok(n) => { + assert_eq!(n, 1usize); + recv_count += 1; + } + Err(RecvTimeoutError::Timeout) => continue, + Err(RecvTimeoutError::Disconnected) => break, + } + } + + assert_eq!(recv_count, stress); + for t in ts { + t.join().unwrap(); + } + } + + #[test] + fn recv_a_lot() { + // Regression test that we don't run out of stack in scheduler context + let (tx, rx) = channel(); + for _ in 0..10000 { + tx.send(()).unwrap(); + } + for _ in 0..10000 { + rx.recv().unwrap(); + } + } + + #[test] + fn shared_recv_timeout() { + let (tx, rx) = channel(); + let total = 5; + let mut ts = Vec::with_capacity(total); + for _ in 0..total { + let tx = tx.clone(); + let t = thread::spawn(move || { + tx.send(()).unwrap(); + }); + ts.push(t); + } + + for _ in 0..total { + rx.recv().unwrap(); + } + + assert_eq!( + rx.recv_timeout(Duration::from_millis(1)), + Err(RecvTimeoutError::Timeout) + ); + tx.send(()).unwrap(); + assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Ok(())); + for t in ts { + t.join().unwrap(); + } + } + + #[test] + fn shared_chan_stress() { + let (tx, rx) = channel(); + let total = stress_factor() + 100; + let mut ts = Vec::with_capacity(total); + for _ in 0..total { + let tx = tx.clone(); + let t = thread::spawn(move || { + tx.send(()).unwrap(); + }); + ts.push(t); + } + + for _ in 0..total { + rx.recv().unwrap(); + } + for t in ts { + t.join().unwrap(); + } + } + + #[test] + fn test_nested_recv_iter() { + let (tx, rx) = channel::<i32>(); + let (total_tx, total_rx) = channel::<i32>(); + + let t = thread::spawn(move || { + let mut acc = 0; + for x in rx.iter() { + acc += x; + } + total_tx.send(acc).unwrap(); + }); + + tx.send(3).unwrap(); + tx.send(1).unwrap(); + tx.send(2).unwrap(); + drop(tx); + assert_eq!(total_rx.recv().unwrap(), 6); + t.join().unwrap(); + } + + #[test] + fn test_recv_iter_break() { + let (tx, rx) = channel::<i32>(); + let (count_tx, count_rx) = channel(); + + let t = thread::spawn(move || { + let mut count = 0; + for x in rx.iter() { + if count >= 3 { + break; + } else { + count += x; + } + } + count_tx.send(count).unwrap(); + }); + + tx.send(2).unwrap(); + tx.send(2).unwrap(); + tx.send(2).unwrap(); + let _ = tx.send(2); + drop(tx); + assert_eq!(count_rx.recv().unwrap(), 4); + t.join().unwrap(); + } + + #[test] + fn test_recv_try_iter() { + let (request_tx, request_rx) = channel(); + let (response_tx, response_rx) = channel(); + + // Request `x`s until we have `6`. + let t = thread::spawn(move || { + let mut count = 0; + loop { + for x in response_rx.try_iter() { + count += x; + if count == 6 { + return count; + } + } + request_tx.send(()).unwrap(); + } + }); + + for _ in request_rx.iter() { + if response_tx.send(2).is_err() { + break; + } + } + + assert_eq!(t.join().unwrap(), 6); + } + + #[test] + fn test_recv_into_iter_owned() { + let mut iter = { + let (tx, rx) = channel::<i32>(); + tx.send(1).unwrap(); + tx.send(2).unwrap(); + + rx.into_iter() + }; + assert_eq!(iter.next().unwrap(), 1); + assert_eq!(iter.next().unwrap(), 2); + assert_eq!(iter.next().is_none(), true); + } + + #[test] + fn test_recv_into_iter_borrowed() { + let (tx, rx) = channel::<i32>(); + tx.send(1).unwrap(); + tx.send(2).unwrap(); + drop(tx); + let mut iter = (&rx).into_iter(); + assert_eq!(iter.next().unwrap(), 1); + assert_eq!(iter.next().unwrap(), 2); + assert_eq!(iter.next().is_none(), true); + } + + #[test] + fn try_recv_states() { + let (tx1, rx1) = channel::<i32>(); + let (tx2, rx2) = channel::<()>(); + let (tx3, rx3) = channel::<()>(); + let t = thread::spawn(move || { + rx2.recv().unwrap(); + tx1.send(1).unwrap(); + tx3.send(()).unwrap(); + rx2.recv().unwrap(); + drop(tx1); + tx3.send(()).unwrap(); + }); + + assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty)); + tx2.send(()).unwrap(); + rx3.recv().unwrap(); + assert_eq!(rx1.try_recv(), Ok(1)); + assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty)); + tx2.send(()).unwrap(); + rx3.recv().unwrap(); + assert_eq!(rx1.try_recv(), Err(TryRecvError::Disconnected)); + t.join().unwrap(); + } + + // This bug used to end up in a livelock inside of the Receiver destructor + // because the internal state of the Shared packet was corrupted + #[test] + fn destroy_upgraded_shared_port_when_sender_still_active() { + let (tx, rx) = channel(); + let (tx2, rx2) = channel(); + let t = thread::spawn(move || { + rx.recv().unwrap(); // wait on a oneshot + drop(rx); // destroy a shared + tx2.send(()).unwrap(); + }); + // make sure the other thread has gone to sleep + for _ in 0..5000 { + thread::yield_now(); + } + + // upgrade to a shared chan and send a message + let tx2 = tx.clone(); + drop(tx); + tx2.send(()).unwrap(); + + // wait for the child thread to exit before we exit + rx2.recv().unwrap(); + t.join().unwrap(); + } + + #[test] + fn issue_32114() { + let (tx, _) = channel(); + let _ = tx.send(123); + assert_eq!(tx.send(123), Err(SendError(123))); + } +} + +// Source: https://github.com/rust-lang/rust/blob/master/src/libstd/sync/mpsc/mod.rs +mod sync_channel_tests { + use super::*; + + use std::env; + use std::thread; + use std::time::Duration; + + pub fn stress_factor() -> usize { + match env::var("RUST_TEST_STRESS") { + Ok(val) => val.parse().unwrap(), + Err(..) => 1, + } + } + + #[test] + fn smoke() { + let (tx, rx) = sync_channel::<i32>(1); + tx.send(1).unwrap(); + assert_eq!(rx.recv().unwrap(), 1); + } + + #[test] + fn drop_full() { + let (tx, _rx) = sync_channel::<Box<isize>>(1); + tx.send(Box::new(1)).unwrap(); + } + + #[test] + fn smoke_shared() { + let (tx, rx) = sync_channel::<i32>(1); + tx.send(1).unwrap(); + assert_eq!(rx.recv().unwrap(), 1); + let tx = tx.clone(); + tx.send(1).unwrap(); + assert_eq!(rx.recv().unwrap(), 1); + } + + #[test] + fn recv_timeout() { + let (tx, rx) = sync_channel::<i32>(1); + assert_eq!( + rx.recv_timeout(Duration::from_millis(1)), + Err(RecvTimeoutError::Timeout) + ); + tx.send(1).unwrap(); + assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Ok(1)); + } + + #[test] + fn smoke_threads() { + let (tx, rx) = sync_channel::<i32>(0); + let t = thread::spawn(move || { + tx.send(1).unwrap(); + }); + assert_eq!(rx.recv().unwrap(), 1); + t.join().unwrap(); + } + + #[test] + fn smoke_port_gone() { + let (tx, rx) = sync_channel::<i32>(0); + drop(rx); + assert!(tx.send(1).is_err()); + } + + #[test] + fn smoke_shared_port_gone2() { + let (tx, rx) = sync_channel::<i32>(0); + drop(rx); + let tx2 = tx.clone(); + drop(tx); + assert!(tx2.send(1).is_err()); + } + + #[test] + fn port_gone_concurrent() { + let (tx, rx) = sync_channel::<i32>(0); + let t = thread::spawn(move || { + rx.recv().unwrap(); + }); + while tx.send(1).is_ok() {} + t.join().unwrap(); + } + + #[test] + fn port_gone_concurrent_shared() { + let (tx, rx) = sync_channel::<i32>(0); + let tx2 = tx.clone(); + let t = thread::spawn(move || { + rx.recv().unwrap(); + }); + while tx.send(1).is_ok() && tx2.send(1).is_ok() {} + t.join().unwrap(); + } + + #[test] + fn smoke_chan_gone() { + let (tx, rx) = sync_channel::<i32>(0); + drop(tx); + assert!(rx.recv().is_err()); + } + + #[test] + fn smoke_chan_gone_shared() { + let (tx, rx) = sync_channel::<()>(0); + let tx2 = tx.clone(); + drop(tx); + drop(tx2); + assert!(rx.recv().is_err()); + } + + #[test] + fn chan_gone_concurrent() { + let (tx, rx) = sync_channel::<i32>(0); + let t = thread::spawn(move || { + tx.send(1).unwrap(); + tx.send(1).unwrap(); + }); + while rx.recv().is_ok() {} + t.join().unwrap(); + } + + #[test] + fn stress() { + let (tx, rx) = sync_channel::<i32>(0); + let t = thread::spawn(move || { + for _ in 0..10000 { + tx.send(1).unwrap(); + } + }); + for _ in 0..10000 { + assert_eq!(rx.recv().unwrap(), 1); + } + t.join().unwrap(); + } + + #[test] + fn stress_recv_timeout_two_threads() { + let (tx, rx) = sync_channel::<i32>(0); + + let t = thread::spawn(move || { + for _ in 0..10000 { + tx.send(1).unwrap(); + } + }); + + let mut recv_count = 0; + loop { + match rx.recv_timeout(Duration::from_millis(1)) { + Ok(v) => { + assert_eq!(v, 1); + recv_count += 1; + } + Err(RecvTimeoutError::Timeout) => continue, + Err(RecvTimeoutError::Disconnected) => break, + } + } + + assert_eq!(recv_count, 10000); + t.join().unwrap(); + } + + #[test] + fn stress_recv_timeout_shared() { + const AMT: u32 = 1000; + const NTHREADS: u32 = 8; + let (tx, rx) = sync_channel::<i32>(0); + let (dtx, drx) = sync_channel::<()>(0); + + let t = thread::spawn(move || { + let mut recv_count = 0; + loop { + match rx.recv_timeout(Duration::from_millis(10)) { + Ok(v) => { + assert_eq!(v, 1); + recv_count += 1; + } + Err(RecvTimeoutError::Timeout) => continue, + Err(RecvTimeoutError::Disconnected) => break, + } + } + + assert_eq!(recv_count, AMT * NTHREADS); + assert!(rx.try_recv().is_err()); + + dtx.send(()).unwrap(); + }); + + let mut ts = Vec::with_capacity(NTHREADS as usize); + for _ in 0..NTHREADS { + let tx = tx.clone(); + let t = thread::spawn(move || { + for _ in 0..AMT { + tx.send(1).unwrap(); + } + }); + ts.push(t); + } + + drop(tx); + + drx.recv().unwrap(); + for t in ts { + t.join().unwrap(); + } + t.join().unwrap(); + } + + #[test] + fn stress_shared() { + const AMT: u32 = 1000; + const NTHREADS: u32 = 8; + let (tx, rx) = sync_channel::<i32>(0); + let (dtx, drx) = sync_channel::<()>(0); + + let t = thread::spawn(move || { + for _ in 0..AMT * NTHREADS { + assert_eq!(rx.recv().unwrap(), 1); + } + match rx.try_recv() { + Ok(..) => panic!(), + _ => {} + } + dtx.send(()).unwrap(); + }); + + let mut ts = Vec::with_capacity(NTHREADS as usize); + for _ in 0..NTHREADS { + let tx = tx.clone(); + let t = thread::spawn(move || { + for _ in 0..AMT { + tx.send(1).unwrap(); + } + }); + ts.push(t); + } + drop(tx); + drx.recv().unwrap(); + for t in ts { + t.join().unwrap(); + } + t.join().unwrap(); + } + + #[test] + fn oneshot_single_thread_close_port_first() { + // Simple test of closing without sending + let (_tx, rx) = sync_channel::<i32>(0); + drop(rx); + } + + #[test] + fn oneshot_single_thread_close_chan_first() { + // Simple test of closing without sending + let (tx, _rx) = sync_channel::<i32>(0); + drop(tx); + } + + #[test] + fn oneshot_single_thread_send_port_close() { + // Testing that the sender cleans up the payload if receiver is closed + let (tx, rx) = sync_channel::<Box<i32>>(0); + drop(rx); + assert!(tx.send(Box::new(0)).is_err()); + } + + #[test] + fn oneshot_single_thread_recv_chan_close() { + let (tx, rx) = sync_channel::<i32>(0); + drop(tx); + assert_eq!(rx.recv(), Err(RecvError)); + } + + #[test] + fn oneshot_single_thread_send_then_recv() { + let (tx, rx) = sync_channel::<Box<i32>>(1); + tx.send(Box::new(10)).unwrap(); + assert!(*rx.recv().unwrap() == 10); + } + + #[test] + fn oneshot_single_thread_try_send_open() { + let (tx, rx) = sync_channel::<i32>(1); + assert_eq!(tx.try_send(10), Ok(())); + assert!(rx.recv().unwrap() == 10); + } + + #[test] + fn oneshot_single_thread_try_send_closed() { + let (tx, rx) = sync_channel::<i32>(0); + drop(rx); + assert_eq!(tx.try_send(10), Err(TrySendError::Disconnected(10))); + } + + #[test] + fn oneshot_single_thread_try_send_closed2() { + let (tx, _rx) = sync_channel::<i32>(0); + assert_eq!(tx.try_send(10), Err(TrySendError::Full(10))); + } + + #[test] + fn oneshot_single_thread_try_recv_open() { + let (tx, rx) = sync_channel::<i32>(1); + tx.send(10).unwrap(); + assert!(rx.recv() == Ok(10)); + } + + #[test] + fn oneshot_single_thread_try_recv_closed() { + let (tx, rx) = sync_channel::<i32>(0); + drop(tx); + assert!(rx.recv().is_err()); + } + + #[test] + fn oneshot_single_thread_try_recv_closed_with_data() { + let (tx, rx) = sync_channel::<i32>(1); + tx.send(10).unwrap(); + drop(tx); + assert_eq!(rx.try_recv(), Ok(10)); + assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected)); + } + + #[test] + fn oneshot_single_thread_peek_data() { + let (tx, rx) = sync_channel::<i32>(1); + assert_eq!(rx.try_recv(), Err(TryRecvError::Empty)); + tx.send(10).unwrap(); + assert_eq!(rx.try_recv(), Ok(10)); + } + + #[test] + fn oneshot_single_thread_peek_close() { + let (tx, rx) = sync_channel::<i32>(0); + drop(tx); + assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected)); + assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected)); + } + + #[test] + fn oneshot_single_thread_peek_open() { + let (_tx, rx) = sync_channel::<i32>(0); + assert_eq!(rx.try_recv(), Err(TryRecvError::Empty)); + } + + #[test] + fn oneshot_multi_task_recv_then_send() { + let (tx, rx) = sync_channel::<Box<i32>>(0); + let t = thread::spawn(move || { + assert!(*rx.recv().unwrap() == 10); + }); + + tx.send(Box::new(10)).unwrap(); + t.join().unwrap(); + } + + #[test] + fn oneshot_multi_task_recv_then_close() { + let (tx, rx) = sync_channel::<Box<i32>>(0); + let t = thread::spawn(move || { + drop(tx); + }); + thread::spawn(move || { + assert_eq!(rx.recv(), Err(RecvError)); + }) + .join() + .unwrap(); + t.join().unwrap(); + } + + #[test] + fn oneshot_multi_thread_close_stress() { + let stress_factor = stress_factor(); + let mut ts = Vec::with_capacity(stress_factor); + for _ in 0..stress_factor { + let (tx, rx) = sync_channel::<i32>(0); + let t = thread::spawn(move || { + drop(rx); + }); + ts.push(t); + drop(tx); + } + for t in ts { + t.join().unwrap(); + } + } + + #[test] + fn oneshot_multi_thread_send_close_stress() { + let stress_factor = stress_factor(); + let mut ts = Vec::with_capacity(stress_factor); + for _ in 0..stress_factor { + let (tx, rx) = sync_channel::<i32>(0); + let t = thread::spawn(move || { + drop(rx); + }); + ts.push(t); + thread::spawn(move || { + let _ = tx.send(1); + }) + .join() + .unwrap(); + } + for t in ts { + t.join().unwrap(); + } + } + + #[test] + fn oneshot_multi_thread_recv_close_stress() { + let stress_factor = stress_factor(); + let mut ts = Vec::with_capacity(2 * stress_factor); + for _ in 0..stress_factor { + let (tx, rx) = sync_channel::<i32>(0); + let t = thread::spawn(move || { + thread::spawn(move || { + assert_eq!(rx.recv(), Err(RecvError)); + }) + .join() + .unwrap(); + }); + ts.push(t); + let t2 = thread::spawn(move || { + thread::spawn(move || { + drop(tx); + }); + }); + ts.push(t2); + } + for t in ts { + t.join().unwrap(); + } + } + + #[test] + fn oneshot_multi_thread_send_recv_stress() { + let stress_factor = stress_factor(); + let mut ts = Vec::with_capacity(stress_factor); + for _ in 0..stress_factor { + let (tx, rx) = sync_channel::<Box<i32>>(0); + let t = thread::spawn(move || { + tx.send(Box::new(10)).unwrap(); + }); + ts.push(t); + assert!(*rx.recv().unwrap() == 10); + } + for t in ts { + t.join().unwrap(); + } + } + + #[test] + fn stream_send_recv_stress() { + let stress_factor = stress_factor(); + let mut ts = Vec::with_capacity(2 * stress_factor); + for _ in 0..stress_factor { + let (tx, rx) = sync_channel::<Box<i32>>(0); + + if let Some(t) = send(tx, 0) { + ts.push(t); + } + if let Some(t) = recv(rx, 0) { + ts.push(t); + } + + fn send(tx: SyncSender<Box<i32>>, i: i32) -> Option<JoinHandle<()>> { + if i == 10 { + return None; + } + + Some(thread::spawn(move || { + tx.send(Box::new(i)).unwrap(); + send(tx, i + 1); + })) + } + + fn recv(rx: Receiver<Box<i32>>, i: i32) -> Option<JoinHandle<()>> { + if i == 10 { + return None; + } + + Some(thread::spawn(move || { + assert!(*rx.recv().unwrap() == i); + recv(rx, i + 1); + })) + } + } + for t in ts { + t.join().unwrap(); + } + } + + #[test] + fn recv_a_lot() { + // Regression test that we don't run out of stack in scheduler context + let (tx, rx) = sync_channel(10000); + for _ in 0..10000 { + tx.send(()).unwrap(); + } + for _ in 0..10000 { + rx.recv().unwrap(); + } + } + + #[test] + fn shared_chan_stress() { + let (tx, rx) = sync_channel(0); + let total = stress_factor() + 100; + let mut ts = Vec::with_capacity(total); + for _ in 0..total { + let tx = tx.clone(); + let t = thread::spawn(move || { + tx.send(()).unwrap(); + }); + ts.push(t); + } + + for _ in 0..total { + rx.recv().unwrap(); + } + for t in ts { + t.join().unwrap(); + } + } + + #[test] + fn test_nested_recv_iter() { + let (tx, rx) = sync_channel::<i32>(0); + let (total_tx, total_rx) = sync_channel::<i32>(0); + + let t = thread::spawn(move || { + let mut acc = 0; + for x in rx.iter() { + acc += x; + } + total_tx.send(acc).unwrap(); + }); + + tx.send(3).unwrap(); + tx.send(1).unwrap(); + tx.send(2).unwrap(); + drop(tx); + assert_eq!(total_rx.recv().unwrap(), 6); + t.join().unwrap(); + } + + #[test] + fn test_recv_iter_break() { + let (tx, rx) = sync_channel::<i32>(0); + let (count_tx, count_rx) = sync_channel(0); + + let t = thread::spawn(move || { + let mut count = 0; + for x in rx.iter() { + if count >= 3 { + break; + } else { + count += x; + } + } + count_tx.send(count).unwrap(); + }); + + tx.send(2).unwrap(); + tx.send(2).unwrap(); + tx.send(2).unwrap(); + let _ = tx.try_send(2); + drop(tx); + assert_eq!(count_rx.recv().unwrap(), 4); + t.join().unwrap(); + } + + #[test] + fn try_recv_states() { + let (tx1, rx1) = sync_channel::<i32>(1); + let (tx2, rx2) = sync_channel::<()>(1); + let (tx3, rx3) = sync_channel::<()>(1); + let t = thread::spawn(move || { + rx2.recv().unwrap(); + tx1.send(1).unwrap(); + tx3.send(()).unwrap(); + rx2.recv().unwrap(); + drop(tx1); + tx3.send(()).unwrap(); + }); + + assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty)); + tx2.send(()).unwrap(); + rx3.recv().unwrap(); + assert_eq!(rx1.try_recv(), Ok(1)); + assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty)); + tx2.send(()).unwrap(); + rx3.recv().unwrap(); + assert_eq!(rx1.try_recv(), Err(TryRecvError::Disconnected)); + t.join().unwrap(); + } + + // This bug used to end up in a livelock inside of the Receiver destructor + // because the internal state of the Shared packet was corrupted + #[test] + fn destroy_upgraded_shared_port_when_sender_still_active() { + let (tx, rx) = sync_channel::<()>(0); + let (tx2, rx2) = sync_channel::<()>(0); + let t = thread::spawn(move || { + rx.recv().unwrap(); // wait on a oneshot + drop(rx); // destroy a shared + tx2.send(()).unwrap(); + }); + // make sure the other thread has gone to sleep + for _ in 0..5000 { + thread::yield_now(); + } + + // upgrade to a shared chan and send a message + let tx2 = tx.clone(); + drop(tx); + tx2.send(()).unwrap(); + + // wait for the child thread to exit before we exit + rx2.recv().unwrap(); + t.join().unwrap(); + } + + #[test] + fn send1() { + let (tx, rx) = sync_channel::<i32>(0); + let t = thread::spawn(move || { + rx.recv().unwrap(); + }); + assert_eq!(tx.send(1), Ok(())); + t.join().unwrap(); + } + + #[test] + fn send2() { + let (tx, rx) = sync_channel::<i32>(0); + let t = thread::spawn(move || { + drop(rx); + }); + assert!(tx.send(1).is_err()); + t.join().unwrap(); + } + + #[test] + fn send3() { + let (tx, rx) = sync_channel::<i32>(1); + assert_eq!(tx.send(1), Ok(())); + let t = thread::spawn(move || { + drop(rx); + }); + assert!(tx.send(1).is_err()); + t.join().unwrap(); + } + + #[test] + fn send4() { + let (tx, rx) = sync_channel::<i32>(0); + let tx2 = tx.clone(); + let (done, donerx) = channel(); + let done2 = done.clone(); + let t = thread::spawn(move || { + assert!(tx.send(1).is_err()); + done.send(()).unwrap(); + }); + let t2 = thread::spawn(move || { + assert!(tx2.send(2).is_err()); + done2.send(()).unwrap(); + }); + drop(rx); + donerx.recv().unwrap(); + donerx.recv().unwrap(); + t.join().unwrap(); + t2.join().unwrap(); + } + + #[test] + fn try_send1() { + let (tx, _rx) = sync_channel::<i32>(0); + assert_eq!(tx.try_send(1), Err(TrySendError::Full(1))); + } + + #[test] + fn try_send2() { + let (tx, _rx) = sync_channel::<i32>(1); + assert_eq!(tx.try_send(1), Ok(())); + assert_eq!(tx.try_send(1), Err(TrySendError::Full(1))); + } + + #[test] + fn try_send3() { + let (tx, rx) = sync_channel::<i32>(1); + assert_eq!(tx.try_send(1), Ok(())); + drop(rx); + assert_eq!(tx.try_send(1), Err(TrySendError::Disconnected(1))); + } + + #[test] + fn issue_15761() { + fn repro() { + let (tx1, rx1) = sync_channel::<()>(3); + let (tx2, rx2) = sync_channel::<()>(3); + + let _t = thread::spawn(move || { + rx1.recv().unwrap(); + tx2.try_send(()).unwrap(); + }); + + tx1.try_send(()).unwrap(); + rx2.recv().unwrap(); + } + + for _ in 0..100 { + repro() + } + } +} + +// Source: https://github.com/rust-lang/rust/blob/master/src/libstd/sync/mpsc/select.rs +mod select_tests { + use super::*; + + use std::thread; + + #[test] + fn smoke() { + let (tx1, rx1) = channel::<i32>(); + let (tx2, rx2) = channel::<i32>(); + tx1.send(1).unwrap(); + select! { + foo = rx1.recv() => { assert_eq!(foo.unwrap(), 1); }, + _bar = rx2.recv() => { panic!() } + } + tx2.send(2).unwrap(); + select! { + _foo = rx1.recv() => { panic!() }, + bar = rx2.recv() => { assert_eq!(bar.unwrap(), 2) } + } + drop(tx1); + select! { + foo = rx1.recv() => { assert!(foo.is_err()); }, + _bar = rx2.recv() => { panic!() } + } + drop(tx2); + select! { + bar = rx2.recv() => { assert!(bar.is_err()); } + } + } + + #[test] + fn smoke2() { + let (_tx1, rx1) = channel::<i32>(); + let (_tx2, rx2) = channel::<i32>(); + let (_tx3, rx3) = channel::<i32>(); + let (_tx4, rx4) = channel::<i32>(); + let (tx5, rx5) = channel::<i32>(); + tx5.send(4).unwrap(); + select! { + _foo = rx1.recv() => { panic!("1") }, + _foo = rx2.recv() => { panic!("2") }, + _foo = rx3.recv() => { panic!("3") }, + _foo = rx4.recv() => { panic!("4") }, + foo = rx5.recv() => { assert_eq!(foo.unwrap(), 4); } + } + } + + #[test] + fn closed() { + let (_tx1, rx1) = channel::<i32>(); + let (tx2, rx2) = channel::<i32>(); + drop(tx2); + + select! { + _a1 = rx1.recv() => { panic!() }, + a2 = rx2.recv() => { assert!(a2.is_err()); } + } + } + + #[test] + fn unblocks() { + let (tx1, rx1) = channel::<i32>(); + let (_tx2, rx2) = channel::<i32>(); + let (tx3, rx3) = channel::<i32>(); + + let t = thread::spawn(move || { + for _ in 0..20 { + thread::yield_now(); + } + tx1.send(1).unwrap(); + rx3.recv().unwrap(); + for _ in 0..20 { + thread::yield_now(); + } + }); + + select! { + a = rx1.recv() => { assert_eq!(a.unwrap(), 1); }, + _b = rx2.recv() => { panic!() } + } + tx3.send(1).unwrap(); + select! { + a = rx1.recv() => { assert!(a.is_err()) }, + _b = rx2.recv() => { panic!() } + } + t.join().unwrap(); + } + + #[test] + fn both_ready() { + let (tx1, rx1) = channel::<i32>(); + let (tx2, rx2) = channel::<i32>(); + let (tx3, rx3) = channel::<()>(); + + let t = thread::spawn(move || { + for _ in 0..20 { + thread::yield_now(); + } + tx1.send(1).unwrap(); + tx2.send(2).unwrap(); + rx3.recv().unwrap(); + }); + + select! { + a = rx1.recv() => { assert_eq!(a.unwrap(), 1); }, + a = rx2.recv() => { assert_eq!(a.unwrap(), 2); } + } + select! { + a = rx1.recv() => { assert_eq!(a.unwrap(), 1); }, + a = rx2.recv() => { assert_eq!(a.unwrap(), 2); } + } + assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty)); + assert_eq!(rx2.try_recv(), Err(TryRecvError::Empty)); + tx3.send(()).unwrap(); + t.join().unwrap(); + } + + #[test] + fn stress() { + const AMT: i32 = 10000; + let (tx1, rx1) = channel::<i32>(); + let (tx2, rx2) = channel::<i32>(); + let (tx3, rx3) = channel::<()>(); + + let t = thread::spawn(move || { + for i in 0..AMT { + if i % 2 == 0 { + tx1.send(i).unwrap(); + } else { + tx2.send(i).unwrap(); + } + rx3.recv().unwrap(); + } + }); + + for i in 0..AMT { + select! { + i1 = rx1.recv() => { assert!(i % 2 == 0 && i == i1.unwrap()); }, + i2 = rx2.recv() => { assert!(i % 2 == 1 && i == i2.unwrap()); } + } + tx3.send(()).unwrap(); + } + t.join().unwrap(); + } + + #[allow(unused_must_use)] + #[test] + fn cloning() { + let (tx1, rx1) = channel::<i32>(); + let (_tx2, rx2) = channel::<i32>(); + let (tx3, rx3) = channel::<()>(); + + let t = thread::spawn(move || { + rx3.recv().unwrap(); + tx1.clone(); + assert_eq!(rx3.try_recv(), Err(TryRecvError::Empty)); + tx1.send(2).unwrap(); + rx3.recv().unwrap(); + }); + + tx3.send(()).unwrap(); + select! { + _i1 = rx1.recv() => {}, + _i2 = rx2.recv() => panic!() + } + tx3.send(()).unwrap(); + t.join().unwrap(); + } + + #[allow(unused_must_use)] + #[test] + fn cloning2() { + let (tx1, rx1) = channel::<i32>(); + let (_tx2, rx2) = channel::<i32>(); + let (tx3, rx3) = channel::<()>(); + + let t = thread::spawn(move || { + rx3.recv().unwrap(); + tx1.clone(); + assert_eq!(rx3.try_recv(), Err(TryRecvError::Empty)); + tx1.send(2).unwrap(); + rx3.recv().unwrap(); + }); + + tx3.send(()).unwrap(); + select! { + _i1 = rx1.recv() => {}, + _i2 = rx2.recv() => panic!() + } + tx3.send(()).unwrap(); + t.join().unwrap(); + } + + #[test] + fn cloning3() { + let (tx1, rx1) = channel::<()>(); + let (tx2, rx2) = channel::<()>(); + let (tx3, rx3) = channel::<()>(); + let t = thread::spawn(move || { + select! { + _ = rx1.recv() => panic!(), + _ = rx2.recv() => {} + } + tx3.send(()).unwrap(); + }); + + for _ in 0..1000 { + thread::yield_now(); + } + drop(tx1.clone()); + tx2.send(()).unwrap(); + rx3.recv().unwrap(); + t.join().unwrap(); + } + + #[test] + fn preflight1() { + let (tx, rx) = channel(); + tx.send(()).unwrap(); + select! { + _n = rx.recv() => {} + } + } + + #[test] + fn preflight2() { + let (tx, rx) = channel(); + tx.send(()).unwrap(); + tx.send(()).unwrap(); + select! { + _n = rx.recv() => {} + } + } + + #[test] + fn preflight3() { + let (tx, rx) = channel(); + drop(tx.clone()); + tx.send(()).unwrap(); + select! { + _n = rx.recv() => {} + } + } + + #[test] + fn preflight4() { + let (tx, rx) = channel(); + tx.send(()).unwrap(); + select! { + _ = rx.recv() => {} + } + } + + #[test] + fn preflight5() { + let (tx, rx) = channel(); + tx.send(()).unwrap(); + tx.send(()).unwrap(); + select! { + _ = rx.recv() => {} + } + } + + #[test] + fn preflight6() { + let (tx, rx) = channel(); + drop(tx.clone()); + tx.send(()).unwrap(); + select! { + _ = rx.recv() => {} + } + } + + #[test] + fn preflight7() { + let (tx, rx) = channel::<()>(); + drop(tx); + select! { + _ = rx.recv() => {} + } + } + + #[test] + fn preflight8() { + let (tx, rx) = channel(); + tx.send(()).unwrap(); + drop(tx); + rx.recv().unwrap(); + select! { + _ = rx.recv() => {} + } + } + + #[test] + fn preflight9() { + let (tx, rx) = channel(); + drop(tx.clone()); + tx.send(()).unwrap(); + drop(tx); + rx.recv().unwrap(); + select! { + _ = rx.recv() => {} + } + } + + #[test] + fn oneshot_data_waiting() { + let (tx1, rx1) = channel(); + let (tx2, rx2) = channel(); + let t = thread::spawn(move || { + select! { + _n = rx1.recv() => {} + } + tx2.send(()).unwrap(); + }); + + for _ in 0..100 { + thread::yield_now() + } + tx1.send(()).unwrap(); + rx2.recv().unwrap(); + t.join().unwrap(); + } + + #[test] + fn stream_data_waiting() { + let (tx1, rx1) = channel(); + let (tx2, rx2) = channel(); + tx1.send(()).unwrap(); + tx1.send(()).unwrap(); + rx1.recv().unwrap(); + rx1.recv().unwrap(); + let t = thread::spawn(move || { + select! { + _n = rx1.recv() => {} + } + tx2.send(()).unwrap(); + }); + + for _ in 0..100 { + thread::yield_now() + } + tx1.send(()).unwrap(); + rx2.recv().unwrap(); + t.join().unwrap(); + } + + #[test] + fn shared_data_waiting() { + let (tx1, rx1) = channel(); + let (tx2, rx2) = channel(); + drop(tx1.clone()); + tx1.send(()).unwrap(); + rx1.recv().unwrap(); + let t = thread::spawn(move || { + select! { + _n = rx1.recv() => {} + } + tx2.send(()).unwrap(); + }); + + for _ in 0..100 { + thread::yield_now() + } + tx1.send(()).unwrap(); + rx2.recv().unwrap(); + t.join().unwrap(); + } + + #[test] + fn sync1() { + let (tx, rx) = sync_channel::<i32>(1); + tx.send(1).unwrap(); + select! { + n = rx.recv() => { assert_eq!(n.unwrap(), 1); } + } + } + + #[test] + fn sync2() { + let (tx, rx) = sync_channel::<i32>(0); + let t = thread::spawn(move || { + for _ in 0..100 { + thread::yield_now() + } + tx.send(1).unwrap(); + }); + select! { + n = rx.recv() => { assert_eq!(n.unwrap(), 1); } + } + t.join().unwrap(); + } + + #[test] + fn sync3() { + let (tx1, rx1) = sync_channel::<i32>(0); + let (tx2, rx2): (Sender<i32>, Receiver<i32>) = channel(); + let t = thread::spawn(move || { + tx1.send(1).unwrap(); + }); + let t2 = thread::spawn(move || { + tx2.send(2).unwrap(); + }); + select! { + n = rx1.recv() => { + let n = n.unwrap(); + assert_eq!(n, 1); + assert_eq!(rx2.recv().unwrap(), 2); + }, + n = rx2.recv() => { + let n = n.unwrap(); + assert_eq!(n, 2); + assert_eq!(rx1.recv().unwrap(), 1); + } + } + t.join().unwrap(); + t2.join().unwrap(); + } +} diff --git a/vendor/flume/tests/never.rs b/vendor/flume/tests/never.rs new file mode 100644 index 0000000..676903f --- /dev/null +++ b/vendor/flume/tests/never.rs @@ -0,0 +1,99 @@ +// //! Tests for the never channel flavor. + +// #[macro_use] +// extern crate crossbeam_channel; +// extern crate rand; + +// use std::thread; +// use std::time::{Duration, Instant}; + +// use crossbeam_channel::{never, tick, unbounded}; + +// fn ms(ms: u64) -> Duration { +// Duration::from_millis(ms) +// } + +// #[test] +// fn smoke() { +// select! { +// recv(never::<i32>()) -> _ => panic!(), +// default => {} +// } +// } + +// #[test] +// fn optional() { +// let (s, r) = unbounded::<i32>(); +// s.send(1).unwrap(); +// s.send(2).unwrap(); + +// let mut r = Some(&r); +// select! { +// recv(r.unwrap_or(&never())) -> _ => {} +// default => panic!(), +// } + +// r = None; +// select! { +// recv(r.unwrap_or(&never())) -> _ => panic!(), +// default => {} +// } +// } + +// #[test] +// fn tick_n() { +// let mut r = tick(ms(100)); +// let mut step = 0; + +// loop { +// select! { +// recv(r) -> _ => step += 1, +// default(ms(500)) => break, +// } + +// if step == 10 { +// r = never(); +// } +// } + +// assert_eq!(step, 10); +// } + +// #[test] +// fn capacity() { +// let r = never::<i32>(); +// assert_eq!(r.capacity(), Some(0)); +// } + +// #[test] +// fn len_empty_full() { +// let r = never::<i32>(); +// assert_eq!(r.len(), 0); +// assert_eq!(r.is_empty(), true); +// assert_eq!(r.is_full(), true); +// } + +// #[test] +// fn try_recv() { +// let r = never::<i32>(); +// assert!(r.try_recv().is_err()); + +// thread::sleep(ms(100)); +// assert!(r.try_recv().is_err()); +// } + +// #[test] +// fn recv_timeout() { +// let start = Instant::now(); +// let r = never::<i32>(); + +// assert!(r.recv_timeout(ms(100)).is_err()); +// let now = Instant::now(); +// assert!(now - start >= ms(100)); +// assert!(now - start <= ms(150)); + +// assert!(r.recv_timeout(ms(100)).is_err()); +// let now = Instant::now(); +// assert!(now - start >= ms(200)); +// assert!(now - start <= ms(250)); +// } diff --git a/vendor/flume/tests/ready.rs b/vendor/flume/tests/ready.rs new file mode 100644 index 0000000..2f42d9a --- /dev/null +++ b/vendor/flume/tests/ready.rs @@ -0,0 +1,837 @@ +// //! Tests for channel readiness using the `Select` struct. + +// extern crate crossbeam_channel; +// extern crate crossbeam_utils; + +// use std::any::Any; +// use std::cell::Cell; +// use std::thread; +// use std::time::{Duration, Instant}; + +// use crossbeam_channel::{after, bounded, tick, unbounded}; +// use crossbeam_channel::{Receiver, Select, TryRecvError, TrySendError}; +// use crossbeam_utils::thread::scope; + +// fn ms(ms: u64) -> Duration { +// Duration::from_millis(ms) +// } + +// #[test] +// fn smoke1() { +// let (s1, r1) = unbounded::<usize>(); +// let (s2, r2) = unbounded::<usize>(); + +// s1.send(1).unwrap(); + +// let mut sel = Select::new(); +// sel.recv(&r1); +// sel.recv(&r2); +// assert_eq!(sel.ready(), 0); +// assert_eq!(r1.try_recv(), Ok(1)); + +// s2.send(2).unwrap(); + +// let mut sel = Select::new(); +// sel.recv(&r1); +// sel.recv(&r2); +// assert_eq!(sel.ready(), 1); +// assert_eq!(r2.try_recv(), Ok(2)); +// } + +// #[test] +// fn smoke2() { +// let (_s1, r1) = unbounded::<i32>(); +// let (_s2, r2) = unbounded::<i32>(); +// let (_s3, r3) = unbounded::<i32>(); +// let (_s4, r4) = unbounded::<i32>(); +// let (s5, r5) = unbounded::<i32>(); + +// s5.send(5).unwrap(); + +// let mut sel = Select::new(); +// sel.recv(&r1); +// sel.recv(&r2); +// sel.recv(&r3); +// sel.recv(&r4); +// sel.recv(&r5); +// assert_eq!(sel.ready(), 4); +// assert_eq!(r5.try_recv(), Ok(5)); +// } + +// #[test] +// fn disconnected() { +// let (s1, r1) = unbounded::<i32>(); +// let (s2, r2) = unbounded::<i32>(); + +// scope(|scope| { +// scope.spawn(|_| { +// drop(s1); +// thread::sleep(ms(500)); +// s2.send(5).unwrap(); +// }); + +// let mut sel = Select::new(); +// sel.recv(&r1); +// sel.recv(&r2); +// match sel.ready_timeout(ms(1000)) { +// Ok(0) => assert_eq!(r1.try_recv(), Err(TryRecvError::Disconnected)), +// _ => panic!(), +// } + +// r2.recv().unwrap(); +// }) +// .unwrap(); + +// let mut sel = Select::new(); +// sel.recv(&r1); +// sel.recv(&r2); +// match sel.ready_timeout(ms(1000)) { +// Ok(0) => assert_eq!(r1.try_recv(), Err(TryRecvError::Disconnected)), +// _ => panic!(), +// } + +// scope(|scope| { +// scope.spawn(|_| { +// thread::sleep(ms(500)); +// drop(s2); +// }); + +// let mut sel = Select::new(); +// sel.recv(&r2); +// match sel.ready_timeout(ms(1000)) { +// Ok(0) => assert_eq!(r2.try_recv(), Err(TryRecvError::Disconnected)), +// _ => panic!(), +// } +// }) +// .unwrap(); +// } + +// #[test] +// fn default() { +// let (s1, r1) = unbounded::<i32>(); +// let (s2, r2) = unbounded::<i32>(); + +// let mut sel = Select::new(); +// sel.recv(&r1); +// sel.recv(&r2); +// assert!(sel.try_ready().is_err()); + +// drop(s1); + +// let mut sel = Select::new(); +// sel.recv(&r1); +// sel.recv(&r2); +// match sel.try_ready() { +// Ok(0) => assert!(r1.try_recv().is_err()), +// _ => panic!(), +// } + +// s2.send(2).unwrap(); + +// let mut sel = Select::new(); +// sel.recv(&r2); +// match sel.try_ready() { +// Ok(0) => assert_eq!(r2.try_recv(), Ok(2)), +// _ => panic!(), +// } + +// let mut sel = Select::new(); +// sel.recv(&r2); +// assert!(sel.try_ready().is_err()); + +// let mut sel = Select::new(); +// assert!(sel.try_ready().is_err()); +// } + +// #[test] +// fn timeout() { +// let (_s1, r1) = unbounded::<i32>(); +// let (s2, r2) = unbounded::<i32>(); + +// scope(|scope| { +// scope.spawn(|_| { +// thread::sleep(ms(1500)); +// s2.send(2).unwrap(); +// }); + +// let mut sel = Select::new(); +// sel.recv(&r1); +// sel.recv(&r2); +// assert!(sel.ready_timeout(ms(1000)).is_err()); + +// let mut sel = Select::new(); +// sel.recv(&r1); +// sel.recv(&r2); +// match sel.ready_timeout(ms(1000)) { +// Ok(1) => assert_eq!(r2.try_recv(), Ok(2)), +// _ => panic!(), +// } +// }) +// .unwrap(); + +// scope(|scope| { +// let (s, r) = unbounded::<i32>(); + +// scope.spawn(move |_| { +// thread::sleep(ms(500)); +// drop(s); +// }); + +// let mut sel = Select::new(); +// assert!(sel.ready_timeout(ms(1000)).is_err()); + +// let mut sel = Select::new(); +// sel.recv(&r); +// match sel.try_ready() { +// Ok(0) => assert_eq!(r.try_recv(), Err(TryRecvError::Disconnected)), +// _ => panic!(), +// } +// }) +// .unwrap(); +// } + +// #[test] +// fn default_when_disconnected() { +// let (_, r) = unbounded::<i32>(); + +// let mut sel = Select::new(); +// sel.recv(&r); +// match sel.try_ready() { +// Ok(0) => assert_eq!(r.try_recv(), Err(TryRecvError::Disconnected)), +// _ => panic!(), +// } + +// let (_, r) = unbounded::<i32>(); + +// let mut sel = Select::new(); +// sel.recv(&r); +// match sel.ready_timeout(ms(1000)) { +// Ok(0) => assert_eq!(r.try_recv(), Err(TryRecvError::Disconnected)), +// _ => panic!(), +// } + +// let (s, _) = bounded::<i32>(0); + +// let mut sel = Select::new(); +// sel.send(&s); +// match sel.try_ready() { +// Ok(0) => assert_eq!(s.try_send(0), Err(TrySendError::Disconnected(0))), +// _ => panic!(), +// } + +// let (s, _) = bounded::<i32>(0); + +// let mut sel = Select::new(); +// sel.send(&s); +// match sel.ready_timeout(ms(1000)) { +// Ok(0) => assert_eq!(s.try_send(0), Err(TrySendError::Disconnected(0))), +// _ => panic!(), +// } +// } + +// #[test] +// fn default_only() { +// let start = Instant::now(); + +// let mut sel = Select::new(); +// assert!(sel.try_ready().is_err()); +// let now = Instant::now(); +// assert!(now - start <= ms(50)); + +// let start = Instant::now(); +// let mut sel = Select::new(); +// assert!(sel.ready_timeout(ms(500)).is_err()); +// let now = Instant::now(); +// assert!(now - start >= ms(450)); +// assert!(now - start <= ms(550)); +// } + +// #[test] +// fn unblocks() { +// let (s1, r1) = bounded::<i32>(0); +// let (s2, r2) = bounded::<i32>(0); + +// scope(|scope| { +// scope.spawn(|_| { +// thread::sleep(ms(500)); +// s2.send(2).unwrap(); +// }); + +// let mut sel = Select::new(); +// sel.recv(&r1); +// sel.recv(&r2); +// match sel.ready_timeout(ms(1000)) { +// Ok(1) => assert_eq!(r2.try_recv(), Ok(2)), +// _ => panic!(), +// } +// }) +// .unwrap(); + +// scope(|scope| { +// scope.spawn(|_| { +// thread::sleep(ms(500)); +// assert_eq!(r1.recv().unwrap(), 1); +// }); + +// let mut sel = Select::new(); +// let oper1 = sel.send(&s1); +// let oper2 = sel.send(&s2); +// let oper = sel.select_timeout(ms(1000)); +// match oper { +// Err(_) => panic!(), +// Ok(oper) => match oper.index() { +// i if i == oper1 => oper.send(&s1, 1).unwrap(), +// i if i == oper2 => panic!(), +// _ => unreachable!(), +// }, +// } +// }) +// .unwrap(); +// } + +// #[test] +// fn both_ready() { +// let (s1, r1) = bounded(0); +// let (s2, r2) = bounded(0); + +// scope(|scope| { +// scope.spawn(|_| { +// thread::sleep(ms(500)); +// s1.send(1).unwrap(); +// assert_eq!(r2.recv().unwrap(), 2); +// }); + +// for _ in 0..2 { +// let mut sel = Select::new(); +// sel.recv(&r1); +// sel.send(&s2); +// match sel.ready() { +// 0 => assert_eq!(r1.try_recv(), Ok(1)), +// 1 => s2.try_send(2).unwrap(), +// _ => panic!(), +// } +// } +// }) +// .unwrap(); +// } + +// #[test] +// fn cloning1() { +// scope(|scope| { +// let (s1, r1) = unbounded::<i32>(); +// let (_s2, r2) = unbounded::<i32>(); +// let (s3, r3) = unbounded::<()>(); + +// scope.spawn(move |_| { +// r3.recv().unwrap(); +// drop(s1.clone()); +// assert!(r3.try_recv().is_err()); +// s1.send(1).unwrap(); +// r3.recv().unwrap(); +// }); + +// s3.send(()).unwrap(); + +// let mut sel = Select::new(); +// sel.recv(&r1); +// sel.recv(&r2); +// match sel.ready() { +// 0 => drop(r1.try_recv()), +// 1 => drop(r2.try_recv()), +// _ => panic!(), +// } + +// s3.send(()).unwrap(); +// }) +// .unwrap(); +// } + +// #[test] +// fn cloning2() { +// let (s1, r1) = unbounded::<()>(); +// let (s2, r2) = unbounded::<()>(); +// let (_s3, _r3) = unbounded::<()>(); + +// scope(|scope| { +// scope.spawn(move |_| { +// let mut sel = Select::new(); +// sel.recv(&r1); +// sel.recv(&r2); +// match sel.ready() { +// 0 => panic!(), +// 1 => drop(r2.try_recv()), +// _ => panic!(), +// } +// }); + +// thread::sleep(ms(500)); +// drop(s1.clone()); +// s2.send(()).unwrap(); +// }) +// .unwrap(); +// } + +// #[test] +// fn preflight1() { +// let (s, r) = unbounded(); +// s.send(()).unwrap(); + +// let mut sel = Select::new(); +// sel.recv(&r); +// match sel.ready() { +// 0 => drop(r.try_recv()), +// _ => panic!(), +// } +// } + +// #[test] +// fn preflight2() { +// let (s, r) = unbounded(); +// drop(s.clone()); +// s.send(()).unwrap(); +// drop(s); + +// let mut sel = Select::new(); +// sel.recv(&r); +// match sel.ready() { +// 0 => assert_eq!(r.try_recv(), Ok(())), +// _ => panic!(), +// } + +// assert_eq!(r.try_recv(), Err(TryRecvError::Disconnected)); +// } + +// #[test] +// fn preflight3() { +// let (s, r) = unbounded(); +// drop(s.clone()); +// s.send(()).unwrap(); +// drop(s); +// r.recv().unwrap(); + +// let mut sel = Select::new(); +// sel.recv(&r); +// match sel.ready() { +// 0 => assert_eq!(r.try_recv(), Err(TryRecvError::Disconnected)), +// _ => panic!(), +// } +// } + +// #[test] +// fn duplicate_operations() { +// let (s, r) = unbounded::<i32>(); +// let hit = vec![Cell::new(false); 4]; + +// while hit.iter().map(|h| h.get()).any(|hit| !hit) { +// let mut sel = Select::new(); +// sel.recv(&r); +// sel.recv(&r); +// sel.send(&s); +// sel.send(&s); +// match sel.ready() { +// 0 => { +// assert!(r.try_recv().is_ok()); +// hit[0].set(true); +// } +// 1 => { +// assert!(r.try_recv().is_ok()); +// hit[1].set(true); +// } +// 2 => { +// assert!(s.try_send(0).is_ok()); +// hit[2].set(true); +// } +// 3 => { +// assert!(s.try_send(0).is_ok()); +// hit[3].set(true); +// } +// _ => panic!(), +// } +// } +// } + +// #[test] +// fn nesting() { +// let (s, r) = unbounded::<i32>(); + +// let mut sel = Select::new(); +// sel.send(&s); +// match sel.ready() { +// 0 => { +// assert!(s.try_send(0).is_ok()); + +// let mut sel = Select::new(); +// sel.recv(&r); +// match sel.ready() { +// 0 => { +// assert_eq!(r.try_recv(), Ok(0)); + +// let mut sel = Select::new(); +// sel.send(&s); +// match sel.ready() { +// 0 => { +// assert!(s.try_send(1).is_ok()); + +// let mut sel = Select::new(); +// sel.recv(&r); +// match sel.ready() { +// 0 => { +// assert_eq!(r.try_recv(), Ok(1)); +// } +// _ => panic!(), +// } +// } +// _ => panic!(), +// } +// } +// _ => panic!(), +// } +// } +// _ => panic!(), +// } +// } + +// #[test] +// fn stress_recv() { +// const COUNT: usize = 10_000; + +// let (s1, r1) = unbounded(); +// let (s2, r2) = bounded(5); +// let (s3, r3) = bounded(0); + +// scope(|scope| { +// scope.spawn(|_| { +// for i in 0..COUNT { +// s1.send(i).unwrap(); +// r3.recv().unwrap(); + +// s2.send(i).unwrap(); +// r3.recv().unwrap(); +// } +// }); + +// for i in 0..COUNT { +// for _ in 0..2 { +// let mut sel = Select::new(); +// sel.recv(&r1); +// sel.recv(&r2); +// match sel.ready() { +// 0 => assert_eq!(r1.try_recv(), Ok(i)), +// 1 => assert_eq!(r2.try_recv(), Ok(i)), +// _ => panic!(), +// } + +// s3.send(()).unwrap(); +// } +// } +// }) +// .unwrap(); +// } + +// #[test] +// fn stress_send() { +// const COUNT: usize = 10_000; + +// let (s1, r1) = bounded(0); +// let (s2, r2) = bounded(0); +// let (s3, r3) = bounded(100); + +// scope(|scope| { +// scope.spawn(|_| { +// for i in 0..COUNT { +// assert_eq!(r1.recv().unwrap(), i); +// assert_eq!(r2.recv().unwrap(), i); +// r3.recv().unwrap(); +// } +// }); + +// for i in 0..COUNT { +// for _ in 0..2 { +// let mut sel = Select::new(); +// sel.send(&s1); +// sel.send(&s2); +// match sel.ready() { +// 0 => assert!(s1.try_send(i).is_ok()), +// 1 => assert!(s2.try_send(i).is_ok()), +// _ => panic!(), +// } +// } +// s3.send(()).unwrap(); +// } +// }) +// .unwrap(); +// } + +// #[test] +// fn stress_mixed() { +// const COUNT: usize = 10_000; + +// let (s1, r1) = bounded(0); +// let (s2, r2) = bounded(0); +// let (s3, r3) = bounded(100); + +// scope(|scope| { +// scope.spawn(|_| { +// for i in 0..COUNT { +// s1.send(i).unwrap(); +// assert_eq!(r2.recv().unwrap(), i); +// r3.recv().unwrap(); +// } +// }); + +// for i in 0..COUNT { +// for _ in 0..2 { +// let mut sel = Select::new(); +// sel.recv(&r1); +// sel.send(&s2); +// match sel.ready() { +// 0 => assert_eq!(r1.try_recv(), Ok(i)), +// 1 => assert!(s2.try_send(i).is_ok()), +// _ => panic!(), +// } +// } +// s3.send(()).unwrap(); +// } +// }) +// .unwrap(); +// } + +// #[test] +// fn stress_timeout_two_threads() { +// const COUNT: usize = 20; + +// let (s, r) = bounded(2); + +// scope(|scope| { +// scope.spawn(|_| { +// for i in 0..COUNT { +// if i % 2 == 0 { +// thread::sleep(ms(500)); +// } + +// let done = false; +// while !done { +// let mut sel = Select::new(); +// sel.send(&s); +// match sel.ready_timeout(ms(100)) { +// Err(_) => {} +// Ok(0) => { +// assert!(s.try_send(i).is_ok()); +// break; +// } +// Ok(_) => panic!(), +// } +// } +// } +// }); + +// scope.spawn(|_| { +// for i in 0..COUNT { +// if i % 2 == 0 { +// thread::sleep(ms(500)); +// } + +// let mut done = false; +// while !done { +// let mut sel = Select::new(); +// sel.recv(&r); +// match sel.ready_timeout(ms(100)) { +// Err(_) => {} +// Ok(0) => { +// assert_eq!(r.try_recv(), Ok(i)); +// done = true; +// } +// Ok(_) => panic!(), +// } +// } +// } +// }); +// }) +// .unwrap(); +// } + +// #[test] +// fn send_recv_same_channel() { +// let (s, r) = bounded::<i32>(0); +// let mut sel = Select::new(); +// sel.send(&s); +// sel.recv(&r); +// assert!(sel.ready_timeout(ms(100)).is_err()); + +// let (s, r) = unbounded::<i32>(); +// let mut sel = Select::new(); +// sel.send(&s); +// sel.recv(&r); +// match sel.ready_timeout(ms(100)) { +// Err(_) => panic!(), +// Ok(0) => assert!(s.try_send(0).is_ok()), +// Ok(_) => panic!(), +// } +// } + +// #[test] +// fn channel_through_channel() { +// const COUNT: usize = 1000; + +// type T = Box<dyn Any + Send>; + +// for cap in 1..4 { +// let (s, r) = bounded::<T>(cap); + +// scope(|scope| { +// scope.spawn(move |_| { +// let mut s = s; + +// for _ in 0..COUNT { +// let (new_s, new_r) = bounded(cap); +// let new_r: T = Box::new(Some(new_r)); + +// { +// let mut sel = Select::new(); +// sel.send(&s); +// match sel.ready() { +// 0 => assert!(s.try_send(new_r).is_ok()), +// _ => panic!(), +// } +// } + +// s = new_s; +// } +// }); + +// scope.spawn(move |_| { +// let mut r = r; + +// for _ in 0..COUNT { +// let new = { +// let mut sel = Select::new(); +// sel.recv(&r); +// match sel.ready() { +// 0 => r +// .try_recv() +// .unwrap() +// .downcast_mut::<Option<Receiver<T>>>() +// .unwrap() +// .take() +// .unwrap(), +// _ => panic!(), +// } +// }; +// r = new; +// } +// }); +// }) +// .unwrap(); +// } +// } + +// #[test] +// fn fairness1() { +// const COUNT: usize = 10_000; + +// let (s1, r1) = bounded::<()>(COUNT); +// let (s2, r2) = unbounded::<()>(); + +// for _ in 0..COUNT { +// s1.send(()).unwrap(); +// s2.send(()).unwrap(); +// } + +// let hits = vec![Cell::new(0usize); 4]; +// for _ in 0..COUNT { +// let after = after(ms(0)); +// let tick = tick(ms(0)); + +// let mut sel = Select::new(); +// sel.recv(&r1); +// sel.recv(&r2); +// sel.recv(&after); +// sel.recv(&tick); +// match sel.ready() { +// 0 => { +// r1.try_recv().unwrap(); +// hits[0].set(hits[0].get() + 1); +// } +// 1 => { +// r2.try_recv().unwrap(); +// hits[1].set(hits[1].get() + 1); +// } +// 2 => { +// after.try_recv().unwrap(); +// hits[2].set(hits[2].get() + 1); +// } +// 3 => { +// tick.try_recv().unwrap(); +// hits[3].set(hits[3].get() + 1); +// } +// _ => panic!(), +// } +// } +// assert!(hits.iter().all(|x| x.get() >= COUNT / hits.len() / 2)); +// } + +// #[test] +// fn fairness2() { +// const COUNT: usize = 10_000; + +// let (s1, r1) = unbounded::<()>(); +// let (s2, r2) = bounded::<()>(1); +// let (s3, r3) = bounded::<()>(0); + +// scope(|scope| { +// scope.spawn(|_| { +// for _ in 0..COUNT { +// let mut sel = Select::new(); +// let mut oper1 = None; +// let mut oper2 = None; +// if s1.is_empty() { +// oper1 = Some(sel.send(&s1)); +// } +// if s2.is_empty() { +// oper2 = Some(sel.send(&s2)); +// } +// let oper3 = sel.send(&s3); +// let oper = sel.select(); +// match oper.index() { +// i if Some(i) == oper1 => assert!(oper.send(&s1, ()).is_ok()), +// i if Some(i) == oper2 => assert!(oper.send(&s2, ()).is_ok()), +// i if i == oper3 => assert!(oper.send(&s3, ()).is_ok()), +// _ => unreachable!(), +// } +// } +// }); + +// let hits = vec![Cell::new(0usize); 3]; +// for _ in 0..COUNT { +// let mut sel = Select::new(); +// sel.recv(&r1); +// sel.recv(&r2); +// sel.recv(&r3); +// loop { +// match sel.ready() { +// 0 => { +// if r1.try_recv().is_ok() { +// hits[0].set(hits[0].get() + 1); +// break; +// } +// } +// 1 => { +// if r2.try_recv().is_ok() { +// hits[1].set(hits[1].get() + 1); +// break; +// } +// } +// 2 => { +// if r3.try_recv().is_ok() { +// hits[2].set(hits[2].get() + 1); +// break; +// } +// } +// _ => unreachable!(), +// } +// } +// } +// assert!(hits.iter().all(|x| x.get() >= COUNT / hits.len() / 10)); +// }) +// .unwrap(); +// } diff --git a/vendor/flume/tests/same_channel.rs b/vendor/flume/tests/same_channel.rs new file mode 100644 index 0000000..c6452ec --- /dev/null +++ b/vendor/flume/tests/same_channel.rs @@ -0,0 +1,114 @@ +// extern crate crossbeam_channel; + +// use std::time::Duration; + +// use crossbeam_channel::{after, bounded, never, tick, unbounded}; + +// fn ms(ms: u64) -> Duration { +// Duration::from_millis(ms) +// } + +// #[test] +// fn after_same_channel() { +// let r = after(ms(50)); + +// let r2 = r.clone(); +// assert!(r.same_channel(&r2)); + +// let r3 = after(ms(50)); +// assert!(!r.same_channel(&r3)); +// assert!(!r2.same_channel(&r3)); + +// let r4 = after(ms(100)); +// assert!(!r.same_channel(&r4)); +// assert!(!r2.same_channel(&r4)); +// } + +// #[test] +// fn array_same_channel() { +// let (s, r) = bounded::<usize>(1); + +// let s2 = s.clone(); +// assert!(s.same_channel(&s2)); + +// let r2 = r.clone(); +// assert!(r.same_channel(&r2)); + +// let (s3, r3) = bounded::<usize>(1); +// assert!(!s.same_channel(&s3)); +// assert!(!s2.same_channel(&s3)); +// assert!(!r.same_channel(&r3)); +// assert!(!r2.same_channel(&r3)); +// } + +// #[test] +// fn list_same_channel() { +// let (s, r) = unbounded::<usize>(); + +// let s2 = s.clone(); +// assert!(s.same_channel(&s2)); + +// let r2 = r.clone(); +// assert!(r.same_channel(&r2)); + +// let (s3, r3) = unbounded::<usize>(); +// assert!(!s.same_channel(&s3)); +// assert!(!s2.same_channel(&s3)); +// assert!(!r.same_channel(&r3)); +// assert!(!r2.same_channel(&r3)); +// } + +// #[test] +// fn never_same_channel() { +// let r = never::<usize>(); + +// let r2 = r.clone(); +// assert!(r.same_channel(&r2)); + +// // Never channel are always equal to one another. +// let r3 = never::<usize>(); +// assert!(r.same_channel(&r3)); +// assert!(r2.same_channel(&r3)); +// } + +// #[test] +// fn tick_same_channel() { +// let r = tick(ms(50)); + +// let r2 = r.clone(); +// assert!(r.same_channel(&r2)); + +// let r3 = tick(ms(50)); +// assert!(!r.same_channel(&r3)); +// assert!(!r2.same_channel(&r3)); + +// let r4 = tick(ms(100)); +// assert!(!r.same_channel(&r4)); +// assert!(!r2.same_channel(&r4)); +// } + +// #[test] +// fn zero_same_channel() { +// let (s, r) = bounded::<usize>(0); + +// let s2 = s.clone(); +// assert!(s.same_channel(&s2)); + +// let r2 = r.clone(); +// assert!(r.same_channel(&r2)); + +// let (s3, r3) = bounded::<usize>(0); +// assert!(!s.same_channel(&s3)); +// assert!(!s2.same_channel(&s3)); +// assert!(!r.same_channel(&r3)); +// assert!(!r2.same_channel(&r3)); +// } + +// #[test] +// fn different_flavors_same_channel() { +// let (s1, r1) = bounded::<usize>(0); +// let (s2, r2) = unbounded::<usize>(); + +// assert!(!s1.same_channel(&s2)); +// assert!(!r1.same_channel(&r2)); +// } diff --git a/vendor/flume/tests/select.rs b/vendor/flume/tests/select.rs new file mode 100644 index 0000000..4fac9e9 --- /dev/null +++ b/vendor/flume/tests/select.rs @@ -0,0 +1,1304 @@ +// //! Tests for channel selection using the `Select` struct. + +// extern crate crossbeam_channel; +// extern crate crossbeam_utils; + +// use std::any::Any; +// use std::cell::Cell; +// use std::thread; +// use std::time::{Duration, Instant}; + +// use crossbeam_channel::{after, bounded, tick, unbounded, Receiver, Select, TryRecvError}; +// use crossbeam_utils::thread::scope; + +// fn ms(ms: u64) -> Duration { +// Duration::from_millis(ms) +// } + +// #[test] +// fn smoke1() { +// let (s1, r1) = unbounded::<usize>(); +// let (s2, r2) = unbounded::<usize>(); + +// s1.send(1).unwrap(); + +// let mut sel = Select::new(); +// let oper1 = sel.recv(&r1); +// let oper2 = sel.recv(&r2); +// let oper = sel.select(); +// match oper.index() { +// i if i == oper1 => assert_eq!(oper.recv(&r1), Ok(1)), +// i if i == oper2 => panic!(), +// _ => unreachable!(), +// } + +// s2.send(2).unwrap(); + +// let mut sel = Select::new(); +// let oper1 = sel.recv(&r1); +// let oper2 = sel.recv(&r2); +// let oper = sel.select(); +// match oper.index() { +// i if i == oper1 => panic!(), +// i if i == oper2 => assert_eq!(oper.recv(&r2), Ok(2)), +// _ => unreachable!(), +// } +// } + +// #[test] +// fn smoke2() { +// let (_s1, r1) = unbounded::<i32>(); +// let (_s2, r2) = unbounded::<i32>(); +// let (_s3, r3) = unbounded::<i32>(); +// let (_s4, r4) = unbounded::<i32>(); +// let (s5, r5) = unbounded::<i32>(); + +// s5.send(5).unwrap(); + +// let mut sel = Select::new(); +// let oper1 = sel.recv(&r1); +// let oper2 = sel.recv(&r2); +// let oper3 = sel.recv(&r3); +// let oper4 = sel.recv(&r4); +// let oper5 = sel.recv(&r5); +// let oper = sel.select(); +// match oper.index() { +// i if i == oper1 => panic!(), +// i if i == oper2 => panic!(), +// i if i == oper3 => panic!(), +// i if i == oper4 => panic!(), +// i if i == oper5 => assert_eq!(oper.recv(&r5), Ok(5)), +// _ => unreachable!(), +// } +// } + +// #[test] +// fn disconnected() { +// let (s1, r1) = unbounded::<i32>(); +// let (s2, r2) = unbounded::<i32>(); + +// scope(|scope| { +// scope.spawn(|_| { +// drop(s1); +// thread::sleep(ms(500)); +// s2.send(5).unwrap(); +// }); + +// let mut sel = Select::new(); +// let oper1 = sel.recv(&r1); +// let oper2 = sel.recv(&r2); +// let oper = sel.select_timeout(ms(1000)); +// match oper { +// Err(_) => panic!(), +// Ok(oper) => match oper.index() { +// i if i == oper1 => assert!(oper.recv(&r1).is_err()), +// i if i == oper2 => panic!(), +// _ => unreachable!(), +// }, +// } + +// r2.recv().unwrap(); +// }) +// .unwrap(); + +// let mut sel = Select::new(); +// let oper1 = sel.recv(&r1); +// let oper2 = sel.recv(&r2); +// let oper = sel.select_timeout(ms(1000)); +// match oper { +// Err(_) => panic!(), +// Ok(oper) => match oper.index() { +// i if i == oper1 => assert!(oper.recv(&r1).is_err()), +// i if i == oper2 => panic!(), +// _ => unreachable!(), +// }, +// } + +// scope(|scope| { +// scope.spawn(|_| { +// thread::sleep(ms(500)); +// drop(s2); +// }); + +// let mut sel = Select::new(); +// let oper1 = sel.recv(&r2); +// let oper = sel.select_timeout(ms(1000)); +// match oper { +// Err(_) => panic!(), +// Ok(oper) => match oper.index() { +// i if i == oper1 => assert!(oper.recv(&r2).is_err()), +// _ => unreachable!(), +// }, +// } +// }) +// .unwrap(); +// } + +// #[test] +// fn default() { +// let (s1, r1) = unbounded::<i32>(); +// let (s2, r2) = unbounded::<i32>(); + +// let mut sel = Select::new(); +// let _oper1 = sel.recv(&r1); +// let _oper2 = sel.recv(&r2); +// let oper = sel.try_select(); +// match oper { +// Err(_) => {} +// Ok(_) => panic!(), +// } + +// drop(s1); + +// let mut sel = Select::new(); +// let oper1 = sel.recv(&r1); +// let oper2 = sel.recv(&r2); +// let oper = sel.try_select(); +// match oper { +// Err(_) => panic!(), +// Ok(oper) => match oper.index() { +// i if i == oper1 => assert!(oper.recv(&r1).is_err()), +// i if i == oper2 => panic!(), +// _ => unreachable!(), +// }, +// } + +// s2.send(2).unwrap(); + +// let mut sel = Select::new(); +// let oper1 = sel.recv(&r2); +// let oper = sel.try_select(); +// match oper { +// Err(_) => panic!(), +// Ok(oper) => match oper.index() { +// i if i == oper1 => assert_eq!(oper.recv(&r2), Ok(2)), +// _ => unreachable!(), +// }, +// } + +// let mut sel = Select::new(); +// let _oper1 = sel.recv(&r2); +// let oper = sel.try_select(); +// match oper { +// Err(_) => {} +// Ok(_) => panic!(), +// } + +// let mut sel = Select::new(); +// let oper = sel.try_select(); +// match oper { +// Err(_) => {} +// Ok(_) => panic!(), +// } +// } + +// #[test] +// fn timeout() { +// let (_s1, r1) = unbounded::<i32>(); +// let (s2, r2) = unbounded::<i32>(); + +// scope(|scope| { +// scope.spawn(|_| { +// thread::sleep(ms(1500)); +// s2.send(2).unwrap(); +// }); + +// let mut sel = Select::new(); +// let oper1 = sel.recv(&r1); +// let oper2 = sel.recv(&r2); +// let oper = sel.select_timeout(ms(1000)); +// match oper { +// Err(_) => {} +// Ok(oper) => match oper.index() { +// i if i == oper1 => panic!(), +// i if i == oper2 => panic!(), +// _ => unreachable!(), +// }, +// } + +// let mut sel = Select::new(); +// let oper1 = sel.recv(&r1); +// let oper2 = sel.recv(&r2); +// let oper = sel.select_timeout(ms(1000)); +// match oper { +// Err(_) => panic!(), +// Ok(oper) => match oper.index() { +// i if i == oper1 => panic!(), +// i if i == oper2 => assert_eq!(oper.recv(&r2), Ok(2)), +// _ => unreachable!(), +// }, +// } +// }) +// .unwrap(); + +// scope(|scope| { +// let (s, r) = unbounded::<i32>(); + +// scope.spawn(move |_| { +// thread::sleep(ms(500)); +// drop(s); +// }); + +// let mut sel = Select::new(); +// let oper = sel.select_timeout(ms(1000)); +// match oper { +// Err(_) => { +// let mut sel = Select::new(); +// let oper1 = sel.recv(&r); +// let oper = sel.try_select(); +// match oper { +// Err(_) => panic!(), +// Ok(oper) => match oper.index() { +// i if i == oper1 => assert!(oper.recv(&r).is_err()), +// _ => unreachable!(), +// }, +// } +// } +// Ok(_) => unreachable!(), +// } +// }) +// .unwrap(); +// } + +// #[test] +// fn default_when_disconnected() { +// let (_, r) = unbounded::<i32>(); + +// let mut sel = Select::new(); +// let oper1 = sel.recv(&r); +// let oper = sel.try_select(); +// match oper { +// Err(_) => panic!(), +// Ok(oper) => match oper.index() { +// i if i == oper1 => assert!(oper.recv(&r).is_err()), +// _ => unreachable!(), +// }, +// } + +// let (_, r) = unbounded::<i32>(); + +// let mut sel = Select::new(); +// let oper1 = sel.recv(&r); +// let oper = sel.select_timeout(ms(1000)); +// match oper { +// Err(_) => panic!(), +// Ok(oper) => match oper.index() { +// i if i == oper1 => assert!(oper.recv(&r).is_err()), +// _ => unreachable!(), +// }, +// } + +// let (s, _) = bounded::<i32>(0); + +// let mut sel = Select::new(); +// let oper1 = sel.send(&s); +// let oper = sel.try_select(); +// match oper { +// Err(_) => panic!(), +// Ok(oper) => match oper.index() { +// i if i == oper1 => assert!(oper.send(&s, 0).is_err()), +// _ => unreachable!(), +// }, +// } + +// let (s, _) = bounded::<i32>(0); + +// let mut sel = Select::new(); +// let oper1 = sel.send(&s); +// let oper = sel.select_timeout(ms(1000)); +// match oper { +// Err(_) => panic!(), +// Ok(oper) => match oper.index() { +// i if i == oper1 => assert!(oper.send(&s, 0).is_err()), +// _ => unreachable!(), +// }, +// } +// } + +// #[test] +// fn default_only() { +// let start = Instant::now(); + +// let mut sel = Select::new(); +// let oper = sel.try_select(); +// assert!(oper.is_err()); +// let now = Instant::now(); +// assert!(now - start <= ms(50)); + +// let start = Instant::now(); +// let mut sel = Select::new(); +// let oper = sel.select_timeout(ms(500)); +// assert!(oper.is_err()); +// let now = Instant::now(); +// assert!(now - start >= ms(450)); +// assert!(now - start <= ms(550)); +// } + +// #[test] +// fn unblocks() { +// let (s1, r1) = bounded::<i32>(0); +// let (s2, r2) = bounded::<i32>(0); + +// scope(|scope| { +// scope.spawn(|_| { +// thread::sleep(ms(500)); +// s2.send(2).unwrap(); +// }); + +// let mut sel = Select::new(); +// let oper1 = sel.recv(&r1); +// let oper2 = sel.recv(&r2); +// let oper = sel.select_timeout(ms(1000)); +// match oper { +// Err(_) => panic!(), +// Ok(oper) => match oper.index() { +// i if i == oper1 => panic!(), +// i if i == oper2 => assert_eq!(oper.recv(&r2), Ok(2)), +// _ => unreachable!(), +// }, +// } +// }) +// .unwrap(); + +// scope(|scope| { +// scope.spawn(|_| { +// thread::sleep(ms(500)); +// assert_eq!(r1.recv().unwrap(), 1); +// }); + +// let mut sel = Select::new(); +// let oper1 = sel.send(&s1); +// let oper2 = sel.send(&s2); +// let oper = sel.select_timeout(ms(1000)); +// match oper { +// Err(_) => panic!(), +// Ok(oper) => match oper.index() { +// i if i == oper1 => oper.send(&s1, 1).unwrap(), +// i if i == oper2 => panic!(), +// _ => unreachable!(), +// }, +// } +// }) +// .unwrap(); +// } + +// #[test] +// fn both_ready() { +// let (s1, r1) = bounded(0); +// let (s2, r2) = bounded(0); + +// scope(|scope| { +// scope.spawn(|_| { +// thread::sleep(ms(500)); +// s1.send(1).unwrap(); +// assert_eq!(r2.recv().unwrap(), 2); +// }); + +// for _ in 0..2 { +// let mut sel = Select::new(); +// let oper1 = sel.recv(&r1); +// let oper2 = sel.send(&s2); +// let oper = sel.select(); +// match oper.index() { +// i if i == oper1 => assert_eq!(oper.recv(&r1), Ok(1)), +// i if i == oper2 => oper.send(&s2, 2).unwrap(), +// _ => unreachable!(), +// } +// } +// }) +// .unwrap(); +// } + +// #[test] +// fn loop_try() { +// const RUNS: usize = 20; + +// for _ in 0..RUNS { +// let (s1, r1) = bounded::<i32>(0); +// let (s2, r2) = bounded::<i32>(0); +// let (s_end, r_end) = bounded::<()>(0); + +// scope(|scope| { +// scope.spawn(|_| loop { +// let mut done = false; + +// let mut sel = Select::new(); +// let oper1 = sel.send(&s1); +// let oper = sel.try_select(); +// match oper { +// Err(_) => {} +// Ok(oper) => match oper.index() { +// i if i == oper1 => { +// let _ = oper.send(&s1, 1); +// done = true; +// } +// _ => unreachable!(), +// }, +// } +// if done { +// break; +// } + +// let mut sel = Select::new(); +// let oper1 = sel.recv(&r_end); +// let oper = sel.try_select(); +// match oper { +// Err(_) => {} +// Ok(oper) => match oper.index() { +// i if i == oper1 => { +// let _ = oper.recv(&r_end); +// done = true; +// } +// _ => unreachable!(), +// }, +// } +// if done { +// break; +// } +// }); + +// scope.spawn(|_| loop { +// if let Ok(x) = r2.try_recv() { +// assert_eq!(x, 2); +// break; +// } + +// let mut done = false; +// let mut sel = Select::new(); +// let oper1 = sel.recv(&r_end); +// let oper = sel.try_select(); +// match oper { +// Err(_) => {} +// Ok(oper) => match oper.index() { +// i if i == oper1 => { +// let _ = oper.recv(&r_end); +// done = true; +// } +// _ => unreachable!(), +// }, +// } +// if done { +// break; +// } +// }); + +// scope.spawn(|_| { +// thread::sleep(ms(500)); + +// let mut sel = Select::new(); +// let oper1 = sel.recv(&r1); +// let oper2 = sel.send(&s2); +// let oper = sel.select_timeout(ms(1000)); +// match oper { +// Err(_) => {} +// Ok(oper) => match oper.index() { +// i if i == oper1 => assert_eq!(oper.recv(&r1), Ok(1)), +// i if i == oper2 => assert!(oper.send(&s2, 2).is_ok()), +// _ => unreachable!(), +// }, +// } + +// drop(s_end); +// }); +// }) +// .unwrap(); +// } +// } + +// #[test] +// fn cloning1() { +// scope(|scope| { +// let (s1, r1) = unbounded::<i32>(); +// let (_s2, r2) = unbounded::<i32>(); +// let (s3, r3) = unbounded::<()>(); + +// scope.spawn(move |_| { +// r3.recv().unwrap(); +// drop(s1.clone()); +// assert!(r3.try_recv().is_err()); +// s1.send(1).unwrap(); +// r3.recv().unwrap(); +// }); + +// s3.send(()).unwrap(); + +// let mut sel = Select::new(); +// let oper1 = sel.recv(&r1); +// let oper2 = sel.recv(&r2); +// let oper = sel.select(); +// match oper.index() { +// i if i == oper1 => drop(oper.recv(&r1)), +// i if i == oper2 => drop(oper.recv(&r2)), +// _ => unreachable!(), +// } + +// s3.send(()).unwrap(); +// }) +// .unwrap(); +// } + +// #[test] +// fn cloning2() { +// let (s1, r1) = unbounded::<()>(); +// let (s2, r2) = unbounded::<()>(); +// let (_s3, _r3) = unbounded::<()>(); + +// scope(|scope| { +// scope.spawn(move |_| { +// let mut sel = Select::new(); +// let oper1 = sel.recv(&r1); +// let oper2 = sel.recv(&r2); +// let oper = sel.select(); +// match oper.index() { +// i if i == oper1 => panic!(), +// i if i == oper2 => drop(oper.recv(&r2)), +// _ => unreachable!(), +// } +// }); + +// thread::sleep(ms(500)); +// drop(s1.clone()); +// s2.send(()).unwrap(); +// }) +// .unwrap(); +// } + +// #[test] +// fn preflight1() { +// let (s, r) = unbounded(); +// s.send(()).unwrap(); + +// let mut sel = Select::new(); +// let oper1 = sel.recv(&r); +// let oper = sel.select(); +// match oper.index() { +// i if i == oper1 => drop(oper.recv(&r)), +// _ => unreachable!(), +// } +// } + +// #[test] +// fn preflight2() { +// let (s, r) = unbounded(); +// drop(s.clone()); +// s.send(()).unwrap(); +// drop(s); + +// let mut sel = Select::new(); +// let oper1 = sel.recv(&r); +// let oper = sel.select(); +// match oper.index() { +// i if i == oper1 => assert_eq!(oper.recv(&r), Ok(())), +// _ => unreachable!(), +// } + +// assert_eq!(r.try_recv(), Err(TryRecvError::Disconnected)); +// } + +// #[test] +// fn preflight3() { +// let (s, r) = unbounded(); +// drop(s.clone()); +// s.send(()).unwrap(); +// drop(s); +// r.recv().unwrap(); + +// let mut sel = Select::new(); +// let oper1 = sel.recv(&r); +// let oper = sel.select(); +// match oper.index() { +// i if i == oper1 => assert!(oper.recv(&r).is_err()), +// _ => unreachable!(), +// } +// } + +// #[test] +// fn duplicate_operations() { +// let (s, r) = unbounded::<i32>(); +// let hit = vec![Cell::new(false); 4]; + +// while hit.iter().map(|h| h.get()).any(|hit| !hit) { +// let mut sel = Select::new(); +// let oper0 = sel.recv(&r); +// let oper1 = sel.recv(&r); +// let oper2 = sel.send(&s); +// let oper3 = sel.send(&s); +// let oper = sel.select(); +// match oper.index() { +// i if i == oper0 => { +// assert!(oper.recv(&r).is_ok()); +// hit[0].set(true); +// } +// i if i == oper1 => { +// assert!(oper.recv(&r).is_ok()); +// hit[1].set(true); +// } +// i if i == oper2 => { +// assert!(oper.send(&s, 0).is_ok()); +// hit[2].set(true); +// } +// i if i == oper3 => { +// assert!(oper.send(&s, 0).is_ok()); +// hit[3].set(true); +// } +// _ => unreachable!(), +// } +// } +// } + +// #[test] +// fn nesting() { +// let (s, r) = unbounded::<i32>(); + +// let mut sel = Select::new(); +// let oper1 = sel.send(&s); +// let oper = sel.select(); +// match oper.index() { +// i if i == oper1 => { +// assert!(oper.send(&s, 0).is_ok()); + +// let mut sel = Select::new(); +// let oper1 = sel.recv(&r); +// let oper = sel.select(); +// match oper.index() { +// i if i == oper1 => { +// assert_eq!(oper.recv(&r), Ok(0)); + +// let mut sel = Select::new(); +// let oper1 = sel.send(&s); +// let oper = sel.select(); +// match oper.index() { +// i if i == oper1 => { +// assert!(oper.send(&s, 1).is_ok()); + +// let mut sel = Select::new(); +// let oper1 = sel.recv(&r); +// let oper = sel.select(); +// match oper.index() { +// i if i == oper1 => { +// assert_eq!(oper.recv(&r), Ok(1)); +// } +// _ => unreachable!(), +// } +// } +// _ => unreachable!(), +// } +// } +// _ => unreachable!(), +// } +// } +// _ => unreachable!(), +// } +// } + +// #[test] +// fn stress_recv() { +// const COUNT: usize = 10_000; + +// let (s1, r1) = unbounded(); +// let (s2, r2) = bounded(5); +// let (s3, r3) = bounded(100); + +// scope(|scope| { +// scope.spawn(|_| { +// for i in 0..COUNT { +// s1.send(i).unwrap(); +// r3.recv().unwrap(); + +// s2.send(i).unwrap(); +// r3.recv().unwrap(); +// } +// }); + +// for i in 0..COUNT { +// for _ in 0..2 { +// let mut sel = Select::new(); +// let oper1 = sel.recv(&r1); +// let oper2 = sel.recv(&r2); +// let oper = sel.select(); +// match oper.index() { +// ix if ix == oper1 => assert_eq!(oper.recv(&r1), Ok(i)), +// ix if ix == oper2 => assert_eq!(oper.recv(&r2), Ok(i)), +// _ => unreachable!(), +// } + +// s3.send(()).unwrap(); +// } +// } +// }) +// .unwrap(); +// } + +// #[test] +// fn stress_send() { +// const COUNT: usize = 10_000; + +// let (s1, r1) = bounded(0); +// let (s2, r2) = bounded(0); +// let (s3, r3) = bounded(100); + +// scope(|scope| { +// scope.spawn(|_| { +// for i in 0..COUNT { +// assert_eq!(r1.recv().unwrap(), i); +// assert_eq!(r2.recv().unwrap(), i); +// r3.recv().unwrap(); +// } +// }); + +// for i in 0..COUNT { +// for _ in 0..2 { +// let mut sel = Select::new(); +// let oper1 = sel.send(&s1); +// let oper2 = sel.send(&s2); +// let oper = sel.select(); +// match oper.index() { +// ix if ix == oper1 => assert!(oper.send(&s1, i).is_ok()), +// ix if ix == oper2 => assert!(oper.send(&s2, i).is_ok()), +// _ => unreachable!(), +// } +// } +// s3.send(()).unwrap(); +// } +// }) +// .unwrap(); +// } + +// #[test] +// fn stress_mixed() { +// const COUNT: usize = 10_000; + +// let (s1, r1) = bounded(0); +// let (s2, r2) = bounded(0); +// let (s3, r3) = bounded(100); + +// scope(|scope| { +// scope.spawn(|_| { +// for i in 0..COUNT { +// s1.send(i).unwrap(); +// assert_eq!(r2.recv().unwrap(), i); +// r3.recv().unwrap(); +// } +// }); + +// for i in 0..COUNT { +// for _ in 0..2 { +// let mut sel = Select::new(); +// let oper1 = sel.recv(&r1); +// let oper2 = sel.send(&s2); +// let oper = sel.select(); +// match oper.index() { +// ix if ix == oper1 => assert_eq!(oper.recv(&r1), Ok(i)), +// ix if ix == oper2 => assert!(oper.send(&s2, i).is_ok()), +// _ => unreachable!(), +// } +// } +// s3.send(()).unwrap(); +// } +// }) +// .unwrap(); +// } + +// #[test] +// fn stress_timeout_two_threads() { +// const COUNT: usize = 20; + +// let (s, r) = bounded(2); + +// scope(|scope| { +// scope.spawn(|_| { +// for i in 0..COUNT { +// if i % 2 == 0 { +// thread::sleep(ms(500)); +// } + +// let done = false; +// while !done { +// let mut sel = Select::new(); +// let oper1 = sel.send(&s); +// let oper = sel.select_timeout(ms(100)); +// match oper { +// Err(_) => {} +// Ok(oper) => match oper.index() { +// ix if ix == oper1 => { +// assert!(oper.send(&s, i).is_ok()); +// break; +// } +// _ => unreachable!(), +// }, +// } +// } +// } +// }); + +// scope.spawn(|_| { +// for i in 0..COUNT { +// if i % 2 == 0 { +// thread::sleep(ms(500)); +// } + +// let mut done = false; +// while !done { +// let mut sel = Select::new(); +// let oper1 = sel.recv(&r); +// let oper = sel.select_timeout(ms(100)); +// match oper { +// Err(_) => {} +// Ok(oper) => match oper.index() { +// ix if ix == oper1 => { +// assert_eq!(oper.recv(&r), Ok(i)); +// done = true; +// } +// _ => unreachable!(), +// }, +// } +// } +// } +// }); +// }) +// .unwrap(); +// } + +// #[test] +// fn send_recv_same_channel() { +// let (s, r) = bounded::<i32>(0); +// let mut sel = Select::new(); +// let oper1 = sel.send(&s); +// let oper2 = sel.recv(&r); +// let oper = sel.select_timeout(ms(100)); +// match oper { +// Err(_) => {} +// Ok(oper) => match oper.index() { +// ix if ix == oper1 => panic!(), +// ix if ix == oper2 => panic!(), +// _ => unreachable!(), +// }, +// } + +// let (s, r) = unbounded::<i32>(); +// let mut sel = Select::new(); +// let oper1 = sel.send(&s); +// let oper2 = sel.recv(&r); +// let oper = sel.select_timeout(ms(100)); +// match oper { +// Err(_) => panic!(), +// Ok(oper) => match oper.index() { +// ix if ix == oper1 => assert!(oper.send(&s, 0).is_ok()), +// ix if ix == oper2 => panic!(), +// _ => unreachable!(), +// }, +// } +// } + +// #[test] +// fn matching() { +// const THREADS: usize = 44; + +// let (s, r) = &bounded::<usize>(0); + +// scope(|scope| { +// for i in 0..THREADS { +// scope.spawn(move |_| { +// let mut sel = Select::new(); +// let oper1 = sel.recv(&r); +// let oper2 = sel.send(&s); +// let oper = sel.select(); +// match oper.index() { +// ix if ix == oper1 => assert_ne!(oper.recv(&r), Ok(i)), +// ix if ix == oper2 => assert!(oper.send(&s, i).is_ok()), +// _ => unreachable!(), +// } +// }); +// } +// }) +// .unwrap(); + +// assert_eq!(r.try_recv(), Err(TryRecvError::Empty)); +// } + +// #[test] +// fn matching_with_leftover() { +// const THREADS: usize = 55; + +// let (s, r) = &bounded::<usize>(0); + +// scope(|scope| { +// for i in 0..THREADS { +// scope.spawn(move |_| { +// let mut sel = Select::new(); +// let oper1 = sel.recv(&r); +// let oper2 = sel.send(&s); +// let oper = sel.select(); +// match oper.index() { +// ix if ix == oper1 => assert_ne!(oper.recv(&r), Ok(i)), +// ix if ix == oper2 => assert!(oper.send(&s, i).is_ok()), +// _ => unreachable!(), +// } +// }); +// } +// s.send(!0).unwrap(); +// }) +// .unwrap(); + +// assert_eq!(r.try_recv(), Err(TryRecvError::Empty)); +// } + +// #[test] +// fn channel_through_channel() { +// const COUNT: usize = 1000; + +// type T = Box<dyn Any + Send>; + +// for cap in 0..3 { +// let (s, r) = bounded::<T>(cap); + +// scope(|scope| { +// scope.spawn(move |_| { +// let mut s = s; + +// for _ in 0..COUNT { +// let (new_s, new_r) = bounded(cap); +// let new_r: T = Box::new(Some(new_r)); + +// { +// let mut sel = Select::new(); +// let oper1 = sel.send(&s); +// let oper = sel.select(); +// match oper.index() { +// ix if ix == oper1 => assert!(oper.send(&s, new_r).is_ok()), +// _ => unreachable!(), +// } +// } + +// s = new_s; +// } +// }); + +// scope.spawn(move |_| { +// let mut r = r; + +// for _ in 0..COUNT { +// let new = { +// let mut sel = Select::new(); +// let oper1 = sel.recv(&r); +// let oper = sel.select(); +// match oper.index() { +// ix if ix == oper1 => oper +// .recv(&r) +// .unwrap() +// .downcast_mut::<Option<Receiver<T>>>() +// .unwrap() +// .take() +// .unwrap(), +// _ => unreachable!(), +// } +// }; +// r = new; +// } +// }); +// }) +// .unwrap(); +// } +// } + +// #[test] +// fn linearizable_try() { +// const COUNT: usize = 100_000; + +// for step in 0..2 { +// let (start_s, start_r) = bounded::<()>(0); +// let (end_s, end_r) = bounded::<()>(0); + +// let ((s1, r1), (s2, r2)) = if step == 0 { +// (bounded::<i32>(1), bounded::<i32>(1)) +// } else { +// (unbounded::<i32>(), unbounded::<i32>()) +// }; + +// scope(|scope| { +// scope.spawn(|_| { +// for _ in 0..COUNT { +// start_s.send(()).unwrap(); + +// s1.send(1).unwrap(); + +// let mut sel = Select::new(); +// let oper1 = sel.recv(&r1); +// let oper2 = sel.recv(&r2); +// let oper = sel.try_select(); +// match oper { +// Err(_) => unreachable!(), +// Ok(oper) => match oper.index() { +// ix if ix == oper1 => assert!(oper.recv(&r1).is_ok()), +// ix if ix == oper2 => assert!(oper.recv(&r2).is_ok()), +// _ => unreachable!(), +// }, +// } + +// end_s.send(()).unwrap(); +// let _ = r2.try_recv(); +// } +// }); + +// for _ in 0..COUNT { +// start_r.recv().unwrap(); + +// s2.send(1).unwrap(); +// let _ = r1.try_recv(); + +// end_r.recv().unwrap(); +// } +// }) +// .unwrap(); +// } +// } + +// #[test] +// fn linearizable_timeout() { +// const COUNT: usize = 100_000; + +// for step in 0..2 { +// let (start_s, start_r) = bounded::<()>(0); +// let (end_s, end_r) = bounded::<()>(0); + +// let ((s1, r1), (s2, r2)) = if step == 0 { +// (bounded::<i32>(1), bounded::<i32>(1)) +// } else { +// (unbounded::<i32>(), unbounded::<i32>()) +// }; + +// scope(|scope| { +// scope.spawn(|_| { +// for _ in 0..COUNT { +// start_s.send(()).unwrap(); + +// s1.send(1).unwrap(); + +// let mut sel = Select::new(); +// let oper1 = sel.recv(&r1); +// let oper2 = sel.recv(&r2); +// let oper = sel.select_timeout(ms(0)); +// match oper { +// Err(_) => unreachable!(), +// Ok(oper) => match oper.index() { +// ix if ix == oper1 => assert!(oper.recv(&r1).is_ok()), +// ix if ix == oper2 => assert!(oper.recv(&r2).is_ok()), +// _ => unreachable!(), +// }, +// } + +// end_s.send(()).unwrap(); +// let _ = r2.try_recv(); +// } +// }); + +// for _ in 0..COUNT { +// start_r.recv().unwrap(); + +// s2.send(1).unwrap(); +// let _ = r1.try_recv(); + +// end_r.recv().unwrap(); +// } +// }) +// .unwrap(); +// } +// } + +// #[test] +// fn fairness1() { +// const COUNT: usize = 10_000; + +// let (s1, r1) = bounded::<()>(COUNT); +// let (s2, r2) = unbounded::<()>(); + +// for _ in 0..COUNT { +// s1.send(()).unwrap(); +// s2.send(()).unwrap(); +// } + +// let hits = vec![Cell::new(0usize); 4]; +// for _ in 0..COUNT { +// let after = after(ms(0)); +// let tick = tick(ms(0)); + +// let mut sel = Select::new(); +// let oper1 = sel.recv(&r1); +// let oper2 = sel.recv(&r2); +// let oper3 = sel.recv(&after); +// let oper4 = sel.recv(&tick); +// let oper = sel.select(); +// match oper.index() { +// i if i == oper1 => { +// oper.recv(&r1).unwrap(); +// hits[0].set(hits[0].get() + 1); +// } +// i if i == oper2 => { +// oper.recv(&r2).unwrap(); +// hits[1].set(hits[1].get() + 1); +// } +// i if i == oper3 => { +// oper.recv(&after).unwrap(); +// hits[2].set(hits[2].get() + 1); +// } +// i if i == oper4 => { +// oper.recv(&tick).unwrap(); +// hits[3].set(hits[3].get() + 1); +// } +// _ => unreachable!(), +// } +// } +// assert!(hits.iter().all(|x| x.get() >= COUNT / hits.len() / 2)); +// } + +// #[test] +// fn fairness2() { +// const COUNT: usize = 10_000; + +// let (s1, r1) = unbounded::<()>(); +// let (s2, r2) = bounded::<()>(1); +// let (s3, r3) = bounded::<()>(0); + +// scope(|scope| { +// scope.spawn(|_| { +// for _ in 0..COUNT { +// let mut sel = Select::new(); +// let mut oper1 = None; +// let mut oper2 = None; +// if s1.is_empty() { +// oper1 = Some(sel.send(&s1)); +// } +// if s2.is_empty() { +// oper2 = Some(sel.send(&s2)); +// } +// let oper3 = sel.send(&s3); +// let oper = sel.select(); +// match oper.index() { +// i if Some(i) == oper1 => assert!(oper.send(&s1, ()).is_ok()), +// i if Some(i) == oper2 => assert!(oper.send(&s2, ()).is_ok()), +// i if i == oper3 => assert!(oper.send(&s3, ()).is_ok()), +// _ => unreachable!(), +// } +// } +// }); + +// let hits = vec![Cell::new(0usize); 3]; +// for _ in 0..COUNT { +// let mut sel = Select::new(); +// let oper1 = sel.recv(&r1); +// let oper2 = sel.recv(&r2); +// let oper3 = sel.recv(&r3); +// let oper = sel.select(); +// match oper.index() { +// i if i == oper1 => { +// oper.recv(&r1).unwrap(); +// hits[0].set(hits[0].get() + 1); +// } +// i if i == oper2 => { +// oper.recv(&r2).unwrap(); +// hits[1].set(hits[1].get() + 1); +// } +// i if i == oper3 => { +// oper.recv(&r3).unwrap(); +// hits[2].set(hits[2].get() + 1); +// } +// _ => unreachable!(), +// } +// } +// assert!(hits.iter().all(|x| x.get() >= COUNT / hits.len() / 50)); +// }) +// .unwrap(); +// } + +// #[test] +// fn sync_and_clone() { +// const THREADS: usize = 20; + +// let (s, r) = &bounded::<usize>(0); + +// let mut sel = Select::new(); +// let oper1 = sel.recv(&r); +// let oper2 = sel.send(&s); +// let sel = &sel; + +// scope(|scope| { +// for i in 0..THREADS { +// scope.spawn(move |_| { +// let mut sel = sel.clone(); +// let oper = sel.select(); +// match oper.index() { +// ix if ix == oper1 => assert_ne!(oper.recv(&r), Ok(i)), +// ix if ix == oper2 => assert!(oper.send(&s, i).is_ok()), +// _ => unreachable!(), +// } +// }); +// } +// }) +// .unwrap(); + +// assert_eq!(r.try_recv(), Err(TryRecvError::Empty)); +// } + +// #[test] +// fn send_and_clone() { +// const THREADS: usize = 20; + +// let (s, r) = &bounded::<usize>(0); + +// let mut sel = Select::new(); +// let oper1 = sel.recv(&r); +// let oper2 = sel.send(&s); + +// scope(|scope| { +// for i in 0..THREADS { +// let mut sel = sel.clone(); +// scope.spawn(move |_| { +// let oper = sel.select(); +// match oper.index() { +// ix if ix == oper1 => assert_ne!(oper.recv(&r), Ok(i)), +// ix if ix == oper2 => assert!(oper.send(&s, i).is_ok()), +// _ => unreachable!(), +// } +// }); +// } +// }) +// .unwrap(); + +// assert_eq!(r.try_recv(), Err(TryRecvError::Empty)); +// } + +// #[test] +// fn reuse() { +// const COUNT: usize = 10_000; + +// let (s1, r1) = bounded(0); +// let (s2, r2) = bounded(0); +// let (s3, r3) = bounded(100); + +// scope(|scope| { +// scope.spawn(|_| { +// for i in 0..COUNT { +// s1.send(i).unwrap(); +// assert_eq!(r2.recv().unwrap(), i); +// r3.recv().unwrap(); +// } +// }); + +// let mut sel = Select::new(); +// let oper1 = sel.recv(&r1); +// let oper2 = sel.send(&s2); + +// for i in 0..COUNT { +// for _ in 0..2 { +// let oper = sel.select(); +// match oper.index() { +// ix if ix == oper1 => assert_eq!(oper.recv(&r1), Ok(i)), +// ix if ix == oper2 => assert!(oper.send(&s2, i).is_ok()), +// _ => unreachable!(), +// } +// } +// s3.send(()).unwrap(); +// } +// }) +// .unwrap(); +// } diff --git a/vendor/flume/tests/select_macro.rs b/vendor/flume/tests/select_macro.rs new file mode 100644 index 0000000..367d8dd --- /dev/null +++ b/vendor/flume/tests/select_macro.rs @@ -0,0 +1,1440 @@ +// //! Tests for the `select!` macro. + +// #![deny(unsafe_code)] + +// #[macro_use] +// extern crate crossbeam_channel; +// extern crate crossbeam_utils; + +// use std::any::Any; +// use std::cell::Cell; +// use std::ops::Deref; +// use std::thread; +// use std::time::{Duration, Instant}; + +// use crossbeam_channel::{after, bounded, never, tick, unbounded}; +// use crossbeam_channel::{Receiver, RecvError, SendError, Sender, TryRecvError}; +// use crossbeam_utils::thread::scope; + +// fn ms(ms: u64) -> Duration { +// Duration::from_millis(ms) +// } + +// #[test] +// fn smoke1() { +// let (s1, r1) = unbounded::<usize>(); +// let (s2, r2) = unbounded::<usize>(); + +// s1.send(1).unwrap(); + +// select! { +// recv(r1) -> v => assert_eq!(v, Ok(1)), +// recv(r2) -> _ => panic!(), +// } + +// s2.send(2).unwrap(); + +// select! { +// recv(r1) -> _ => panic!(), +// recv(r2) -> v => assert_eq!(v, Ok(2)), +// } +// } + +// #[test] +// fn smoke2() { +// let (_s1, r1) = unbounded::<i32>(); +// let (_s2, r2) = unbounded::<i32>(); +// let (_s3, r3) = unbounded::<i32>(); +// let (_s4, r4) = unbounded::<i32>(); +// let (s5, r5) = unbounded::<i32>(); + +// s5.send(5).unwrap(); + +// select! { +// recv(r1) -> _ => panic!(), +// recv(r2) -> _ => panic!(), +// recv(r3) -> _ => panic!(), +// recv(r4) -> _ => panic!(), +// recv(r5) -> v => assert_eq!(v, Ok(5)), +// } +// } + +// #[test] +// fn disconnected() { +// let (s1, r1) = unbounded::<i32>(); +// let (s2, r2) = unbounded::<i32>(); + +// scope(|scope| { +// scope.spawn(|_| { +// drop(s1); +// thread::sleep(ms(500)); +// s2.send(5).unwrap(); +// }); + +// select! { +// recv(r1) -> v => assert!(v.is_err()), +// recv(r2) -> _ => panic!(), +// default(ms(1000)) => panic!(), +// } + +// r2.recv().unwrap(); +// }) +// .unwrap(); + +// select! { +// recv(r1) -> v => assert!(v.is_err()), +// recv(r2) -> _ => panic!(), +// default(ms(1000)) => panic!(), +// } + +// scope(|scope| { +// scope.spawn(|_| { +// thread::sleep(ms(500)); +// drop(s2); +// }); + +// select! { +// recv(r2) -> v => assert!(v.is_err()), +// default(ms(1000)) => panic!(), +// } +// }) +// .unwrap(); +// } + +// #[test] +// fn default() { +// let (s1, r1) = unbounded::<i32>(); +// let (s2, r2) = unbounded::<i32>(); + +// select! { +// recv(r1) -> _ => panic!(), +// recv(r2) -> _ => panic!(), +// default => {} +// } + +// drop(s1); + +// select! { +// recv(r1) -> v => assert!(v.is_err()), +// recv(r2) -> _ => panic!(), +// default => panic!(), +// } + +// s2.send(2).unwrap(); + +// select! { +// recv(r2) -> v => assert_eq!(v, Ok(2)), +// default => panic!(), +// } + +// select! { +// recv(r2) -> _ => panic!(), +// default => {}, +// } + +// select! { +// default => {}, +// } +// } + +// #[test] +// fn timeout() { +// let (_s1, r1) = unbounded::<i32>(); +// let (s2, r2) = unbounded::<i32>(); + +// scope(|scope| { +// scope.spawn(|_| { +// thread::sleep(ms(1500)); +// s2.send(2).unwrap(); +// }); + +// select! { +// recv(r1) -> _ => panic!(), +// recv(r2) -> _ => panic!(), +// default(ms(1000)) => {}, +// } + +// select! { +// recv(r1) -> _ => panic!(), +// recv(r2) -> v => assert_eq!(v, Ok(2)), +// default(ms(1000)) => panic!(), +// } +// }) +// .unwrap(); + +// scope(|scope| { +// let (s, r) = unbounded::<i32>(); + +// scope.spawn(move |_| { +// thread::sleep(ms(500)); +// drop(s); +// }); + +// select! { +// default(ms(1000)) => { +// select! { +// recv(r) -> v => assert!(v.is_err()), +// default => panic!(), +// } +// } +// } +// }) +// .unwrap(); +// } + +// #[test] +// fn default_when_disconnected() { +// let (_, r) = unbounded::<i32>(); + +// select! { +// recv(r) -> res => assert!(res.is_err()), +// default => panic!(), +// } + +// let (_, r) = unbounded::<i32>(); + +// select! { +// recv(r) -> res => assert!(res.is_err()), +// default(ms(1000)) => panic!(), +// } + +// let (s, _) = bounded::<i32>(0); + +// select! { +// send(s, 0) -> res => assert!(res.is_err()), +// default => panic!(), +// } + +// let (s, _) = bounded::<i32>(0); + +// select! { +// send(s, 0) -> res => assert!(res.is_err()), +// default(ms(1000)) => panic!(), +// } +// } + +// #[test] +// fn default_only() { +// let start = Instant::now(); +// select! { +// default => {} +// } +// let now = Instant::now(); +// assert!(now - start <= ms(50)); + +// let start = Instant::now(); +// select! { +// default(ms(500)) => {} +// } +// let now = Instant::now(); +// assert!(now - start >= ms(450)); +// assert!(now - start <= ms(550)); +// } + +// #[test] +// fn unblocks() { +// let (s1, r1) = bounded::<i32>(0); +// let (s2, r2) = bounded::<i32>(0); + +// scope(|scope| { +// scope.spawn(|_| { +// thread::sleep(ms(500)); +// s2.send(2).unwrap(); +// }); + +// select! { +// recv(r1) -> _ => panic!(), +// recv(r2) -> v => assert_eq!(v, Ok(2)), +// default(ms(1000)) => panic!(), +// } +// }) +// .unwrap(); + +// scope(|scope| { +// scope.spawn(|_| { +// thread::sleep(ms(500)); +// assert_eq!(r1.recv().unwrap(), 1); +// }); + +// select! { +// send(s1, 1) -> _ => {}, +// send(s2, 2) -> _ => panic!(), +// default(ms(1000)) => panic!(), +// } +// }) +// .unwrap(); +// } + +// #[test] +// fn both_ready() { +// let (s1, r1) = bounded(0); +// let (s2, r2) = bounded(0); + +// scope(|scope| { +// scope.spawn(|_| { +// thread::sleep(ms(500)); +// s1.send(1).unwrap(); +// assert_eq!(r2.recv().unwrap(), 2); +// }); + +// for _ in 0..2 { +// select! { +// recv(r1) -> v => assert_eq!(v, Ok(1)), +// send(s2, 2) -> _ => {}, +// } +// } +// }) +// .unwrap(); +// } + +// #[test] +// fn loop_try() { +// const RUNS: usize = 20; + +// for _ in 0..RUNS { +// let (s1, r1) = bounded::<i32>(0); +// let (s2, r2) = bounded::<i32>(0); +// let (s_end, r_end) = bounded::<()>(0); + +// scope(|scope| { +// scope.spawn(|_| loop { +// select! { +// send(s1, 1) -> _ => break, +// default => {} +// } + +// select! { +// recv(r_end) -> _ => break, +// default => {} +// } +// }); + +// scope.spawn(|_| loop { +// if let Ok(x) = r2.try_recv() { +// assert_eq!(x, 2); +// break; +// } + +// select! { +// recv(r_end) -> _ => break, +// default => {} +// } +// }); + +// scope.spawn(|_| { +// thread::sleep(ms(500)); + +// select! { +// recv(r1) -> v => assert_eq!(v, Ok(1)), +// send(s2, 2) -> _ => {}, +// default(ms(500)) => panic!(), +// } + +// drop(s_end); +// }); +// }) +// .unwrap(); +// } +// } + +// #[test] +// fn cloning1() { +// scope(|scope| { +// let (s1, r1) = unbounded::<i32>(); +// let (_s2, r2) = unbounded::<i32>(); +// let (s3, r3) = unbounded::<()>(); + +// scope.spawn(move |_| { +// r3.recv().unwrap(); +// drop(s1.clone()); +// assert_eq!(r3.try_recv(), Err(TryRecvError::Empty)); +// s1.send(1).unwrap(); +// r3.recv().unwrap(); +// }); + +// s3.send(()).unwrap(); + +// select! { +// recv(r1) -> _ => {}, +// recv(r2) -> _ => {}, +// } + +// s3.send(()).unwrap(); +// }) +// .unwrap(); +// } + +// #[test] +// fn cloning2() { +// let (s1, r1) = unbounded::<()>(); +// let (s2, r2) = unbounded::<()>(); +// let (_s3, _r3) = unbounded::<()>(); + +// scope(|scope| { +// scope.spawn(move |_| { +// select! { +// recv(r1) -> _ => panic!(), +// recv(r2) -> _ => {}, +// } +// }); + +// thread::sleep(ms(500)); +// drop(s1.clone()); +// s2.send(()).unwrap(); +// }) +// .unwrap(); +// } + +// #[test] +// fn preflight1() { +// let (s, r) = unbounded(); +// s.send(()).unwrap(); + +// select! { +// recv(r) -> _ => {} +// } +// } + +// #[test] +// fn preflight2() { +// let (s, r) = unbounded(); +// drop(s.clone()); +// s.send(()).unwrap(); +// drop(s); + +// select! { +// recv(r) -> v => assert!(v.is_ok()), +// } +// assert_eq!(r.try_recv(), Err(TryRecvError::Disconnected)); +// } + +// #[test] +// fn preflight3() { +// let (s, r) = unbounded(); +// drop(s.clone()); +// s.send(()).unwrap(); +// drop(s); +// r.recv().unwrap(); + +// select! { +// recv(r) -> v => assert!(v.is_err()) +// } +// } + +// #[test] +// fn duplicate_operations() { +// let (s, r) = unbounded::<i32>(); +// let mut hit = [false; 4]; + +// while hit.iter().any(|hit| !hit) { +// select! { +// recv(r) -> _ => hit[0] = true, +// recv(r) -> _ => hit[1] = true, +// send(s, 0) -> _ => hit[2] = true, +// send(s, 0) -> _ => hit[3] = true, +// } +// } +// } + +// #[test] +// fn nesting() { +// let (s, r) = unbounded::<i32>(); + +// select! { +// send(s, 0) -> _ => { +// select! { +// recv(r) -> v => { +// assert_eq!(v, Ok(0)); +// select! { +// send(s, 1) -> _ => { +// select! { +// recv(r) -> v => { +// assert_eq!(v, Ok(1)); +// } +// } +// } +// } +// } +// } +// } +// } +// } + +// #[test] +// #[should_panic(expected = "send panicked")] +// fn panic_sender() { +// fn get() -> Sender<i32> { +// panic!("send panicked") +// } + +// #[allow(unreachable_code)] +// { +// select! { +// send(get(), panic!()) -> _ => {} +// } +// } +// } + +// #[test] +// #[should_panic(expected = "recv panicked")] +// fn panic_receiver() { +// fn get() -> Receiver<i32> { +// panic!("recv panicked") +// } + +// select! { +// recv(get()) -> _ => {} +// } +// } + +// #[test] +// fn stress_recv() { +// const COUNT: usize = 10_000; + +// let (s1, r1) = unbounded(); +// let (s2, r2) = bounded(5); +// let (s3, r3) = bounded(100); + +// scope(|scope| { +// scope.spawn(|_| { +// for i in 0..COUNT { +// s1.send(i).unwrap(); +// r3.recv().unwrap(); + +// s2.send(i).unwrap(); +// r3.recv().unwrap(); +// } +// }); + +// for i in 0..COUNT { +// for _ in 0..2 { +// select! { +// recv(r1) -> v => assert_eq!(v, Ok(i)), +// recv(r2) -> v => assert_eq!(v, Ok(i)), +// } + +// s3.send(()).unwrap(); +// } +// } +// }) +// .unwrap(); +// } + +// #[test] +// fn stress_send() { +// const COUNT: usize = 10_000; + +// let (s1, r1) = bounded(0); +// let (s2, r2) = bounded(0); +// let (s3, r3) = bounded(100); + +// scope(|scope| { +// scope.spawn(|_| { +// for i in 0..COUNT { +// assert_eq!(r1.recv().unwrap(), i); +// assert_eq!(r2.recv().unwrap(), i); +// r3.recv().unwrap(); +// } +// }); + +// for i in 0..COUNT { +// for _ in 0..2 { +// select! { +// send(s1, i) -> _ => {}, +// send(s2, i) -> _ => {}, +// } +// } +// s3.send(()).unwrap(); +// } +// }) +// .unwrap(); +// } + +// #[test] +// fn stress_mixed() { +// const COUNT: usize = 10_000; + +// let (s1, r1) = bounded(0); +// let (s2, r2) = bounded(0); +// let (s3, r3) = bounded(100); + +// scope(|scope| { +// scope.spawn(|_| { +// for i in 0..COUNT { +// s1.send(i).unwrap(); +// assert_eq!(r2.recv().unwrap(), i); +// r3.recv().unwrap(); +// } +// }); + +// for i in 0..COUNT { +// for _ in 0..2 { +// select! { +// recv(r1) -> v => assert_eq!(v, Ok(i)), +// send(s2, i) -> _ => {}, +// } +// } +// s3.send(()).unwrap(); +// } +// }) +// .unwrap(); +// } + +// #[test] +// fn stress_timeout_two_threads() { +// const COUNT: usize = 20; + +// let (s, r) = bounded(2); + +// scope(|scope| { +// scope.spawn(|_| { +// for i in 0..COUNT { +// if i % 2 == 0 { +// thread::sleep(ms(500)); +// } + +// loop { +// select! { +// send(s, i) -> _ => break, +// default(ms(100)) => {} +// } +// } +// } +// }); + +// scope.spawn(|_| { +// for i in 0..COUNT { +// if i % 2 == 0 { +// thread::sleep(ms(500)); +// } + +// loop { +// select! { +// recv(r) -> v => { +// assert_eq!(v, Ok(i)); +// break; +// } +// default(ms(100)) => {} +// } +// } +// } +// }); +// }) +// .unwrap(); +// } + +// #[test] +// fn send_recv_same_channel() { +// let (s, r) = bounded::<i32>(0); +// select! { +// send(s, 0) -> _ => panic!(), +// recv(r) -> _ => panic!(), +// default(ms(500)) => {} +// } + +// let (s, r) = unbounded::<i32>(); +// select! { +// send(s, 0) -> _ => {}, +// recv(r) -> _ => panic!(), +// default(ms(500)) => panic!(), +// } +// } + +// #[test] +// fn matching() { +// const THREADS: usize = 44; + +// let (s, r) = &bounded::<usize>(0); + +// scope(|scope| { +// for i in 0..THREADS { +// scope.spawn(move |_| { +// select! { +// recv(r) -> v => assert_ne!(v.unwrap(), i), +// send(s, i) -> _ => {}, +// } +// }); +// } +// }) +// .unwrap(); + +// assert_eq!(r.try_recv(), Err(TryRecvError::Empty)); +// } + +// #[test] +// fn matching_with_leftover() { +// const THREADS: usize = 55; + +// let (s, r) = &bounded::<usize>(0); + +// scope(|scope| { +// for i in 0..THREADS { +// scope.spawn(move |_| { +// select! { +// recv(r) -> v => assert_ne!(v.unwrap(), i), +// send(s, i) -> _ => {}, +// } +// }); +// } +// s.send(!0).unwrap(); +// }) +// .unwrap(); + +// assert_eq!(r.try_recv(), Err(TryRecvError::Empty)); +// } + +// #[test] +// fn channel_through_channel() { +// const COUNT: usize = 1000; + +// type T = Box<dyn Any + Send>; + +// for cap in 0..3 { +// let (s, r) = bounded::<T>(cap); + +// scope(|scope| { +// scope.spawn(move |_| { +// let mut s = s; + +// for _ in 0..COUNT { +// let (new_s, new_r) = bounded(cap); +// let new_r: T = Box::new(Some(new_r)); + +// select! { +// send(s, new_r) -> _ => {} +// } + +// s = new_s; +// } +// }); + +// scope.spawn(move |_| { +// let mut r = r; + +// for _ in 0..COUNT { +// r = select! { +// recv(r) -> msg => { +// msg.unwrap() +// .downcast_mut::<Option<Receiver<T>>>() +// .unwrap() +// .take() +// .unwrap() +// } +// } +// } +// }); +// }) +// .unwrap(); +// } +// } + +// #[test] +// fn linearizable_default() { +// const COUNT: usize = 100_000; + +// for step in 0..2 { +// let (start_s, start_r) = bounded::<()>(0); +// let (end_s, end_r) = bounded::<()>(0); + +// let ((s1, r1), (s2, r2)) = if step == 0 { +// (bounded::<i32>(1), bounded::<i32>(1)) +// } else { +// (unbounded::<i32>(), unbounded::<i32>()) +// }; + +// scope(|scope| { +// scope.spawn(|_| { +// for _ in 0..COUNT { +// start_s.send(()).unwrap(); + +// s1.send(1).unwrap(); +// select! { +// recv(r1) -> _ => {} +// recv(r2) -> _ => {} +// default => unreachable!() +// } + +// end_s.send(()).unwrap(); +// let _ = r2.try_recv(); +// } +// }); + +// for _ in 0..COUNT { +// start_r.recv().unwrap(); + +// s2.send(1).unwrap(); +// let _ = r1.try_recv(); + +// end_r.recv().unwrap(); +// } +// }) +// .unwrap(); +// } +// } + +// #[test] +// fn linearizable_timeout() { +// const COUNT: usize = 100_000; + +// for step in 0..2 { +// let (start_s, start_r) = bounded::<()>(0); +// let (end_s, end_r) = bounded::<()>(0); + +// let ((s1, r1), (s2, r2)) = if step == 0 { +// (bounded::<i32>(1), bounded::<i32>(1)) +// } else { +// (unbounded::<i32>(), unbounded::<i32>()) +// }; + +// scope(|scope| { +// scope.spawn(|_| { +// for _ in 0..COUNT { +// start_s.send(()).unwrap(); + +// s1.send(1).unwrap(); +// select! { +// recv(r1) -> _ => {} +// recv(r2) -> _ => {} +// default(ms(0)) => unreachable!() +// } + +// end_s.send(()).unwrap(); +// let _ = r2.try_recv(); +// } +// }); + +// for _ in 0..COUNT { +// start_r.recv().unwrap(); + +// s2.send(1).unwrap(); +// let _ = r1.try_recv(); + +// end_r.recv().unwrap(); +// } +// }) +// .unwrap(); +// } +// } + +// #[test] +// fn fairness1() { +// const COUNT: usize = 10_000; + +// let (s1, r1) = bounded::<()>(COUNT); +// let (s2, r2) = unbounded::<()>(); + +// for _ in 0..COUNT { +// s1.send(()).unwrap(); +// s2.send(()).unwrap(); +// } + +// let mut hits = [0usize; 4]; +// for _ in 0..COUNT { +// select! { +// recv(r1) -> _ => hits[0] += 1, +// recv(r2) -> _ => hits[1] += 1, +// recv(after(ms(0))) -> _ => hits[2] += 1, +// recv(tick(ms(0))) -> _ => hits[3] += 1, +// } +// } +// assert!(hits.iter().all(|x| *x >= COUNT / hits.len() / 2)); +// } + +// #[test] +// fn fairness2() { +// const COUNT: usize = 10_000; + +// let (s1, r1) = unbounded::<()>(); +// let (s2, r2) = bounded::<()>(1); +// let (s3, r3) = bounded::<()>(0); + +// scope(|scope| { +// scope.spawn(|_| { +// let (hole, _r) = bounded(0); + +// for _ in 0..COUNT { +// let s1 = if s1.is_empty() { &s1 } else { &hole }; +// let s2 = if s2.is_empty() { &s2 } else { &hole }; + +// select! { +// send(s1, ()) -> res => assert!(res.is_ok()), +// send(s2, ()) -> res => assert!(res.is_ok()), +// send(s3, ()) -> res => assert!(res.is_ok()), +// } +// } +// }); + +// let hits = vec![Cell::new(0usize); 3]; +// for _ in 0..COUNT { +// select! { +// recv(r1) -> _ => hits[0].set(hits[0].get() + 1), +// recv(r2) -> _ => hits[1].set(hits[1].get() + 1), +// recv(r3) -> _ => hits[2].set(hits[2].get() + 1), +// } +// } +// assert!(hits.iter().all(|x| x.get() >= COUNT / hits.len() / 50)); +// }) +// .unwrap(); +// } + +// #[test] +// fn fairness_recv() { +// const COUNT: usize = 10_000; + +// let (s1, r1) = bounded::<()>(COUNT); +// let (s2, r2) = unbounded::<()>(); + +// for _ in 0..COUNT { +// s1.send(()).unwrap(); +// s2.send(()).unwrap(); +// } + +// let mut hits = [0usize; 2]; +// while hits[0] + hits[1] < COUNT { +// select! { +// recv(r1) -> _ => hits[0] += 1, +// recv(r2) -> _ => hits[1] += 1, +// } +// } +// assert!(hits.iter().all(|x| *x >= COUNT / 4)); +// } + +// #[test] +// fn fairness_send() { +// const COUNT: usize = 10_000; + +// let (s1, _r1) = bounded::<()>(COUNT); +// let (s2, _r2) = unbounded::<()>(); + +// let mut hits = [0usize; 2]; +// for _ in 0..COUNT { +// select! { +// send(s1, ()) -> _ => hits[0] += 1, +// send(s2, ()) -> _ => hits[1] += 1, +// } +// } +// assert!(hits.iter().all(|x| *x >= COUNT / 4)); +// } + +// #[test] +// fn references() { +// let (s, r) = unbounded::<i32>(); +// select! { +// send(s, 0) -> _ => {} +// recv(r) -> _ => {} +// } +// select! { +// send(&&&&s, 0) -> _ => {} +// recv(&&&&r) -> _ => {} +// } +// select! { +// recv(Some(&r).unwrap_or(&never())) -> _ => {}, +// default => {} +// } +// select! { +// recv(Some(r).unwrap_or(never())) -> _ => {}, +// default => {} +// } +// } + +// #[test] +// fn case_blocks() { +// let (s, r) = unbounded::<i32>(); + +// select! { +// recv(r) -> _ => 3.0, +// recv(r) -> _ => loop { +// unreachable!() +// }, +// recv(r) -> _ => match 7 + 3 { +// _ => unreachable!() +// }, +// default => 7. +// }; + +// select! { +// recv(r) -> msg => if msg.is_ok() { +// unreachable!() +// }, +// default => () +// } + +// drop(s); +// } + +// #[test] +// fn move_handles() { +// let (s, r) = unbounded::<i32>(); +// select! { +// recv((move || r)()) -> _ => {} +// send((move || s)(), 0) -> _ => {} +// } +// } + +// #[test] +// fn infer_types() { +// let (s, r) = unbounded(); +// select! { +// recv(r) -> _ => {} +// default => {} +// } +// s.send(()).unwrap(); + +// let (s, r) = unbounded(); +// select! { +// send(s, ()) -> _ => {} +// } +// r.recv().unwrap(); +// } + +// #[test] +// fn default_syntax() { +// let (s, r) = bounded::<i32>(0); + +// select! { +// recv(r) -> _ => panic!(), +// default => {} +// } +// select! { +// send(s, 0) -> _ => panic!(), +// default() => {} +// } +// select! { +// default => {} +// } +// select! { +// default() => {} +// } +// } + +// #[test] +// fn same_variable_name() { +// let (_, r) = unbounded::<i32>(); +// select! { +// recv(r) -> r => assert!(r.is_err()), +// } +// } + +// #[test] +// fn handles_on_heap() { +// let (s, r) = unbounded::<i32>(); +// let (s, r) = (Box::new(s), Box::new(r)); + +// select! { +// send(*s, 0) -> _ => {} +// recv(*r) -> _ => {} +// default => {} +// } + +// drop(s); +// drop(r); +// } + +// #[test] +// fn once_blocks() { +// let (s, r) = unbounded::<i32>(); + +// let once = Box::new(()); +// select! { +// send(s, 0) -> _ => drop(once), +// } + +// let once = Box::new(()); +// select! { +// recv(r) -> _ => drop(once), +// } + +// let once1 = Box::new(()); +// let once2 = Box::new(()); +// select! { +// send(s, 0) -> _ => drop(once1), +// default => drop(once2), +// } + +// let once1 = Box::new(()); +// let once2 = Box::new(()); +// select! { +// recv(r) -> _ => drop(once1), +// default => drop(once2), +// } + +// let once1 = Box::new(()); +// let once2 = Box::new(()); +// select! { +// recv(r) -> _ => drop(once1), +// send(s, 0) -> _ => drop(once2), +// } +// } + +// #[test] +// fn once_receiver() { +// let (_, r) = unbounded::<i32>(); + +// let once = Box::new(()); +// let get = move || { +// drop(once); +// r +// }; + +// select! { +// recv(get()) -> _ => {} +// } +// } + +// #[test] +// fn once_sender() { +// let (s, _) = unbounded::<i32>(); + +// let once = Box::new(()); +// let get = move || { +// drop(once); +// s +// }; + +// select! { +// send(get(), 5) -> _ => {} +// } +// } + +// #[test] +// fn parse_nesting() { +// let (_, r) = unbounded::<i32>(); + +// select! { +// recv(r) -> _ => {} +// recv(r) -> _ => { +// select! { +// recv(r) -> _ => {} +// recv(r) -> _ => { +// select! { +// recv(r) -> _ => {} +// recv(r) -> _ => { +// select! { +// default => {} +// } +// } +// } +// } +// } +// } +// } +// } + +// #[test] +// fn evaluate() { +// let (s, r) = unbounded::<i32>(); + +// let v = select! { +// recv(r) -> _ => "foo".into(), +// send(s, 0) -> _ => "bar".to_owned(), +// default => "baz".to_string(), +// }; +// assert_eq!(v, "bar"); + +// let v = select! { +// recv(r) -> _ => "foo".into(), +// default => "baz".to_string(), +// }; +// assert_eq!(v, "foo"); + +// let v = select! { +// recv(r) -> _ => "foo".into(), +// default => "baz".to_string(), +// }; +// assert_eq!(v, "baz"); +// } + +// #[test] +// fn deref() { +// use crossbeam_channel as cc; + +// struct Sender<T>(cc::Sender<T>); +// struct Receiver<T>(cc::Receiver<T>); + +// impl<T> Deref for Receiver<T> { +// type Target = cc::Receiver<T>; + +// fn deref(&self) -> &Self::Target { +// &self.0 +// } +// } + +// impl<T> Deref for Sender<T> { +// type Target = cc::Sender<T>; + +// fn deref(&self) -> &Self::Target { +// &self.0 +// } +// } + +// let (s, r) = bounded::<i32>(0); +// let (s, r) = (Sender(s), Receiver(r)); + +// select! { +// send(s, 0) -> _ => panic!(), +// recv(r) -> _ => panic!(), +// default => {} +// } +// } + +// #[test] +// fn result_types() { +// let (s, _) = bounded::<i32>(0); +// let (_, r) = bounded::<i32>(0); + +// select! { +// recv(r) -> res => drop::<Result<i32, RecvError>>(res), +// } +// select! { +// recv(r) -> res => drop::<Result<i32, RecvError>>(res), +// default => {} +// } +// select! { +// recv(r) -> res => drop::<Result<i32, RecvError>>(res), +// default(ms(0)) => {} +// } + +// select! { +// send(s, 0) -> res => drop::<Result<(), SendError<i32>>>(res), +// } +// select! { +// send(s, 0) -> res => drop::<Result<(), SendError<i32>>>(res), +// default => {} +// } +// select! { +// send(s, 0) -> res => drop::<Result<(), SendError<i32>>>(res), +// default(ms(0)) => {} +// } + +// select! { +// send(s, 0) -> res => drop::<Result<(), SendError<i32>>>(res), +// recv(r) -> res => drop::<Result<i32, RecvError>>(res), +// } +// } + +// #[test] +// fn try_recv() { +// let (s, r) = bounded(0); + +// scope(|scope| { +// scope.spawn(move |_| { +// select! { +// recv(r) -> _ => panic!(), +// default => {} +// } +// thread::sleep(ms(1500)); +// select! { +// recv(r) -> v => assert_eq!(v, Ok(7)), +// default => panic!(), +// } +// thread::sleep(ms(500)); +// select! { +// recv(r) -> v => assert_eq!(v, Err(RecvError)), +// default => panic!(), +// } +// }); +// scope.spawn(move |_| { +// thread::sleep(ms(1000)); +// select! { +// send(s, 7) -> res => res.unwrap(), +// } +// }); +// }) +// .unwrap(); +// } + +// #[test] +// fn recv() { +// let (s, r) = bounded(0); + +// scope(|scope| { +// scope.spawn(move |_| { +// select! { +// recv(r) -> v => assert_eq!(v, Ok(7)), +// } +// thread::sleep(ms(1000)); +// select! { +// recv(r) -> v => assert_eq!(v, Ok(8)), +// } +// thread::sleep(ms(1000)); +// select! { +// recv(r) -> v => assert_eq!(v, Ok(9)), +// } +// select! { +// recv(r) -> v => assert_eq!(v, Err(RecvError)), +// } +// }); +// scope.spawn(move |_| { +// thread::sleep(ms(1500)); +// select! { +// send(s, 7) -> res => res.unwrap(), +// } +// select! { +// send(s, 8) -> res => res.unwrap(), +// } +// select! { +// send(s, 9) -> res => res.unwrap(), +// } +// }); +// }) +// .unwrap(); +// } + +// #[test] +// fn recv_timeout() { +// let (s, r) = bounded::<i32>(0); + +// scope(|scope| { +// scope.spawn(move |_| { +// select! { +// recv(r) -> _ => panic!(), +// default(ms(1000)) => {} +// } +// select! { +// recv(r) -> v => assert_eq!(v, Ok(7)), +// default(ms(1000)) => panic!(), +// } +// select! { +// recv(r) -> v => assert_eq!(v, Err(RecvError)), +// default(ms(1000)) => panic!(), +// } +// }); +// scope.spawn(move |_| { +// thread::sleep(ms(1500)); +// select! { +// send(s, 7) -> res => res.unwrap(), +// } +// }); +// }) +// .unwrap(); +// } + +// #[test] +// fn try_send() { +// let (s, r) = bounded(0); + +// scope(|scope| { +// scope.spawn(move |_| { +// select! { +// send(s, 7) -> _ => panic!(), +// default => {} +// } +// thread::sleep(ms(1500)); +// select! { +// send(s, 8) -> res => res.unwrap(), +// default => panic!(), +// } +// thread::sleep(ms(500)); +// select! { +// send(s, 8) -> res => assert_eq!(res, Err(SendError(8))), +// default => panic!(), +// } +// }); +// scope.spawn(move |_| { +// thread::sleep(ms(1000)); +// select! { +// recv(r) -> v => assert_eq!(v, Ok(8)), +// } +// }); +// }) +// .unwrap(); +// } + +// #[test] +// fn send() { +// let (s, r) = bounded(0); + +// scope(|scope| { +// scope.spawn(move |_| { +// select! { +// send(s, 7) -> res => res.unwrap(), +// } +// thread::sleep(ms(1000)); +// select! { +// send(s, 8) -> res => res.unwrap(), +// } +// thread::sleep(ms(1000)); +// select! { +// send(s, 9) -> res => res.unwrap(), +// } +// }); +// scope.spawn(move |_| { +// thread::sleep(ms(1500)); +// select! { +// recv(r) -> v => assert_eq!(v, Ok(7)), +// } +// select! { +// recv(r) -> v => assert_eq!(v, Ok(8)), +// } +// select! { +// recv(r) -> v => assert_eq!(v, Ok(9)), +// } +// }); +// }) +// .unwrap(); +// } + +// #[test] +// fn send_timeout() { +// let (s, r) = bounded(0); + +// scope(|scope| { +// scope.spawn(move |_| { +// select! { +// send(s, 7) -> _ => panic!(), +// default(ms(1000)) => {} +// } +// select! { +// send(s, 8) -> res => res.unwrap(), +// default(ms(1000)) => panic!(), +// } +// select! { +// send(s, 9) -> res => assert_eq!(res, Err(SendError(9))), +// default(ms(1000)) => panic!(), +// } +// }); +// scope.spawn(move |_| { +// thread::sleep(ms(1500)); +// select! { +// recv(r) -> v => assert_eq!(v, Ok(8)), +// } +// }); +// }) +// .unwrap(); +// } + +// #[test] +// fn disconnect_wakes_sender() { +// let (s, r) = bounded(0); + +// scope(|scope| { +// scope.spawn(move |_| { +// select! { +// send(s, ()) -> res => assert_eq!(res, Err(SendError(()))), +// } +// }); +// scope.spawn(move |_| { +// thread::sleep(ms(1000)); +// drop(r); +// }); +// }) +// .unwrap(); +// } + +// #[test] +// fn disconnect_wakes_receiver() { +// let (s, r) = bounded::<()>(0); + +// scope(|scope| { +// scope.spawn(move |_| { +// select! { +// recv(r) -> res => assert_eq!(res, Err(RecvError)), +// } +// }); +// scope.spawn(move |_| { +// thread::sleep(ms(1000)); +// drop(s); +// }); +// }) +// .unwrap(); +// } diff --git a/vendor/flume/tests/stream.rs b/vendor/flume/tests/stream.rs new file mode 100644 index 0000000..e3b32cd --- /dev/null +++ b/vendor/flume/tests/stream.rs @@ -0,0 +1,255 @@ +#[cfg(feature = "async")] +use { + flume::*, + futures::{stream::FuturesUnordered, StreamExt, TryFutureExt}, + async_std::prelude::FutureExt, + std::time::Duration, +}; +use futures::{stream, Stream}; + +#[cfg(feature = "async")] +#[test] +fn stream_recv() { + let (tx, rx) = unbounded(); + + let t = std::thread::spawn(move || { + std::thread::sleep(std::time::Duration::from_millis(250)); + tx.send(42u32).unwrap(); + println!("sent"); + }); + + async_std::task::block_on(async { + println!("receiving..."); + let x = rx.stream().next().await; + println!("received"); + assert_eq!(x, Some(42)); + }); + + t.join().unwrap(); +} + +#[cfg(feature = "async")] +#[test] +fn stream_recv_disconnect() { + let (tx, rx) = bounded::<i32>(0); + + let t = std::thread::spawn(move || { + tx.send(42); + std::thread::sleep(std::time::Duration::from_millis(250)); + drop(tx) + }); + + async_std::task::block_on(async { + let mut stream = rx.into_stream(); + assert_eq!(stream.next().await, Some(42)); + assert_eq!(stream.next().await, None); + }); + + t.join().unwrap(); +} + +#[cfg(feature = "async")] +#[test] +fn stream_recv_drop_recv() { + let (tx, rx) = bounded::<i32>(10); + + let rx2 = rx.clone(); + let mut stream = rx.into_stream(); + + async_std::task::block_on(async { + let res = async_std::future::timeout( + std::time::Duration::from_millis(500), + stream.next() + ).await; + + assert!(res.is_err()); + }); + + let t = std::thread::spawn(move || { + async_std::task::block_on(async { + rx2.stream().next().await + }) + }); + + std::thread::sleep(std::time::Duration::from_millis(500)); + + tx.send(42).unwrap(); + + drop(stream); + + assert_eq!(t.join().unwrap(), Some(42)) +} + +#[cfg(feature = "async")] +#[test] +fn r#stream_drop_send_disconnect() { + let (tx, rx) = bounded::<i32>(1); + + let t = std::thread::spawn(move || { + std::thread::sleep(std::time::Duration::from_millis(250)); + drop(tx); + }); + + async_std::task::block_on(async { + let mut stream = rx.into_stream(); + assert_eq!(stream.next().await, None); + }); + + t.join().unwrap(); +} + +#[cfg(feature = "async")] +#[async_std::test] +async fn stream_send_1_million_no_drop_or_reorder() { + #[derive(Debug)] + enum Message { + Increment { + old: u64, + }, + ReturnCount, + } + + let (tx, rx) = unbounded(); + + let t = async_std::task::spawn(async move { + let mut count = 0u64; + let mut stream = rx.into_stream(); + + while let Some(Message::Increment { old }) = stream.next().await { + assert_eq!(old, count); + count += 1; + } + + count + }); + + for next in 0..1_000_000 { + tx.send(Message::Increment { old: next }).unwrap(); + } + + tx.send(Message::ReturnCount).unwrap(); + + let count = t.await; + assert_eq!(count, 1_000_000) +} + +#[cfg(feature = "async")] +#[async_std::test] +async fn parallel_streams_and_async_recv() { + let (tx, rx) = flume::unbounded(); + let rx = ℞ + let send_fut = async move { + let n_sends: usize = 100000; + for _ in 0..n_sends { + tx.send_async(()).await.unwrap(); + } + }; + + async_std::task::spawn( + send_fut + .timeout(Duration::from_secs(5)) + .map_err(|_| panic!("Send timed out!")) + ); + + let mut futures_unordered = (0..250) + .map(|n| async move { + if n % 2 == 0 { + let mut stream = rx.stream(); + while let Some(()) = stream.next().await {} + } else { + while let Ok(()) = rx.recv_async().await {} + } + + }) + .collect::<FuturesUnordered<_>>(); + + let recv_fut = async { + while futures_unordered.next().await.is_some() {} + }; + + recv_fut + .timeout(Duration::from_secs(5)) + .map_err(|_| panic!("Receive timed out!")) + .await + .unwrap(); +} + +#[cfg(feature = "async")] +#[test] +fn stream_no_double_wake() { + use std::sync::atomic::{AtomicUsize, Ordering}; + use std::sync::Arc; + use std::pin::Pin; + use std::task::Context; + use futures::task::{waker, ArcWake}; + use futures::Stream; + + let count = Arc::new(AtomicUsize::new(0)); + + // all this waker does is count how many times it is called + struct CounterWaker { + count: Arc<AtomicUsize>, + } + + impl ArcWake for CounterWaker { + fn wake_by_ref(arc_self: &Arc<Self>) { + arc_self.count.fetch_add(1, Ordering::SeqCst); + } + } + + // create waker and context + let w = CounterWaker { + count: count.clone(), + }; + let w = waker(Arc::new(w)); + let cx = &mut Context::from_waker(&w); + + // create unbounded channel + let (tx, rx) = unbounded::<()>(); + let mut stream = rx.stream(); + + // register waker with stream + let _ = Pin::new(&mut stream).poll_next(cx); + + // send multiple items + tx.send(()).unwrap(); + tx.send(()).unwrap(); + tx.send(()).unwrap(); + + // verify that stream is only woken up once. + assert_eq!(count.load(Ordering::SeqCst), 1); +} + +#[cfg(feature = "async")] +#[async_std::test] +async fn stream_forward_issue_55() { // https://github.com/zesterer/flume/issues/55 + fn dummy_stream() -> impl Stream<Item = usize> { + stream::unfold(0, |count| async move { + if count < 1000 { + Some((count, count + 1)) + } else { + None + } + }) + } + + let (send_task, recv_task) = { + use futures::SinkExt; + let (tx, rx) = flume::bounded(100); + + let send_task = dummy_stream() + .map(|i| Ok(i)) + .forward(tx.into_sink().sink_map_err(|e| { + panic!("send error:{:#?}", e) + })); + + let recv_task = rx + .into_stream() + .for_each(|item| async move {}); + (send_task, recv_task) + }; + + let jh = async_std::task::spawn(send_task); + async_std::task::block_on(recv_task); + jh.await.unwrap(); +} diff --git a/vendor/flume/tests/thread_locals.rs b/vendor/flume/tests/thread_locals.rs new file mode 100644 index 0000000..acde751 --- /dev/null +++ b/vendor/flume/tests/thread_locals.rs @@ -0,0 +1,53 @@ +// //! Tests that make sure accessing thread-locals while exiting the thread doesn't cause panics. + +// extern crate crossbeam_utils; + +// use std::thread; +// use std::time::Duration; + +// use flume::unbounded; +// use crossbeam_utils::thread::scope; + +// fn ms(ms: u64) -> Duration { +// Duration::from_millis(ms) +// } + +// #[test] +// #[cfg_attr(target_os = "macos", ignore = "TLS is destroyed too early on macOS")] +// fn use_while_exiting() { +// struct Foo; + +// impl Drop for Foo { +// fn drop(&mut self) { +// // A blocking operation after the thread-locals have been dropped. This will attempt to +// // use the thread-locals and must not panic. +// let (_s, r) = unbounded::<()>(); +// select! { +// recv(r) -> _ => {} +// default(ms(100)) => {} +// } +// } +// } + +// thread_local! { +// static FOO: Foo = Foo; +// } + +// let (s, r) = unbounded::<()>(); + +// scope(|scope| { +// scope.spawn(|_| { +// // First initialize `FOO`, then the thread-locals related to crossbeam-channel. +// FOO.with(|_| ()); +// r.recv().unwrap(); +// // At thread exit, thread-locals related to crossbeam-channel get dropped first and +// // `FOO` is dropped last. +// }); + +// scope.spawn(|_| { +// thread::sleep(ms(100)); +// s.send(()).unwrap(); +// }); +// }) +// .unwrap(); +// } diff --git a/vendor/flume/tests/tick.rs b/vendor/flume/tests/tick.rs new file mode 100644 index 0000000..b0fcd44 --- /dev/null +++ b/vendor/flume/tests/tick.rs @@ -0,0 +1,353 @@ +// //! Tests for the tick channel flavor. + +// #[macro_use] +// extern crate crossbeam_channel; +// extern crate crossbeam_utils; +// extern crate rand; + +// use std::sync::atomic::AtomicUsize; +// use std::sync::atomic::Ordering; +// use std::thread; +// use std::time::{Duration, Instant}; + +// use crossbeam_channel::{after, tick, Select, TryRecvError}; +// use crossbeam_utils::thread::scope; + +// fn ms(ms: u64) -> Duration { +// Duration::from_millis(ms) +// } + +// #[test] +// fn fire() { +// let start = Instant::now(); +// let r = tick(ms(50)); + +// assert_eq!(r.try_recv(), Err(TryRecvError::Empty)); +// thread::sleep(ms(100)); + +// let fired = r.try_recv().unwrap(); +// assert!(start < fired); +// assert!(fired - start >= ms(50)); + +// let now = Instant::now(); +// assert!(fired < now); +// assert!(now - fired >= ms(50)); + +// assert_eq!(r.try_recv(), Err(TryRecvError::Empty)); + +// select! { +// recv(r) -> _ => panic!(), +// default => {} +// } + +// select! { +// recv(r) -> _ => {} +// recv(tick(ms(200))) -> _ => panic!(), +// } +// } + +// #[test] +// fn intervals() { +// let start = Instant::now(); +// let r = tick(ms(50)); + +// let t1 = r.recv().unwrap(); +// assert!(start + ms(50) <= t1); +// assert!(start + ms(100) > t1); + +// thread::sleep(ms(300)); +// let t2 = r.try_recv().unwrap(); +// assert!(start + ms(100) <= t2); +// assert!(start + ms(150) > t2); + +// assert_eq!(r.try_recv(), Err(TryRecvError::Empty)); +// let t3 = r.recv().unwrap(); +// assert!(start + ms(400) <= t3); +// assert!(start + ms(450) > t3); + +// assert_eq!(r.try_recv(), Err(TryRecvError::Empty)); +// } + +// #[test] +// fn capacity() { +// const COUNT: usize = 10; + +// for i in 0..COUNT { +// let r = tick(ms(i as u64)); +// assert_eq!(r.capacity(), Some(1)); +// } +// } + +// #[test] +// fn len_empty_full() { +// let r = tick(ms(50)); + +// assert_eq!(r.len(), 0); +// assert_eq!(r.is_empty(), true); +// assert_eq!(r.is_full(), false); + +// thread::sleep(ms(100)); + +// assert_eq!(r.len(), 1); +// assert_eq!(r.is_empty(), false); +// assert_eq!(r.is_full(), true); + +// r.try_recv().unwrap(); + +// assert_eq!(r.len(), 0); +// assert_eq!(r.is_empty(), true); +// assert_eq!(r.is_full(), false); +// } + +// #[test] +// fn try_recv() { +// let r = tick(ms(200)); +// assert!(r.try_recv().is_err()); + +// thread::sleep(ms(100)); +// assert!(r.try_recv().is_err()); + +// thread::sleep(ms(200)); +// assert!(r.try_recv().is_ok()); +// assert!(r.try_recv().is_err()); + +// thread::sleep(ms(200)); +// assert!(r.try_recv().is_ok()); +// assert!(r.try_recv().is_err()); +// } + +// #[test] +// fn recv() { +// let start = Instant::now(); +// let r = tick(ms(50)); + +// let fired = r.recv().unwrap(); +// assert!(start < fired); +// assert!(fired - start >= ms(50)); + +// let now = Instant::now(); +// assert!(fired < now); +// assert!(now - fired < fired - start); + +// assert_eq!(r.try_recv(), Err(TryRecvError::Empty)); +// } + +// #[test] +// fn recv_timeout() { +// let start = Instant::now(); +// let r = tick(ms(200)); + +// assert!(r.recv_timeout(ms(100)).is_err()); +// let now = Instant::now(); +// assert!(now - start >= ms(100)); +// assert!(now - start <= ms(150)); + +// let fired = r.recv_timeout(ms(200)).unwrap(); +// assert!(fired - start >= ms(200)); +// assert!(fired - start <= ms(250)); + +// assert!(r.recv_timeout(ms(100)).is_err()); +// let now = Instant::now(); +// assert!(now - start >= ms(300)); +// assert!(now - start <= ms(350)); + +// let fired = r.recv_timeout(ms(200)).unwrap(); +// assert!(fired - start >= ms(400)); +// assert!(fired - start <= ms(450)); +// } + +// #[test] +// fn recv_two() { +// let r1 = tick(ms(50)); +// let r2 = tick(ms(50)); + +// scope(|scope| { +// scope.spawn(|_| { +// for _ in 0..10 { +// select! { +// recv(r1) -> _ => {} +// recv(r2) -> _ => {} +// } +// } +// }); +// scope.spawn(|_| { +// for _ in 0..10 { +// select! { +// recv(r1) -> _ => {} +// recv(r2) -> _ => {} +// } +// } +// }); +// }) +// .unwrap(); +// } + +// #[test] +// fn recv_race() { +// select! { +// recv(tick(ms(50))) -> _ => {} +// recv(tick(ms(100))) -> _ => panic!(), +// } + +// select! { +// recv(tick(ms(100))) -> _ => panic!(), +// recv(tick(ms(50))) -> _ => {} +// } +// } + +// #[test] +// fn stress_default() { +// const COUNT: usize = 10; + +// for _ in 0..COUNT { +// select! { +// recv(tick(ms(0))) -> _ => {} +// default => panic!(), +// } +// } + +// for _ in 0..COUNT { +// select! { +// recv(tick(ms(100))) -> _ => panic!(), +// default => {} +// } +// } +// } + +// #[test] +// fn select() { +// const THREADS: usize = 4; + +// let hits = AtomicUsize::new(0); +// let r1 = tick(ms(200)); +// let r2 = tick(ms(300)); + +// scope(|scope| { +// for _ in 0..THREADS { +// scope.spawn(|_| { +// let timeout = after(ms(1100)); +// loop { +// let mut sel = Select::new(); +// let oper1 = sel.recv(&r1); +// let oper2 = sel.recv(&r2); +// let oper3 = sel.recv(&timeout); +// let oper = sel.select(); +// match oper.index() { +// i if i == oper1 => { +// oper.recv(&r1).unwrap(); +// hits.fetch_add(1, Ordering::SeqCst); +// } +// i if i == oper2 => { +// oper.recv(&r2).unwrap(); +// hits.fetch_add(1, Ordering::SeqCst); +// } +// i if i == oper3 => { +// oper.recv(&timeout).unwrap(); +// break; +// } +// _ => unreachable!(), +// } +// } +// }); +// } +// }) +// .unwrap(); + +// assert_eq!(hits.load(Ordering::SeqCst), 8); +// } + +// #[test] +// fn ready() { +// const THREADS: usize = 4; + +// let hits = AtomicUsize::new(0); +// let r1 = tick(ms(200)); +// let r2 = tick(ms(300)); + +// scope(|scope| { +// for _ in 0..THREADS { +// scope.spawn(|_| { +// let timeout = after(ms(1100)); +// 'outer: loop { +// let mut sel = Select::new(); +// sel.recv(&r1); +// sel.recv(&r2); +// sel.recv(&timeout); +// loop { +// match sel.ready() { +// 0 => { +// if r1.try_recv().is_ok() { +// hits.fetch_add(1, Ordering::SeqCst); +// break; +// } +// } +// 1 => { +// if r2.try_recv().is_ok() { +// hits.fetch_add(1, Ordering::SeqCst); +// break; +// } +// } +// 2 => { +// if timeout.try_recv().is_ok() { +// break 'outer; +// } +// } +// _ => unreachable!(), +// } +// } +// } +// }); +// } +// }) +// .unwrap(); + +// assert_eq!(hits.load(Ordering::SeqCst), 8); +// } + +// #[test] +// fn fairness() { +// const COUNT: usize = 30; + +// for &dur in &[0, 1] { +// let mut hits = [0usize; 2]; + +// for _ in 0..COUNT { +// let r1 = tick(ms(dur)); +// let r2 = tick(ms(dur)); + +// for _ in 0..COUNT { +// select! { +// recv(r1) -> _ => hits[0] += 1, +// recv(r2) -> _ => hits[1] += 1, +// } +// } +// } + +// assert!(hits.iter().all(|x| *x >= COUNT / hits.len() / 2)); +// } +// } + +// #[test] +// fn fairness_duplicates() { +// const COUNT: usize = 30; + +// for &dur in &[0, 1] { +// let mut hits = [0usize; 5]; + +// for _ in 0..COUNT { +// let r = tick(ms(dur)); + +// for _ in 0..COUNT { +// select! { +// recv(r) -> _ => hits[0] += 1, +// recv(r) -> _ => hits[1] += 1, +// recv(r) -> _ => hits[2] += 1, +// recv(r) -> _ => hits[3] += 1, +// recv(r) -> _ => hits[4] += 1, +// } +// } +// } + +// assert!(hits.iter().all(|x| *x >= COUNT / hits.len() / 2)); +// } +// } diff --git a/vendor/flume/tests/zero.rs b/vendor/flume/tests/zero.rs new file mode 100644 index 0000000..be4239d --- /dev/null +++ b/vendor/flume/tests/zero.rs @@ -0,0 +1,557 @@ +//! Tests for the zero channel flavor. + +extern crate crossbeam_utils; +extern crate rand; + +use std::any::Any; +use std::sync::atomic::AtomicUsize; +use std::sync::atomic::Ordering; +use std::thread; +use std::time::Duration; + +use flume::{bounded, Receiver}; +use flume::{RecvError, RecvTimeoutError, TryRecvError}; +use flume::{SendError, SendTimeoutError, TrySendError}; +use crossbeam_utils::thread::scope; +use rand::{thread_rng, Rng}; + +fn ms(ms: u64) -> Duration { + Duration::from_millis(ms) +} + +#[test] +fn smoke() { + let (s, r) = bounded(0); + assert_eq!(s.try_send(7), Err(TrySendError::Full(7))); + assert_eq!(r.try_recv(), Err(TryRecvError::Empty)); +} + +#[test] +fn capacity() { + let (s, r) = bounded::<()>(0); + assert_eq!(s.capacity(), Some(0)); + assert_eq!(r.capacity(), Some(0)); +} + +#[test] +fn len_empty_full() { + let (s, r) = bounded(0); + + assert_eq!(s.len(), 0); + assert_eq!(s.is_empty(), true); + assert_eq!(s.is_full(), true); + assert_eq!(r.len(), 0); + assert_eq!(r.is_empty(), true); + assert_eq!(r.is_full(), true); + + scope(|scope| { + scope.spawn(|_| s.send(0).unwrap()); + scope.spawn(|_| r.recv().unwrap()); + }) + .unwrap(); + + assert_eq!(s.len(), 0); + assert_eq!(s.is_empty(), true); + assert_eq!(s.is_full(), true); + assert_eq!(r.len(), 0); + assert_eq!(r.is_empty(), true); + assert_eq!(r.is_full(), true); +} + +#[test] +fn try_recv() { + let (s, r) = bounded(0); + + scope(|scope| { + scope.spawn(move |_| { + assert_eq!(r.try_recv(), Err(TryRecvError::Empty)); + thread::sleep(ms(1500)); + assert_eq!(r.try_recv(), Ok(7)); + thread::sleep(ms(500)); + assert_eq!(r.try_recv(), Err(TryRecvError::Disconnected)); + }); + scope.spawn(move |_| { + thread::sleep(ms(1000)); + s.send(7).unwrap(); + }); + }) + .unwrap(); +} + +#[test] +fn recv() { + let (s, r) = bounded(0); + + scope(|scope| { + scope.spawn(move |_| { + assert_eq!(r.recv(), Ok(7)); + thread::sleep(ms(1000)); + assert_eq!(r.recv(), Ok(8)); + thread::sleep(ms(1000)); + assert_eq!(r.recv(), Ok(9)); + assert!(r.recv().is_err()); + }); + scope.spawn(move |_| { + thread::sleep(ms(1500)); + s.send(7).unwrap(); + s.send(8).unwrap(); + s.send(9).unwrap(); + }); + }) + .unwrap(); +} + +#[test] +fn recv_timeout() { + let (s, r) = bounded::<i32>(0); + + scope(|scope| { + scope.spawn(move |_| { + assert_eq!(r.recv_timeout(ms(1000)), Err(RecvTimeoutError::Timeout)); + assert_eq!(r.recv_timeout(ms(1000)), Ok(7)); + assert_eq!( + r.recv_timeout(ms(1000)), + Err(RecvTimeoutError::Disconnected) + ); + }); + scope.spawn(move |_| { + thread::sleep(ms(1500)); + s.send(7).unwrap(); + }); + }) + .unwrap(); +} + +#[test] +fn try_send() { + let (s, r) = bounded(0); + + scope(|scope| { + scope.spawn(move |_| { + assert_eq!(s.try_send(7), Err(TrySendError::Full(7))); + thread::sleep(ms(1500)); + assert_eq!(s.try_send(8), Ok(())); + thread::sleep(ms(500)); + assert_eq!(s.try_send(9), Err(TrySendError::Disconnected(9))); + }); + scope.spawn(move |_| { + thread::sleep(ms(1000)); + assert_eq!(r.recv(), Ok(8)); + }); + }) + .unwrap(); +} + +#[test] +fn send() { + let (s, r) = bounded(0); + + scope(|scope| { + scope.spawn(move |_| { + s.send(7).unwrap(); + thread::sleep(ms(1000)); + s.send(8).unwrap(); + thread::sleep(ms(1000)); + s.send(9).unwrap(); + }); + scope.spawn(move |_| { + thread::sleep(ms(1500)); + assert_eq!(r.recv(), Ok(7)); + assert_eq!(r.recv(), Ok(8)); + assert_eq!(r.recv(), Ok(9)); + }); + }) + .unwrap(); +} + +#[test] +fn send_timeout() { + let (s, r) = bounded(0); + + scope(|scope| { + scope.spawn(move |_| { + assert_eq!( + s.send_timeout(7, ms(1000)), + Err(SendTimeoutError::Timeout(7)) + ); + assert_eq!(s.send_timeout(8, ms(1000)), Ok(())); + assert_eq!( + s.send_timeout(9, ms(1000)), + Err(SendTimeoutError::Disconnected(9)) + ); + }); + scope.spawn(move |_| { + thread::sleep(ms(1500)); + assert_eq!(r.recv(), Ok(8)); + }); + }) + .unwrap(); +} + +#[test] +fn len() { + const COUNT: usize = 25_000; + + let (s, r) = bounded(0); + + assert_eq!(s.len(), 0); + assert_eq!(r.len(), 0); + + scope(|scope| { + scope.spawn(|_| { + for i in 0..COUNT { + assert_eq!(r.recv(), Ok(i)); + assert_eq!(r.len(), 0); + } + }); + + scope.spawn(|_| { + for i in 0..COUNT { + s.send(i).unwrap(); + assert_eq!(s.len(), 0); + } + }); + }) + .unwrap(); + + assert_eq!(s.len(), 0); + assert_eq!(r.len(), 0); +} + +#[test] +fn disconnect_wakes_sender() { + let (s, r) = bounded(0); + + scope(|scope| { + scope.spawn(move |_| { + assert_eq!(s.send(()), Err(SendError(()))); + }); + scope.spawn(move |_| { + thread::sleep(ms(1000)); + drop(r); + }); + }) + .unwrap(); +} + +#[test] +fn disconnect_wakes_receiver() { + let (s, r) = bounded::<()>(0); + + scope(|scope| { + scope.spawn(move |_| { + assert!(r.recv().is_err()); + }); + scope.spawn(move |_| { + thread::sleep(ms(1000)); + drop(s); + }); + }) + .unwrap(); +} + +#[test] +fn spsc() { + const COUNT: usize = 100_000; + + let (s, r) = bounded(0); + + scope(|scope| { + scope.spawn(move |_| { + for i in 0..COUNT { + assert_eq!(r.recv(), Ok(i)); + } + assert!(r.recv().is_err()); + }); + scope.spawn(move |_| { + for i in 0..COUNT { + s.send(i).unwrap(); + } + }); + }) + .unwrap(); +} + +#[test] +fn mpmc() { + const COUNT: usize = 25_000; + const THREADS: usize = 4; + + let (s, r) = bounded::<usize>(0); + let v = (0..COUNT).map(|_| AtomicUsize::new(0)).collect::<Vec<_>>(); + + scope(|scope| { + for _ in 0..THREADS { + scope.spawn(|_| { + for _ in 0..COUNT { + let n = r.recv().unwrap(); + v[n].fetch_add(1, Ordering::SeqCst); + } + }); + } + for _ in 0..THREADS { + scope.spawn(|_| { + for i in 0..COUNT { + s.send(i).unwrap(); + } + }); + } + }) + .unwrap(); + + for c in v { + assert_eq!(c.load(Ordering::SeqCst), THREADS); + } +} + +#[test] +fn stress_oneshot() { + const COUNT: usize = 10_000; + + for _ in 0..COUNT { + let (s, r) = bounded(1); + + scope(|scope| { + scope.spawn(|_| r.recv().unwrap()); + scope.spawn(|_| s.send(0).unwrap()); + }) + .unwrap(); + } +} + +#[test] +fn stress_iter() { + const COUNT: usize = 1000; + + let (request_s, request_r) = bounded(0); + let (response_s, response_r) = bounded(0); + + scope(|scope| { + scope.spawn(move |_| { + let mut count = 0; + loop { + for x in response_r.try_iter() { + count += x; + if count == COUNT { + return; + } + } + let _ = request_s.try_send(()); + } + }); + + for _ in request_r.iter() { + if response_s.send(1).is_err() { + break; + } + } + }) + .unwrap(); +} + +#[test] +fn stress_timeout_two_threads() { + const COUNT: usize = 100; + + let (s, r) = bounded(0); + + scope(|scope| { + scope.spawn(|_| { + for i in 0..COUNT { + if i % 2 == 0 { + thread::sleep(ms(50)); + } + loop { + if let Ok(()) = s.send_timeout(i, ms(10)) { + break; + } + } + } + }); + + scope.spawn(|_| { + for i in 0..COUNT { + if i % 2 == 0 { + thread::sleep(ms(50)); + } + loop { + if let Ok(x) = r.recv_timeout(ms(10)) { + assert_eq!(x, i); + break; + } + } + } + }); + }) + .unwrap(); +} + +#[test] +fn drops() { + static DROPS: AtomicUsize = AtomicUsize::new(0); + + #[derive(Debug, PartialEq)] + struct DropCounter; + + impl Drop for DropCounter { + fn drop(&mut self) { + DROPS.fetch_add(1, Ordering::SeqCst); + } + } + + let mut rng = thread_rng(); + + for _ in 0..100 { + let steps = rng.gen_range(0..3_000); + + DROPS.store(0, Ordering::SeqCst); + let (s, r) = bounded::<DropCounter>(0); + + scope(|scope| { + scope.spawn(|_| { + for _ in 0..steps { + r.recv().unwrap(); + } + }); + + scope.spawn(|_| { + for _ in 0..steps { + s.send(DropCounter).unwrap(); + } + }); + }) + .unwrap(); + + assert_eq!(DROPS.load(Ordering::SeqCst), steps); + drop(s); + drop(r); + assert_eq!(DROPS.load(Ordering::SeqCst), steps); + } +} + +// #[test] +// fn fairness() { +// const COUNT: usize = 10_000; + +// let (s1, r1) = bounded::<()>(0); +// let (s2, r2) = bounded::<()>(0); + +// scope(|scope| { +// scope.spawn(|_| { +// let mut hits = [0usize; 2]; +// for _ in 0..COUNT { +// select! { +// recv(r1) -> _ => hits[0] += 1, +// recv(r2) -> _ => hits[1] += 1, +// } +// } +// assert!(hits.iter().all(|x| *x >= COUNT / hits.len() / 2)); +// }); + +// let mut hits = [0usize; 2]; +// for _ in 0..COUNT { +// select! { +// send(s1, ()) -> _ => hits[0] += 1, +// send(s2, ()) -> _ => hits[1] += 1, +// } +// } +// assert!(hits.iter().all(|x| *x >= COUNT / hits.len() / 2)); +// }) +// .unwrap(); +// } + +// #[test] +// fn fairness_duplicates() { +// const COUNT: usize = 10_000; + +// let (s, r) = bounded::<()>(0); + +// scope(|scope| { +// scope.spawn(|_| { +// let mut hits = [0usize; 5]; +// for _ in 0..COUNT { +// select! { +// recv(r) -> _ => hits[0] += 1, +// recv(r) -> _ => hits[1] += 1, +// recv(r) -> _ => hits[2] += 1, +// recv(r) -> _ => hits[3] += 1, +// recv(r) -> _ => hits[4] += 1, +// } +// } +// assert!(hits.iter().all(|x| *x >= COUNT / hits.len() / 2)); +// }); + +// let mut hits = [0usize; 5]; +// for _ in 0..COUNT { +// select! { +// send(s, ()) -> _ => hits[0] += 1, +// send(s, ()) -> _ => hits[1] += 1, +// send(s, ()) -> _ => hits[2] += 1, +// send(s, ()) -> _ => hits[3] += 1, +// send(s, ()) -> _ => hits[4] += 1, +// } +// } +// assert!(hits.iter().all(|x| *x >= COUNT / hits.len() / 2)); +// }) +// .unwrap(); +// } + +// #[test] +// fn recv_in_send() { +// let (s, r) = bounded(0); + +// scope(|scope| { +// scope.spawn(|_| { +// thread::sleep(ms(100)); +// r.recv() +// }); + +// scope.spawn(|_| { +// thread::sleep(ms(500)); +// s.send(()).unwrap(); +// }); + +// select! { +// send(s, r.recv().unwrap()) -> _ => {} +// } +// }) +// .unwrap(); +// } + +#[test] +fn channel_through_channel() { + const COUNT: usize = 1000; + + type T = Box<dyn Any + Send>; + + let (s, r) = bounded::<T>(0); + + scope(|scope| { + scope.spawn(move |_| { + let mut s = s; + + for _ in 0..COUNT { + let (new_s, new_r) = bounded(0); + let new_r: T = Box::new(Some(new_r)); + + s.send(new_r).unwrap(); + s = new_s; + } + }); + + scope.spawn(move |_| { + let mut r = r; + + for _ in 0..COUNT { + r = r + .recv() + .unwrap() + .downcast_mut::<Option<Receiver<T>>>() + .unwrap() + .take() + .unwrap() + } + }); + }) + .unwrap(); +} |