aboutsummaryrefslogtreecommitdiff
path: root/vendor/flume/tests/basic.rs
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/flume/tests/basic.rs')
-rw-r--r--vendor/flume/tests/basic.rs428
1 files changed, 0 insertions, 428 deletions
diff --git a/vendor/flume/tests/basic.rs b/vendor/flume/tests/basic.rs
deleted file mode 100644
index b937436..0000000
--- a/vendor/flume/tests/basic.rs
+++ /dev/null
@@ -1,428 +0,0 @@
-use std::time::{Instant, Duration};
-use flume::*;
-
-#[test]
-fn send_recv() {
- let (tx, rx) = unbounded();
- for i in 0..1000 { tx.send(i).unwrap(); }
- for i in 0..1000 { assert_eq!(rx.try_recv().unwrap(), i); }
- assert!(rx.try_recv().is_err());
-}
-
-#[test]
-fn iter() {
- let (tx, rx) = unbounded();
- for i in 0..1000 { tx.send(i).unwrap(); }
- drop(tx);
- assert_eq!(rx.iter().sum::<u32>(), (0..1000).sum());
-}
-
-#[test]
-fn try_iter() {
- let (tx, rx) = unbounded();
- for i in 0..1000 { tx.send(i).unwrap(); }
- assert_eq!(rx.try_iter().sum::<u32>(), (0..1000).sum());
-}
-
-#[test]
-fn iter_threaded() {
- let (tx, rx) = unbounded();
- for i in 0..1000 {
- let tx = tx.clone();
- std::thread::spawn(move || tx.send(i).unwrap());
- }
- drop(tx);
- assert_eq!(rx.iter().sum::<u32>(), (0..1000).sum());
-}
-
-#[cfg_attr(any(target_os = "macos", windows), ignore)] // FIXME #41
-#[test]
-fn send_timeout() {
- let dur = Duration::from_millis(350);
- let max_error = Duration::from_millis(5);
- let dur_min = dur.checked_sub(max_error).unwrap();
- let dur_max = dur.checked_add(max_error).unwrap();
-
- let (tx, rx) = bounded(1);
-
- assert!(tx.send_timeout(42, dur).is_ok());
-
- let then = Instant::now();
- assert!(tx.send_timeout(43, dur).is_err());
- let now = Instant::now();
-
- let this = now.duration_since(then);
- if !(dur_min < this && this < dur_max) {
- panic!("timeout exceeded: {:?}", this);
- }
-
- assert_eq!(rx.drain().count(), 1);
-
- drop(rx);
-
- assert!(tx.send_timeout(42, Duration::from_millis(350)).is_err());
-}
-
-#[cfg_attr(any(target_os = "macos", windows), ignore)] // FIXME #41
-#[test]
-fn recv_timeout() {
- let dur = Duration::from_millis(350);
- let max_error = Duration::from_millis(5);
- let dur_min = dur.checked_sub(max_error).unwrap();
- let dur_max = dur.checked_add(max_error).unwrap();
-
- let (tx, rx) = unbounded();
- let then = Instant::now();
- assert!(rx.recv_timeout(dur).is_err());
- let now = Instant::now();
-
- let this = now.duration_since(then);
- if !(dur_min < this && this < dur_max) {
- panic!("timeout exceeded: {:?}", this);
- }
-
- tx.send(42).unwrap();
- assert_eq!(rx.recv_timeout(dur), Ok(42));
- assert!(Instant::now().duration_since(now) < max_error);
-}
-
-#[cfg_attr(any(target_os = "macos", windows), ignore)] // FIXME #41
-#[test]
-fn recv_deadline() {
- let dur = Duration::from_millis(350);
- let max_error = Duration::from_millis(5);
- let dur_min = dur.checked_sub(max_error).unwrap();
- let dur_max = dur.checked_add(max_error).unwrap();
-
- let (tx, rx) = unbounded();
- let then = Instant::now();
- assert!(rx.recv_deadline(then.checked_add(dur).unwrap()).is_err());
- let now = Instant::now();
-
- let this = now.duration_since(then);
- if !(dur_min < this && this < dur_max) {
- panic!("timeout exceeded: {:?}", this);
- }
-
- tx.send(42).unwrap();
- assert_eq!(rx.recv_deadline(now.checked_add(dur).unwrap()), Ok(42));
- assert!(Instant::now().duration_since(now) < max_error);
-}
-
-#[test]
-fn recv_timeout_missed_send() {
- let (tx, rx) = bounded(10);
-
- assert!(rx.recv_timeout(Duration::from_millis(100)).is_err());
-
- tx.send(42).unwrap();
-
- assert_eq!(rx.recv(), Ok(42));
-}
-
-#[test]
-fn disconnect_tx() {
- let (tx, rx) = unbounded::<()>();
- drop(tx);
- assert!(rx.recv().is_err());
-}
-
-#[test]
-fn disconnect_rx() {
- let (tx, rx) = unbounded();
- drop(rx);
- assert!(tx.send(0).is_err());
-}
-
-#[test]
-fn drain() {
- let (tx, rx) = unbounded();
-
- for i in 0..100 {
- tx.send(i).unwrap();
- }
-
- assert_eq!(rx.drain().sum::<u32>(), (0..100).sum());
-
- for i in 0..100 {
- tx.send(i).unwrap();
- }
-
- for i in 0..100 {
- tx.send(i).unwrap();
- }
-
- rx.recv().unwrap();
-
- (1u32..100).chain(0..100).zip(rx).for_each(|(l, r)| assert_eq!(l, r));
-}
-
-#[test]
-fn try_send() {
- let (tx, rx) = bounded(5);
-
- for i in 0..5 {
- tx.try_send(i).unwrap();
- }
-
- assert!(tx.try_send(42).is_err());
-
- assert_eq!(rx.recv(), Ok(0));
-
- assert_eq!(tx.try_send(42), Ok(()));
-
- assert_eq!(rx.recv(), Ok(1));
- drop(rx);
-
- assert!(tx.try_send(42).is_err());
-}
-
-#[test]
-fn send_bounded() {
- let (tx, rx) = bounded(5);
-
- for _ in 0..5 {
- tx.send(42).unwrap();
- }
-
- let _ = rx.recv().unwrap();
-
- tx.send(42).unwrap();
-
- assert!(tx.try_send(42).is_err());
-
- rx.drain();
-
- let mut ts = Vec::new();
- for _ in 0..100 {
- let tx = tx.clone();
- ts.push(std::thread::spawn(move || {
- for i in 0..10000 {
- tx.send(i).unwrap();
- }
- }));
- }
-
- drop(tx);
-
- assert_eq!(rx.iter().sum::<u64>(), (0..10000).sum::<u64>() * 100);
-
- for t in ts {
- t.join().unwrap();
- }
-
- assert!(rx.recv().is_err());
-}
-
-#[test]
-fn rendezvous() {
- let (tx, rx) = bounded(0);
-
- for i in 0..5 {
- let tx = tx.clone();
- let t = std::thread::spawn(move || {
- assert!(tx.try_send(()).is_err());
-
- let then = Instant::now();
- tx.send(()).unwrap();
- let now = Instant::now();
-
- assert!(now.duration_since(then) > Duration::from_millis(100), "iter = {}", i);
- });
-
- std::thread::sleep(Duration::from_millis(1000));
- rx.recv().unwrap();
-
- t.join().unwrap();
- }
-}
-
-#[test]
-fn hydra() {
- let thread_num = 32;
- let msg_num = 1000;
-
- let (main_tx, main_rx) = unbounded::<()>();
-
- let mut txs = Vec::new();
- for _ in 0..thread_num {
- let main_tx = main_tx.clone();
- let (tx, rx) = unbounded();
- txs.push(tx);
-
- std::thread::spawn(move || {
- for msg in rx.iter() {
- main_tx.send(msg).unwrap();
- }
- });
- }
-
- drop(main_tx);
-
- for _ in 0..10 {
- for tx in &txs {
- for _ in 0..msg_num {
- tx.send(Default::default()).unwrap();
- }
- }
-
- for _ in 0..thread_num {
- for _ in 0..msg_num {
- main_rx.recv().unwrap();
- }
- }
- }
-
- drop(txs);
- assert!(main_rx.recv().is_err());
-}
-
-#[test]
-fn robin() {
- let thread_num = 32;
- let msg_num = 10;
-
- let (mut main_tx, main_rx) = bounded::<()>(1);
-
- for _ in 0..thread_num {
- let (mut tx, rx) = bounded(100);
- std::mem::swap(&mut tx, &mut main_tx);
-
- std::thread::spawn(move || {
- for msg in rx.iter() {
- tx.send(msg).unwrap();
- }
- });
- }
-
- for _ in 0..10 {
- let main_tx = main_tx.clone();
- std::thread::spawn(move || {
- for _ in 0..msg_num {
- main_tx.send(Default::default()).unwrap();
- }
- });
-
- for _ in 0..msg_num {
- main_rx.recv().unwrap();
- }
- }
-}
-
-#[cfg(feature = "select")]
-#[test]
-fn select_general() {
- #[derive(Debug, PartialEq)]
- struct Foo(usize);
-
- let (tx0, rx0) = bounded(1);
- let (tx1, rx1) = unbounded();
-
- for (i, t) in vec![tx0.clone(), tx1].into_iter().enumerate() {
- std::thread::spawn(move || {
- std::thread::sleep(std::time::Duration::from_millis(250));
- let _ = t.send(Foo(i));
- });
- }
-
- let x = Selector::new()
- .recv(&rx0, |x| x)
- .recv(&rx1, |x| x)
- .wait()
- .unwrap();
-
- if x == Foo(0) {
- assert!(rx1.recv().unwrap() == Foo(1));
- } else {
- assert!(rx0.recv().unwrap() == Foo(0));
- }
-
- tx0.send(Foo(42)).unwrap();
-
- let t = std::thread::spawn(move || {
- std::thread::sleep(std::time::Duration::from_millis(100));
- assert_eq!(rx0.recv().unwrap(), Foo(42));
- assert_eq!(rx0.recv().unwrap(), Foo(43));
-
- });
-
- Selector::new()
- .send(&tx0, Foo(43), |x| x)
- .wait()
- .unwrap();
-
- t.join().unwrap();
-}
-
-struct MessageWithoutDebug(u32);
-
-#[test]
-// This is a 'does it build' test, to make sure that the error types can turn
-// into a std::error::Error without requiring the payload (which is not used
-// there) to impl Debug.
-fn std_error_without_debug() {
- let (tx, rx) = unbounded::<MessageWithoutDebug>();
-
- match tx.send(MessageWithoutDebug(1)) {
- Ok(_) => {}
- Err(e) => {
- let _std_err: &dyn std::error::Error = &e;
- }
- }
-
- match rx.recv() {
- Ok(_) => {}
- Err(e) => {
- let _std_err: &dyn std::error::Error = &e;
- }
- }
-
- match tx.try_send(MessageWithoutDebug(2)) {
- Ok(_) => {}
- Err(e) => {
- let _std_err: &dyn std::error::Error = &e;
- }
- }
-
- match rx.try_recv() {
- Ok(_) => {}
- Err(e) => {
- let _std_err: &dyn std::error::Error = &e;
- }
- }
-
- match tx.send_timeout(MessageWithoutDebug(3), Duration::from_secs(1000000)) {
- Ok(_) => {}
- Err(e) => {
- let _std_err: &dyn std::error::Error = &e;
- }
- }
-
- match rx.recv_timeout(Duration::from_secs(10000000)) {
- Ok(_) => {}
- Err(e) => {
- let _std_err: &dyn std::error::Error = &e;
- }
- }
-}
-
-#[test]
-fn weak_close() {
- let (tx, rx) = unbounded::<()>();
- let weak = tx.downgrade();
- drop(tx);
- assert!(weak.upgrade().is_none());
- assert!(rx.is_disconnected());
- assert!(rx.try_recv().is_err());
-}
-
-#[test]
-fn weak_upgrade() {
- let (tx, rx) = unbounded();
- let weak = tx.downgrade();
- let tx2 = weak.upgrade().unwrap();
- drop(tx);
- assert!(!rx.is_disconnected());
- tx2.send(()).unwrap();
- assert!(rx.try_recv().is_ok());
-}