diff options
Diffstat (limited to 'vendor/flume/tests/basic.rs')
-rw-r--r-- | vendor/flume/tests/basic.rs | 428 |
1 files changed, 428 insertions, 0 deletions
diff --git a/vendor/flume/tests/basic.rs b/vendor/flume/tests/basic.rs new file mode 100644 index 0000000..b937436 --- /dev/null +++ b/vendor/flume/tests/basic.rs @@ -0,0 +1,428 @@ +use std::time::{Instant, Duration}; +use flume::*; + +#[test] +fn send_recv() { + let (tx, rx) = unbounded(); + for i in 0..1000 { tx.send(i).unwrap(); } + for i in 0..1000 { assert_eq!(rx.try_recv().unwrap(), i); } + assert!(rx.try_recv().is_err()); +} + +#[test] +fn iter() { + let (tx, rx) = unbounded(); + for i in 0..1000 { tx.send(i).unwrap(); } + drop(tx); + assert_eq!(rx.iter().sum::<u32>(), (0..1000).sum()); +} + +#[test] +fn try_iter() { + let (tx, rx) = unbounded(); + for i in 0..1000 { tx.send(i).unwrap(); } + assert_eq!(rx.try_iter().sum::<u32>(), (0..1000).sum()); +} + +#[test] +fn iter_threaded() { + let (tx, rx) = unbounded(); + for i in 0..1000 { + let tx = tx.clone(); + std::thread::spawn(move || tx.send(i).unwrap()); + } + drop(tx); + assert_eq!(rx.iter().sum::<u32>(), (0..1000).sum()); +} + +#[cfg_attr(any(target_os = "macos", windows), ignore)] // FIXME #41 +#[test] +fn send_timeout() { + let dur = Duration::from_millis(350); + let max_error = Duration::from_millis(5); + let dur_min = dur.checked_sub(max_error).unwrap(); + let dur_max = dur.checked_add(max_error).unwrap(); + + let (tx, rx) = bounded(1); + + assert!(tx.send_timeout(42, dur).is_ok()); + + let then = Instant::now(); + assert!(tx.send_timeout(43, dur).is_err()); + let now = Instant::now(); + + let this = now.duration_since(then); + if !(dur_min < this && this < dur_max) { + panic!("timeout exceeded: {:?}", this); + } + + assert_eq!(rx.drain().count(), 1); + + drop(rx); + + assert!(tx.send_timeout(42, Duration::from_millis(350)).is_err()); +} + +#[cfg_attr(any(target_os = "macos", windows), ignore)] // FIXME #41 +#[test] +fn recv_timeout() { + let dur = Duration::from_millis(350); + let max_error = Duration::from_millis(5); + let dur_min = dur.checked_sub(max_error).unwrap(); + let dur_max = dur.checked_add(max_error).unwrap(); + + let (tx, rx) = unbounded(); + let then = Instant::now(); + assert!(rx.recv_timeout(dur).is_err()); + let now = Instant::now(); + + let this = now.duration_since(then); + if !(dur_min < this && this < dur_max) { + panic!("timeout exceeded: {:?}", this); + } + + tx.send(42).unwrap(); + assert_eq!(rx.recv_timeout(dur), Ok(42)); + assert!(Instant::now().duration_since(now) < max_error); +} + +#[cfg_attr(any(target_os = "macos", windows), ignore)] // FIXME #41 +#[test] +fn recv_deadline() { + let dur = Duration::from_millis(350); + let max_error = Duration::from_millis(5); + let dur_min = dur.checked_sub(max_error).unwrap(); + let dur_max = dur.checked_add(max_error).unwrap(); + + let (tx, rx) = unbounded(); + let then = Instant::now(); + assert!(rx.recv_deadline(then.checked_add(dur).unwrap()).is_err()); + let now = Instant::now(); + + let this = now.duration_since(then); + if !(dur_min < this && this < dur_max) { + panic!("timeout exceeded: {:?}", this); + } + + tx.send(42).unwrap(); + assert_eq!(rx.recv_deadline(now.checked_add(dur).unwrap()), Ok(42)); + assert!(Instant::now().duration_since(now) < max_error); +} + +#[test] +fn recv_timeout_missed_send() { + let (tx, rx) = bounded(10); + + assert!(rx.recv_timeout(Duration::from_millis(100)).is_err()); + + tx.send(42).unwrap(); + + assert_eq!(rx.recv(), Ok(42)); +} + +#[test] +fn disconnect_tx() { + let (tx, rx) = unbounded::<()>(); + drop(tx); + assert!(rx.recv().is_err()); +} + +#[test] +fn disconnect_rx() { + let (tx, rx) = unbounded(); + drop(rx); + assert!(tx.send(0).is_err()); +} + +#[test] +fn drain() { + let (tx, rx) = unbounded(); + + for i in 0..100 { + tx.send(i).unwrap(); + } + + assert_eq!(rx.drain().sum::<u32>(), (0..100).sum()); + + for i in 0..100 { + tx.send(i).unwrap(); + } + + for i in 0..100 { + tx.send(i).unwrap(); + } + + rx.recv().unwrap(); + + (1u32..100).chain(0..100).zip(rx).for_each(|(l, r)| assert_eq!(l, r)); +} + +#[test] +fn try_send() { + let (tx, rx) = bounded(5); + + for i in 0..5 { + tx.try_send(i).unwrap(); + } + + assert!(tx.try_send(42).is_err()); + + assert_eq!(rx.recv(), Ok(0)); + + assert_eq!(tx.try_send(42), Ok(())); + + assert_eq!(rx.recv(), Ok(1)); + drop(rx); + + assert!(tx.try_send(42).is_err()); +} + +#[test] +fn send_bounded() { + let (tx, rx) = bounded(5); + + for _ in 0..5 { + tx.send(42).unwrap(); + } + + let _ = rx.recv().unwrap(); + + tx.send(42).unwrap(); + + assert!(tx.try_send(42).is_err()); + + rx.drain(); + + let mut ts = Vec::new(); + for _ in 0..100 { + let tx = tx.clone(); + ts.push(std::thread::spawn(move || { + for i in 0..10000 { + tx.send(i).unwrap(); + } + })); + } + + drop(tx); + + assert_eq!(rx.iter().sum::<u64>(), (0..10000).sum::<u64>() * 100); + + for t in ts { + t.join().unwrap(); + } + + assert!(rx.recv().is_err()); +} + +#[test] +fn rendezvous() { + let (tx, rx) = bounded(0); + + for i in 0..5 { + let tx = tx.clone(); + let t = std::thread::spawn(move || { + assert!(tx.try_send(()).is_err()); + + let then = Instant::now(); + tx.send(()).unwrap(); + let now = Instant::now(); + + assert!(now.duration_since(then) > Duration::from_millis(100), "iter = {}", i); + }); + + std::thread::sleep(Duration::from_millis(1000)); + rx.recv().unwrap(); + + t.join().unwrap(); + } +} + +#[test] +fn hydra() { + let thread_num = 32; + let msg_num = 1000; + + let (main_tx, main_rx) = unbounded::<()>(); + + let mut txs = Vec::new(); + for _ in 0..thread_num { + let main_tx = main_tx.clone(); + let (tx, rx) = unbounded(); + txs.push(tx); + + std::thread::spawn(move || { + for msg in rx.iter() { + main_tx.send(msg).unwrap(); + } + }); + } + + drop(main_tx); + + for _ in 0..10 { + for tx in &txs { + for _ in 0..msg_num { + tx.send(Default::default()).unwrap(); + } + } + + for _ in 0..thread_num { + for _ in 0..msg_num { + main_rx.recv().unwrap(); + } + } + } + + drop(txs); + assert!(main_rx.recv().is_err()); +} + +#[test] +fn robin() { + let thread_num = 32; + let msg_num = 10; + + let (mut main_tx, main_rx) = bounded::<()>(1); + + for _ in 0..thread_num { + let (mut tx, rx) = bounded(100); + std::mem::swap(&mut tx, &mut main_tx); + + std::thread::spawn(move || { + for msg in rx.iter() { + tx.send(msg).unwrap(); + } + }); + } + + for _ in 0..10 { + let main_tx = main_tx.clone(); + std::thread::spawn(move || { + for _ in 0..msg_num { + main_tx.send(Default::default()).unwrap(); + } + }); + + for _ in 0..msg_num { + main_rx.recv().unwrap(); + } + } +} + +#[cfg(feature = "select")] +#[test] +fn select_general() { + #[derive(Debug, PartialEq)] + struct Foo(usize); + + let (tx0, rx0) = bounded(1); + let (tx1, rx1) = unbounded(); + + for (i, t) in vec![tx0.clone(), tx1].into_iter().enumerate() { + std::thread::spawn(move || { + std::thread::sleep(std::time::Duration::from_millis(250)); + let _ = t.send(Foo(i)); + }); + } + + let x = Selector::new() + .recv(&rx0, |x| x) + .recv(&rx1, |x| x) + .wait() + .unwrap(); + + if x == Foo(0) { + assert!(rx1.recv().unwrap() == Foo(1)); + } else { + assert!(rx0.recv().unwrap() == Foo(0)); + } + + tx0.send(Foo(42)).unwrap(); + + let t = std::thread::spawn(move || { + std::thread::sleep(std::time::Duration::from_millis(100)); + assert_eq!(rx0.recv().unwrap(), Foo(42)); + assert_eq!(rx0.recv().unwrap(), Foo(43)); + + }); + + Selector::new() + .send(&tx0, Foo(43), |x| x) + .wait() + .unwrap(); + + t.join().unwrap(); +} + +struct MessageWithoutDebug(u32); + +#[test] +// This is a 'does it build' test, to make sure that the error types can turn +// into a std::error::Error without requiring the payload (which is not used +// there) to impl Debug. +fn std_error_without_debug() { + let (tx, rx) = unbounded::<MessageWithoutDebug>(); + + match tx.send(MessageWithoutDebug(1)) { + Ok(_) => {} + Err(e) => { + let _std_err: &dyn std::error::Error = &e; + } + } + + match rx.recv() { + Ok(_) => {} + Err(e) => { + let _std_err: &dyn std::error::Error = &e; + } + } + + match tx.try_send(MessageWithoutDebug(2)) { + Ok(_) => {} + Err(e) => { + let _std_err: &dyn std::error::Error = &e; + } + } + + match rx.try_recv() { + Ok(_) => {} + Err(e) => { + let _std_err: &dyn std::error::Error = &e; + } + } + + match tx.send_timeout(MessageWithoutDebug(3), Duration::from_secs(1000000)) { + Ok(_) => {} + Err(e) => { + let _std_err: &dyn std::error::Error = &e; + } + } + + match rx.recv_timeout(Duration::from_secs(10000000)) { + Ok(_) => {} + Err(e) => { + let _std_err: &dyn std::error::Error = &e; + } + } +} + +#[test] +fn weak_close() { + let (tx, rx) = unbounded::<()>(); + let weak = tx.downgrade(); + drop(tx); + assert!(weak.upgrade().is_none()); + assert!(rx.is_disconnected()); + assert!(rx.try_recv().is_err()); +} + +#[test] +fn weak_upgrade() { + let (tx, rx) = unbounded(); + let weak = tx.downgrade(); + let tx2 = weak.upgrade().unwrap(); + drop(tx); + assert!(!rx.is_disconnected()); + tx2.send(()).unwrap(); + assert!(rx.try_recv().is_ok()); +} |