diff options
Diffstat (limited to 'vendor/flume/tests/basic.rs')
-rw-r--r-- | vendor/flume/tests/basic.rs | 428 |
1 files changed, 0 insertions, 428 deletions
diff --git a/vendor/flume/tests/basic.rs b/vendor/flume/tests/basic.rs deleted file mode 100644 index b937436..0000000 --- a/vendor/flume/tests/basic.rs +++ /dev/null @@ -1,428 +0,0 @@ -use std::time::{Instant, Duration}; -use flume::*; - -#[test] -fn send_recv() { - let (tx, rx) = unbounded(); - for i in 0..1000 { tx.send(i).unwrap(); } - for i in 0..1000 { assert_eq!(rx.try_recv().unwrap(), i); } - assert!(rx.try_recv().is_err()); -} - -#[test] -fn iter() { - let (tx, rx) = unbounded(); - for i in 0..1000 { tx.send(i).unwrap(); } - drop(tx); - assert_eq!(rx.iter().sum::<u32>(), (0..1000).sum()); -} - -#[test] -fn try_iter() { - let (tx, rx) = unbounded(); - for i in 0..1000 { tx.send(i).unwrap(); } - assert_eq!(rx.try_iter().sum::<u32>(), (0..1000).sum()); -} - -#[test] -fn iter_threaded() { - let (tx, rx) = unbounded(); - for i in 0..1000 { - let tx = tx.clone(); - std::thread::spawn(move || tx.send(i).unwrap()); - } - drop(tx); - assert_eq!(rx.iter().sum::<u32>(), (0..1000).sum()); -} - -#[cfg_attr(any(target_os = "macos", windows), ignore)] // FIXME #41 -#[test] -fn send_timeout() { - let dur = Duration::from_millis(350); - let max_error = Duration::from_millis(5); - let dur_min = dur.checked_sub(max_error).unwrap(); - let dur_max = dur.checked_add(max_error).unwrap(); - - let (tx, rx) = bounded(1); - - assert!(tx.send_timeout(42, dur).is_ok()); - - let then = Instant::now(); - assert!(tx.send_timeout(43, dur).is_err()); - let now = Instant::now(); - - let this = now.duration_since(then); - if !(dur_min < this && this < dur_max) { - panic!("timeout exceeded: {:?}", this); - } - - assert_eq!(rx.drain().count(), 1); - - drop(rx); - - assert!(tx.send_timeout(42, Duration::from_millis(350)).is_err()); -} - -#[cfg_attr(any(target_os = "macos", windows), ignore)] // FIXME #41 -#[test] -fn recv_timeout() { - let dur = Duration::from_millis(350); - let max_error = Duration::from_millis(5); - let dur_min = dur.checked_sub(max_error).unwrap(); - let dur_max = dur.checked_add(max_error).unwrap(); - - let (tx, rx) = unbounded(); - let then = Instant::now(); - assert!(rx.recv_timeout(dur).is_err()); - let now = Instant::now(); - - let this = now.duration_since(then); - if !(dur_min < this && this < dur_max) { - panic!("timeout exceeded: {:?}", this); - } - - tx.send(42).unwrap(); - assert_eq!(rx.recv_timeout(dur), Ok(42)); - assert!(Instant::now().duration_since(now) < max_error); -} - -#[cfg_attr(any(target_os = "macos", windows), ignore)] // FIXME #41 -#[test] -fn recv_deadline() { - let dur = Duration::from_millis(350); - let max_error = Duration::from_millis(5); - let dur_min = dur.checked_sub(max_error).unwrap(); - let dur_max = dur.checked_add(max_error).unwrap(); - - let (tx, rx) = unbounded(); - let then = Instant::now(); - assert!(rx.recv_deadline(then.checked_add(dur).unwrap()).is_err()); - let now = Instant::now(); - - let this = now.duration_since(then); - if !(dur_min < this && this < dur_max) { - panic!("timeout exceeded: {:?}", this); - } - - tx.send(42).unwrap(); - assert_eq!(rx.recv_deadline(now.checked_add(dur).unwrap()), Ok(42)); - assert!(Instant::now().duration_since(now) < max_error); -} - -#[test] -fn recv_timeout_missed_send() { - let (tx, rx) = bounded(10); - - assert!(rx.recv_timeout(Duration::from_millis(100)).is_err()); - - tx.send(42).unwrap(); - - assert_eq!(rx.recv(), Ok(42)); -} - -#[test] -fn disconnect_tx() { - let (tx, rx) = unbounded::<()>(); - drop(tx); - assert!(rx.recv().is_err()); -} - -#[test] -fn disconnect_rx() { - let (tx, rx) = unbounded(); - drop(rx); - assert!(tx.send(0).is_err()); -} - -#[test] -fn drain() { - let (tx, rx) = unbounded(); - - for i in 0..100 { - tx.send(i).unwrap(); - } - - assert_eq!(rx.drain().sum::<u32>(), (0..100).sum()); - - for i in 0..100 { - tx.send(i).unwrap(); - } - - for i in 0..100 { - tx.send(i).unwrap(); - } - - rx.recv().unwrap(); - - (1u32..100).chain(0..100).zip(rx).for_each(|(l, r)| assert_eq!(l, r)); -} - -#[test] -fn try_send() { - let (tx, rx) = bounded(5); - - for i in 0..5 { - tx.try_send(i).unwrap(); - } - - assert!(tx.try_send(42).is_err()); - - assert_eq!(rx.recv(), Ok(0)); - - assert_eq!(tx.try_send(42), Ok(())); - - assert_eq!(rx.recv(), Ok(1)); - drop(rx); - - assert!(tx.try_send(42).is_err()); -} - -#[test] -fn send_bounded() { - let (tx, rx) = bounded(5); - - for _ in 0..5 { - tx.send(42).unwrap(); - } - - let _ = rx.recv().unwrap(); - - tx.send(42).unwrap(); - - assert!(tx.try_send(42).is_err()); - - rx.drain(); - - let mut ts = Vec::new(); - for _ in 0..100 { - let tx = tx.clone(); - ts.push(std::thread::spawn(move || { - for i in 0..10000 { - tx.send(i).unwrap(); - } - })); - } - - drop(tx); - - assert_eq!(rx.iter().sum::<u64>(), (0..10000).sum::<u64>() * 100); - - for t in ts { - t.join().unwrap(); - } - - assert!(rx.recv().is_err()); -} - -#[test] -fn rendezvous() { - let (tx, rx) = bounded(0); - - for i in 0..5 { - let tx = tx.clone(); - let t = std::thread::spawn(move || { - assert!(tx.try_send(()).is_err()); - - let then = Instant::now(); - tx.send(()).unwrap(); - let now = Instant::now(); - - assert!(now.duration_since(then) > Duration::from_millis(100), "iter = {}", i); - }); - - std::thread::sleep(Duration::from_millis(1000)); - rx.recv().unwrap(); - - t.join().unwrap(); - } -} - -#[test] -fn hydra() { - let thread_num = 32; - let msg_num = 1000; - - let (main_tx, main_rx) = unbounded::<()>(); - - let mut txs = Vec::new(); - for _ in 0..thread_num { - let main_tx = main_tx.clone(); - let (tx, rx) = unbounded(); - txs.push(tx); - - std::thread::spawn(move || { - for msg in rx.iter() { - main_tx.send(msg).unwrap(); - } - }); - } - - drop(main_tx); - - for _ in 0..10 { - for tx in &txs { - for _ in 0..msg_num { - tx.send(Default::default()).unwrap(); - } - } - - for _ in 0..thread_num { - for _ in 0..msg_num { - main_rx.recv().unwrap(); - } - } - } - - drop(txs); - assert!(main_rx.recv().is_err()); -} - -#[test] -fn robin() { - let thread_num = 32; - let msg_num = 10; - - let (mut main_tx, main_rx) = bounded::<()>(1); - - for _ in 0..thread_num { - let (mut tx, rx) = bounded(100); - std::mem::swap(&mut tx, &mut main_tx); - - std::thread::spawn(move || { - for msg in rx.iter() { - tx.send(msg).unwrap(); - } - }); - } - - for _ in 0..10 { - let main_tx = main_tx.clone(); - std::thread::spawn(move || { - for _ in 0..msg_num { - main_tx.send(Default::default()).unwrap(); - } - }); - - for _ in 0..msg_num { - main_rx.recv().unwrap(); - } - } -} - -#[cfg(feature = "select")] -#[test] -fn select_general() { - #[derive(Debug, PartialEq)] - struct Foo(usize); - - let (tx0, rx0) = bounded(1); - let (tx1, rx1) = unbounded(); - - for (i, t) in vec![tx0.clone(), tx1].into_iter().enumerate() { - std::thread::spawn(move || { - std::thread::sleep(std::time::Duration::from_millis(250)); - let _ = t.send(Foo(i)); - }); - } - - let x = Selector::new() - .recv(&rx0, |x| x) - .recv(&rx1, |x| x) - .wait() - .unwrap(); - - if x == Foo(0) { - assert!(rx1.recv().unwrap() == Foo(1)); - } else { - assert!(rx0.recv().unwrap() == Foo(0)); - } - - tx0.send(Foo(42)).unwrap(); - - let t = std::thread::spawn(move || { - std::thread::sleep(std::time::Duration::from_millis(100)); - assert_eq!(rx0.recv().unwrap(), Foo(42)); - assert_eq!(rx0.recv().unwrap(), Foo(43)); - - }); - - Selector::new() - .send(&tx0, Foo(43), |x| x) - .wait() - .unwrap(); - - t.join().unwrap(); -} - -struct MessageWithoutDebug(u32); - -#[test] -// This is a 'does it build' test, to make sure that the error types can turn -// into a std::error::Error without requiring the payload (which is not used -// there) to impl Debug. -fn std_error_without_debug() { - let (tx, rx) = unbounded::<MessageWithoutDebug>(); - - match tx.send(MessageWithoutDebug(1)) { - Ok(_) => {} - Err(e) => { - let _std_err: &dyn std::error::Error = &e; - } - } - - match rx.recv() { - Ok(_) => {} - Err(e) => { - let _std_err: &dyn std::error::Error = &e; - } - } - - match tx.try_send(MessageWithoutDebug(2)) { - Ok(_) => {} - Err(e) => { - let _std_err: &dyn std::error::Error = &e; - } - } - - match rx.try_recv() { - Ok(_) => {} - Err(e) => { - let _std_err: &dyn std::error::Error = &e; - } - } - - match tx.send_timeout(MessageWithoutDebug(3), Duration::from_secs(1000000)) { - Ok(_) => {} - Err(e) => { - let _std_err: &dyn std::error::Error = &e; - } - } - - match rx.recv_timeout(Duration::from_secs(10000000)) { - Ok(_) => {} - Err(e) => { - let _std_err: &dyn std::error::Error = &e; - } - } -} - -#[test] -fn weak_close() { - let (tx, rx) = unbounded::<()>(); - let weak = tx.downgrade(); - drop(tx); - assert!(weak.upgrade().is_none()); - assert!(rx.is_disconnected()); - assert!(rx.try_recv().is_err()); -} - -#[test] -fn weak_upgrade() { - let (tx, rx) = unbounded(); - let weak = tx.downgrade(); - let tx2 = weak.upgrade().unwrap(); - drop(tx); - assert!(!rx.is_disconnected()); - tx2.send(()).unwrap(); - assert!(rx.try_recv().is_ok()); -} |