aboutsummaryrefslogtreecommitdiff
path: root/vendor/flume/tests/async.rs
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/flume/tests/async.rs')
-rw-r--r--vendor/flume/tests/async.rs276
1 files changed, 0 insertions, 276 deletions
diff --git a/vendor/flume/tests/async.rs b/vendor/flume/tests/async.rs
deleted file mode 100644
index 6c2c7f2..0000000
--- a/vendor/flume/tests/async.rs
+++ /dev/null
@@ -1,276 +0,0 @@
-#[cfg(feature = "async")]
-use {
- flume::*,
- futures::{stream::FuturesUnordered, StreamExt, TryFutureExt, Future},
- futures::task::{Context, Waker, Poll},
- async_std::prelude::FutureExt,
- std::{time::Duration, sync::{atomic::{AtomicUsize, Ordering}, Arc}},
-};
-
-#[cfg(feature = "async")]
-#[test]
-fn r#async_recv() {
- let (tx, rx) = unbounded();
-
- let t = std::thread::spawn(move || {
- std::thread::sleep(std::time::Duration::from_millis(250));
- tx.send(42u32).unwrap();
- });
-
- async_std::task::block_on(async {
- assert_eq!(rx.recv_async().await.unwrap(), 42);
- });
-
- t.join().unwrap();
-}
-
-#[cfg(feature = "async")]
-#[test]
-fn r#async_send() {
- let (tx, rx) = bounded(1);
-
- let t = std::thread::spawn(move || {
- std::thread::sleep(std::time::Duration::from_millis(250));
- assert_eq!(rx.recv(), Ok(42));
- });
-
- async_std::task::block_on(async {
- tx.send_async(42u32).await.unwrap();
- });
-
- t.join().unwrap();
-}
-
-#[cfg(feature = "async")]
-#[test]
-fn r#async_recv_disconnect() {
- let (tx, rx) = bounded::<i32>(0);
-
- let t = std::thread::spawn(move || {
- std::thread::sleep(std::time::Duration::from_millis(250));
- drop(tx)
- });
-
- async_std::task::block_on(async {
- assert_eq!(rx.recv_async().await, Err(RecvError::Disconnected));
- });
-
- t.join().unwrap();
-}
-
-#[cfg(feature = "async")]
-#[test]
-fn r#async_send_disconnect() {
- let (tx, rx) = bounded(0);
-
- let t = std::thread::spawn(move || {
- std::thread::sleep(std::time::Duration::from_millis(250));
- drop(rx)
- });
-
- async_std::task::block_on(async {
- assert_eq!(tx.send_async(42u32).await, Err(SendError(42)));
- });
-
- t.join().unwrap();
-}
-
-#[cfg(feature = "async")]
-#[test]
-fn r#async_recv_drop_recv() {
- let (tx, rx) = bounded::<i32>(10);
-
- let recv_fut = rx.recv_async();
-
- async_std::task::block_on(async {
- let res = async_std::future::timeout(std::time::Duration::from_millis(500), rx.recv_async()).await;
- assert!(res.is_err());
- });
-
- let rx2 = rx.clone();
- let t = std::thread::spawn(move || {
- async_std::task::block_on(async {
- rx2.recv_async().await
- })
- });
-
- std::thread::sleep(std::time::Duration::from_millis(500));
-
- tx.send(42).unwrap();
-
- drop(recv_fut);
-
- assert_eq!(t.join().unwrap(), Ok(42))
-}
-
-#[cfg(feature = "async")]
-#[async_std::test]
-async fn r#async_send_1_million_no_drop_or_reorder() {
- #[derive(Debug)]
- enum Message {
- Increment {
- old: u64,
- },
- ReturnCount,
- }
-
- let (tx, rx) = unbounded();
-
- let t = async_std::task::spawn(async move {
- let mut count = 0u64;
-
- while let Ok(Message::Increment { old }) = rx.recv_async().await {
- assert_eq!(old, count);
- count += 1;
- }
-
- count
- });
-
- for next in 0..1_000_000 {
- tx.send(Message::Increment { old: next }).unwrap();
- }
-
- tx.send(Message::ReturnCount).unwrap();
-
- let count = t.await;
- assert_eq!(count, 1_000_000)
-}
-
-#[cfg(feature = "async")]
-#[async_std::test]
-async fn parallel_async_receivers() {
- let (tx, rx) = flume::unbounded();
- let send_fut = async move {
- let n_sends: usize = 100000;
- for _ in 0..n_sends {
- tx.send_async(()).await.unwrap();
- }
- };
-
- async_std::task::spawn(
- send_fut
- .timeout(Duration::from_secs(5))
- .map_err(|_| panic!("Send timed out!"))
- );
-
- let mut futures_unordered = (0..250)
- .map(|_| async {
- while let Ok(()) = rx.recv_async().await
- /* rx.recv() is OK */
- {}
- })
- .collect::<FuturesUnordered<_>>();
-
- let recv_fut = async {
- while futures_unordered.next().await.is_some() {}
- };
-
- recv_fut
- .timeout(Duration::from_secs(5))
- .map_err(|_| panic!("Receive timed out!"))
- .await
- .unwrap();
-
- println!("recv end");
-}
-
-#[cfg(feature = "async")]
-#[test]
-fn change_waker() {
- let (tx, rx) = flume::bounded(1);
- tx.send(()).unwrap();
-
- struct DebugWaker(Arc<AtomicUsize>, Waker);
-
- impl DebugWaker {
- fn new() -> Self {
- let woken = Arc::new(AtomicUsize::new(0));
- let woken_cloned = woken.clone();
- let waker = waker_fn::waker_fn(move || {
- woken.fetch_add(1, Ordering::SeqCst);
- });
- DebugWaker(woken_cloned, waker)
- }
-
- fn woken(&self) -> usize {
- self.0.load(Ordering::SeqCst)
- }
-
- fn ctx(&self) -> Context {
- Context::from_waker(&self.1)
- }
- }
-
- // Check that the waker is correctly updated when sending tasks change their wakers
- {
- let send_fut = tx.send_async(());
- futures::pin_mut!(send_fut);
-
- let (waker1, waker2) = (DebugWaker::new(), DebugWaker::new());
-
- // Set the waker to waker1
- assert_eq!(send_fut.as_mut().poll(&mut waker1.ctx()), Poll::Pending);
-
- // Change the waker to waker2
- assert_eq!(send_fut.poll(&mut waker2.ctx()), Poll::Pending);
-
- // Wake the future
- rx.recv().unwrap();
-
- // Check that waker2 was woken and waker1 was not
- assert_eq!(waker1.woken(), 0);
- assert_eq!(waker2.woken(), 1);
- }
-
- // Check that the waker is correctly updated when receiving tasks change their wakers
- {
- rx.recv().unwrap();
- let recv_fut = rx.recv_async();
- futures::pin_mut!(recv_fut);
-
- let (waker1, waker2) = (DebugWaker::new(), DebugWaker::new());
-
- // Set the waker to waker1
- assert_eq!(recv_fut.as_mut().poll(&mut waker1.ctx()), Poll::Pending);
-
- // Change the waker to waker2
- assert_eq!(recv_fut.poll(&mut waker2.ctx()), Poll::Pending);
-
- // Wake the future
- tx.send(()).unwrap();
-
- // Check that waker2 was woken and waker1 was not
- assert_eq!(waker1.woken(), 0);
- assert_eq!(waker2.woken(), 1);
- }
-}
-
-#[cfg(feature = "async")]
-#[test]
-fn spsc_single_threaded_value_ordering() {
- async fn test() {
- let (tx, rx) = flume::bounded(4);
- tokio::select! {
- _ = producer(tx) => {},
- _ = consumer(rx) => {},
- }
- }
-
- async fn producer(tx: flume::Sender<usize>) {
- for i in 0..100 {
- tx.send_async(i).await.unwrap();
- }
- }
-
- async fn consumer(rx: flume::Receiver<usize>) {
- let mut expected = 0;
- while let Ok(value) = rx.recv_async().await {
- assert_eq!(value, expected);
- expected += 1;
- }
- }
-
- let rt = tokio::runtime::Builder::new_current_thread().build().unwrap();
- rt.block_on(test());
-}