From 1b6a04ca5504955c571d1c97504fb45ea0befee4 Mon Sep 17 00:00:00 2001
From: Valentin Popov <valentin@popov.link>
Date: Mon, 8 Jan 2024 01:21:28 +0400
Subject: Initial vendor packages

Signed-off-by: Valentin Popov <valentin@popov.link>
---
 vendor/flume/tests/basic.rs | 428 ++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 428 insertions(+)
 create mode 100644 vendor/flume/tests/basic.rs

(limited to 'vendor/flume/tests/basic.rs')

diff --git a/vendor/flume/tests/basic.rs b/vendor/flume/tests/basic.rs
new file mode 100644
index 0000000..b937436
--- /dev/null
+++ b/vendor/flume/tests/basic.rs
@@ -0,0 +1,428 @@
+use std::time::{Instant, Duration};
+use flume::*;
+
+#[test]
+fn send_recv() {
+    let (tx, rx) = unbounded();
+    for i in 0..1000 { tx.send(i).unwrap(); }
+    for i in 0..1000 { assert_eq!(rx.try_recv().unwrap(), i); }
+    assert!(rx.try_recv().is_err());
+}
+
+#[test]
+fn iter() {
+    let (tx, rx) = unbounded();
+    for i in 0..1000 { tx.send(i).unwrap(); }
+    drop(tx);
+    assert_eq!(rx.iter().sum::<u32>(), (0..1000).sum());
+}
+
+#[test]
+fn try_iter() {
+    let (tx, rx) = unbounded();
+    for i in 0..1000 { tx.send(i).unwrap(); }
+    assert_eq!(rx.try_iter().sum::<u32>(), (0..1000).sum());
+}
+
+#[test]
+fn iter_threaded() {
+    let (tx, rx) = unbounded();
+    for i in 0..1000 {
+        let tx = tx.clone();
+        std::thread::spawn(move || tx.send(i).unwrap());
+    }
+    drop(tx);
+    assert_eq!(rx.iter().sum::<u32>(), (0..1000).sum());
+}
+
+#[cfg_attr(any(target_os = "macos", windows), ignore)] // FIXME #41
+#[test]
+fn send_timeout() {
+    let dur = Duration::from_millis(350);
+    let max_error = Duration::from_millis(5);
+    let dur_min = dur.checked_sub(max_error).unwrap();
+    let dur_max = dur.checked_add(max_error).unwrap();
+
+    let (tx, rx) = bounded(1);
+
+    assert!(tx.send_timeout(42, dur).is_ok());
+
+    let then = Instant::now();
+    assert!(tx.send_timeout(43, dur).is_err());
+    let now = Instant::now();
+
+    let this = now.duration_since(then);
+    if !(dur_min < this && this < dur_max) {
+        panic!("timeout exceeded: {:?}", this);
+    }
+
+    assert_eq!(rx.drain().count(), 1);
+
+    drop(rx);
+
+    assert!(tx.send_timeout(42, Duration::from_millis(350)).is_err());
+}
+
+#[cfg_attr(any(target_os = "macos", windows), ignore)] // FIXME #41
+#[test]
+fn recv_timeout() {
+    let dur = Duration::from_millis(350);
+    let max_error = Duration::from_millis(5);
+    let dur_min = dur.checked_sub(max_error).unwrap();
+    let dur_max = dur.checked_add(max_error).unwrap();
+
+    let (tx, rx) = unbounded();
+    let then = Instant::now();
+    assert!(rx.recv_timeout(dur).is_err());
+    let now = Instant::now();
+
+    let this = now.duration_since(then);
+    if !(dur_min < this && this < dur_max) {
+        panic!("timeout exceeded: {:?}", this);
+    }
+
+    tx.send(42).unwrap();
+    assert_eq!(rx.recv_timeout(dur), Ok(42));
+    assert!(Instant::now().duration_since(now) < max_error);
+}
+
+#[cfg_attr(any(target_os = "macos", windows), ignore)] // FIXME #41
+#[test]
+fn recv_deadline() {
+    let dur = Duration::from_millis(350);
+    let max_error = Duration::from_millis(5);
+    let dur_min = dur.checked_sub(max_error).unwrap();
+    let dur_max = dur.checked_add(max_error).unwrap();
+
+    let (tx, rx) = unbounded();
+    let then = Instant::now();
+    assert!(rx.recv_deadline(then.checked_add(dur).unwrap()).is_err());
+    let now = Instant::now();
+
+    let this = now.duration_since(then);
+    if !(dur_min < this && this < dur_max) {
+        panic!("timeout exceeded: {:?}", this);
+    }
+
+    tx.send(42).unwrap();
+    assert_eq!(rx.recv_deadline(now.checked_add(dur).unwrap()), Ok(42));
+    assert!(Instant::now().duration_since(now) < max_error);
+}
+
+#[test]
+fn recv_timeout_missed_send() {
+    let (tx, rx) = bounded(10);
+
+    assert!(rx.recv_timeout(Duration::from_millis(100)).is_err());
+
+    tx.send(42).unwrap();
+
+    assert_eq!(rx.recv(), Ok(42));
+}
+
+#[test]
+fn disconnect_tx() {
+    let (tx, rx) = unbounded::<()>();
+    drop(tx);
+    assert!(rx.recv().is_err());
+}
+
+#[test]
+fn disconnect_rx() {
+    let (tx, rx) = unbounded();
+    drop(rx);
+    assert!(tx.send(0).is_err());
+}
+
+#[test]
+fn drain() {
+    let (tx, rx) = unbounded();
+
+    for i in 0..100 {
+        tx.send(i).unwrap();
+    }
+
+    assert_eq!(rx.drain().sum::<u32>(), (0..100).sum());
+
+    for i in 0..100 {
+        tx.send(i).unwrap();
+    }
+
+    for i in 0..100 {
+        tx.send(i).unwrap();
+    }
+
+    rx.recv().unwrap();
+
+    (1u32..100).chain(0..100).zip(rx).for_each(|(l, r)| assert_eq!(l, r));
+}
+
+#[test]
+fn try_send() {
+    let (tx, rx) = bounded(5);
+
+    for i in 0..5 {
+        tx.try_send(i).unwrap();
+    }
+
+    assert!(tx.try_send(42).is_err());
+
+    assert_eq!(rx.recv(), Ok(0));
+
+    assert_eq!(tx.try_send(42), Ok(()));
+
+    assert_eq!(rx.recv(), Ok(1));
+    drop(rx);
+
+    assert!(tx.try_send(42).is_err());
+}
+
+#[test]
+fn send_bounded() {
+    let (tx, rx) = bounded(5);
+
+    for _ in 0..5 {
+        tx.send(42).unwrap();
+    }
+
+    let _ = rx.recv().unwrap();
+
+    tx.send(42).unwrap();
+
+    assert!(tx.try_send(42).is_err());
+
+    rx.drain();
+
+    let mut ts = Vec::new();
+    for _ in 0..100 {
+        let tx = tx.clone();
+        ts.push(std::thread::spawn(move || {
+            for i in 0..10000 {
+                tx.send(i).unwrap();
+            }
+        }));
+    }
+
+    drop(tx);
+
+    assert_eq!(rx.iter().sum::<u64>(), (0..10000).sum::<u64>() * 100);
+
+    for t in ts {
+        t.join().unwrap();
+    }
+
+    assert!(rx.recv().is_err());
+}
+
+#[test]
+fn rendezvous() {
+    let (tx, rx) = bounded(0);
+
+    for i in 0..5 {
+        let tx = tx.clone();
+        let t = std::thread::spawn(move || {
+            assert!(tx.try_send(()).is_err());
+
+            let then = Instant::now();
+            tx.send(()).unwrap();
+            let now = Instant::now();
+
+            assert!(now.duration_since(then) > Duration::from_millis(100), "iter = {}", i);
+        });
+
+        std::thread::sleep(Duration::from_millis(1000));
+        rx.recv().unwrap();
+
+        t.join().unwrap();
+    }
+}
+
+#[test]
+fn hydra() {
+    let thread_num = 32;
+    let msg_num = 1000;
+
+    let (main_tx, main_rx) = unbounded::<()>();
+
+    let mut txs = Vec::new();
+    for _ in 0..thread_num {
+        let main_tx = main_tx.clone();
+        let (tx, rx) = unbounded();
+        txs.push(tx);
+
+        std::thread::spawn(move || {
+            for msg in rx.iter() {
+                main_tx.send(msg).unwrap();
+            }
+        });
+    }
+
+    drop(main_tx);
+
+    for _ in 0..10 {
+        for tx in &txs {
+            for _ in 0..msg_num {
+                tx.send(Default::default()).unwrap();
+            }
+        }
+
+        for _ in 0..thread_num {
+            for _ in 0..msg_num {
+                main_rx.recv().unwrap();
+            }
+        }
+    }
+
+    drop(txs);
+    assert!(main_rx.recv().is_err());
+}
+
+#[test]
+fn robin() {
+    let thread_num = 32;
+    let msg_num = 10;
+
+    let (mut main_tx, main_rx) = bounded::<()>(1);
+
+    for _ in 0..thread_num {
+        let (mut tx, rx) = bounded(100);
+        std::mem::swap(&mut tx, &mut main_tx);
+
+        std::thread::spawn(move || {
+            for msg in rx.iter() {
+                tx.send(msg).unwrap();
+            }
+        });
+    }
+
+    for _ in 0..10 {
+        let main_tx = main_tx.clone();
+        std::thread::spawn(move || {
+            for _ in 0..msg_num {
+                main_tx.send(Default::default()).unwrap();
+            }
+        });
+
+        for _ in 0..msg_num {
+            main_rx.recv().unwrap();
+        }
+    }
+}
+
+#[cfg(feature = "select")]
+#[test]
+fn select_general() {
+    #[derive(Debug, PartialEq)]
+    struct Foo(usize);
+
+    let (tx0, rx0) = bounded(1);
+    let (tx1, rx1) = unbounded();
+
+    for (i, t) in vec![tx0.clone(), tx1].into_iter().enumerate() {
+        std::thread::spawn(move || {
+            std::thread::sleep(std::time::Duration::from_millis(250));
+            let _ = t.send(Foo(i));
+        });
+    }
+
+    let x = Selector::new()
+        .recv(&rx0, |x| x)
+        .recv(&rx1, |x| x)
+        .wait()
+        .unwrap();
+
+    if x == Foo(0) {
+        assert!(rx1.recv().unwrap() == Foo(1));
+    } else {
+        assert!(rx0.recv().unwrap() == Foo(0));
+    }
+
+    tx0.send(Foo(42)).unwrap();
+
+    let t = std::thread::spawn(move || {
+        std::thread::sleep(std::time::Duration::from_millis(100));
+        assert_eq!(rx0.recv().unwrap(), Foo(42));
+        assert_eq!(rx0.recv().unwrap(), Foo(43));
+
+    });
+
+    Selector::new()
+        .send(&tx0, Foo(43), |x| x)
+        .wait()
+        .unwrap();
+
+    t.join().unwrap();
+}
+
+struct MessageWithoutDebug(u32);
+
+#[test]
+// This is a 'does it build' test, to make sure that the error types can turn
+// into a std::error::Error without requiring the payload (which is not used
+// there) to impl Debug.
+fn std_error_without_debug() {
+    let (tx, rx) = unbounded::<MessageWithoutDebug>();
+
+    match tx.send(MessageWithoutDebug(1)) {
+        Ok(_) => {}
+        Err(e) => {
+            let _std_err: &dyn std::error::Error = &e;
+        }
+    }
+
+    match rx.recv() {
+        Ok(_) => {}
+        Err(e) => {
+            let _std_err: &dyn std::error::Error = &e;
+        }
+    }
+
+    match tx.try_send(MessageWithoutDebug(2)) {
+        Ok(_) => {}
+        Err(e) => {
+            let _std_err: &dyn std::error::Error = &e;
+        }
+    }
+
+    match rx.try_recv() {
+        Ok(_) => {}
+        Err(e) => {
+            let _std_err: &dyn std::error::Error = &e;
+        }
+    }
+
+    match tx.send_timeout(MessageWithoutDebug(3), Duration::from_secs(1000000)) {
+        Ok(_) => {}
+        Err(e) => {
+            let _std_err: &dyn std::error::Error = &e;
+        }
+    }
+
+    match rx.recv_timeout(Duration::from_secs(10000000)) {
+        Ok(_) => {}
+        Err(e) => {
+            let _std_err: &dyn std::error::Error = &e;
+        }
+    }
+}
+
+#[test]
+fn weak_close() {
+    let (tx, rx) = unbounded::<()>();
+    let weak = tx.downgrade();
+    drop(tx);
+    assert!(weak.upgrade().is_none());
+    assert!(rx.is_disconnected());
+    assert!(rx.try_recv().is_err());
+}
+
+#[test]
+fn weak_upgrade() {
+    let (tx, rx) = unbounded();
+    let weak = tx.downgrade();
+    let tx2 = weak.upgrade().unwrap();
+    drop(tx);
+    assert!(!rx.is_disconnected());
+    tx2.send(()).unwrap();
+    assert!(rx.try_recv().is_ok());
+}
-- 
cgit v1.2.3