diff options
author | Valentin Popov <valentin@popov.link> | 2024-07-19 15:37:58 +0300 |
---|---|---|
committer | Valentin Popov <valentin@popov.link> | 2024-07-19 15:37:58 +0300 |
commit | a990de90fe41456a23e58bd087d2f107d321f3a1 (patch) | |
tree | 15afc392522a9e85dc3332235e311b7d39352ea9 /vendor/flume/tests/stream.rs | |
parent | 3d48cd3f81164bbfc1a755dc1d4a9a02f98c8ddd (diff) | |
download | fparkan-a990de90fe41456a23e58bd087d2f107d321f3a1.tar.xz fparkan-a990de90fe41456a23e58bd087d2f107d321f3a1.zip |
Deleted vendor folder
Diffstat (limited to 'vendor/flume/tests/stream.rs')
-rw-r--r-- | vendor/flume/tests/stream.rs | 255 |
1 files changed, 0 insertions, 255 deletions
diff --git a/vendor/flume/tests/stream.rs b/vendor/flume/tests/stream.rs deleted file mode 100644 index e3b32cd..0000000 --- a/vendor/flume/tests/stream.rs +++ /dev/null @@ -1,255 +0,0 @@ -#[cfg(feature = "async")] -use { - flume::*, - futures::{stream::FuturesUnordered, StreamExt, TryFutureExt}, - async_std::prelude::FutureExt, - std::time::Duration, -}; -use futures::{stream, Stream}; - -#[cfg(feature = "async")] -#[test] -fn stream_recv() { - let (tx, rx) = unbounded(); - - let t = std::thread::spawn(move || { - std::thread::sleep(std::time::Duration::from_millis(250)); - tx.send(42u32).unwrap(); - println!("sent"); - }); - - async_std::task::block_on(async { - println!("receiving..."); - let x = rx.stream().next().await; - println!("received"); - assert_eq!(x, Some(42)); - }); - - t.join().unwrap(); -} - -#[cfg(feature = "async")] -#[test] -fn stream_recv_disconnect() { - let (tx, rx) = bounded::<i32>(0); - - let t = std::thread::spawn(move || { - tx.send(42); - std::thread::sleep(std::time::Duration::from_millis(250)); - drop(tx) - }); - - async_std::task::block_on(async { - let mut stream = rx.into_stream(); - assert_eq!(stream.next().await, Some(42)); - assert_eq!(stream.next().await, None); - }); - - t.join().unwrap(); -} - -#[cfg(feature = "async")] -#[test] -fn stream_recv_drop_recv() { - let (tx, rx) = bounded::<i32>(10); - - let rx2 = rx.clone(); - let mut stream = rx.into_stream(); - - async_std::task::block_on(async { - let res = async_std::future::timeout( - std::time::Duration::from_millis(500), - stream.next() - ).await; - - assert!(res.is_err()); - }); - - let t = std::thread::spawn(move || { - async_std::task::block_on(async { - rx2.stream().next().await - }) - }); - - std::thread::sleep(std::time::Duration::from_millis(500)); - - tx.send(42).unwrap(); - - drop(stream); - - assert_eq!(t.join().unwrap(), Some(42)) -} - -#[cfg(feature = "async")] -#[test] -fn r#stream_drop_send_disconnect() { - let (tx, rx) = bounded::<i32>(1); - - let t = std::thread::spawn(move || { - std::thread::sleep(std::time::Duration::from_millis(250)); - drop(tx); - }); - - async_std::task::block_on(async { - let mut stream = rx.into_stream(); - assert_eq!(stream.next().await, None); - }); - - t.join().unwrap(); -} - -#[cfg(feature = "async")] -#[async_std::test] -async fn stream_send_1_million_no_drop_or_reorder() { - #[derive(Debug)] - enum Message { - Increment { - old: u64, - }, - ReturnCount, - } - - let (tx, rx) = unbounded(); - - let t = async_std::task::spawn(async move { - let mut count = 0u64; - let mut stream = rx.into_stream(); - - while let Some(Message::Increment { old }) = stream.next().await { - assert_eq!(old, count); - count += 1; - } - - count - }); - - for next in 0..1_000_000 { - tx.send(Message::Increment { old: next }).unwrap(); - } - - tx.send(Message::ReturnCount).unwrap(); - - let count = t.await; - assert_eq!(count, 1_000_000) -} - -#[cfg(feature = "async")] -#[async_std::test] -async fn parallel_streams_and_async_recv() { - let (tx, rx) = flume::unbounded(); - let rx = ℞ - let send_fut = async move { - let n_sends: usize = 100000; - for _ in 0..n_sends { - tx.send_async(()).await.unwrap(); - } - }; - - async_std::task::spawn( - send_fut - .timeout(Duration::from_secs(5)) - .map_err(|_| panic!("Send timed out!")) - ); - - let mut futures_unordered = (0..250) - .map(|n| async move { - if n % 2 == 0 { - let mut stream = rx.stream(); - while let Some(()) = stream.next().await {} - } else { - while let Ok(()) = rx.recv_async().await {} - } - - }) - .collect::<FuturesUnordered<_>>(); - - let recv_fut = async { - while futures_unordered.next().await.is_some() {} - }; - - recv_fut - .timeout(Duration::from_secs(5)) - .map_err(|_| panic!("Receive timed out!")) - .await - .unwrap(); -} - -#[cfg(feature = "async")] -#[test] -fn stream_no_double_wake() { - use std::sync::atomic::{AtomicUsize, Ordering}; - use std::sync::Arc; - use std::pin::Pin; - use std::task::Context; - use futures::task::{waker, ArcWake}; - use futures::Stream; - - let count = Arc::new(AtomicUsize::new(0)); - - // all this waker does is count how many times it is called - struct CounterWaker { - count: Arc<AtomicUsize>, - } - - impl ArcWake for CounterWaker { - fn wake_by_ref(arc_self: &Arc<Self>) { - arc_self.count.fetch_add(1, Ordering::SeqCst); - } - } - - // create waker and context - let w = CounterWaker { - count: count.clone(), - }; - let w = waker(Arc::new(w)); - let cx = &mut Context::from_waker(&w); - - // create unbounded channel - let (tx, rx) = unbounded::<()>(); - let mut stream = rx.stream(); - - // register waker with stream - let _ = Pin::new(&mut stream).poll_next(cx); - - // send multiple items - tx.send(()).unwrap(); - tx.send(()).unwrap(); - tx.send(()).unwrap(); - - // verify that stream is only woken up once. - assert_eq!(count.load(Ordering::SeqCst), 1); -} - -#[cfg(feature = "async")] -#[async_std::test] -async fn stream_forward_issue_55() { // https://github.com/zesterer/flume/issues/55 - fn dummy_stream() -> impl Stream<Item = usize> { - stream::unfold(0, |count| async move { - if count < 1000 { - Some((count, count + 1)) - } else { - None - } - }) - } - - let (send_task, recv_task) = { - use futures::SinkExt; - let (tx, rx) = flume::bounded(100); - - let send_task = dummy_stream() - .map(|i| Ok(i)) - .forward(tx.into_sink().sink_map_err(|e| { - panic!("send error:{:#?}", e) - })); - - let recv_task = rx - .into_stream() - .for_each(|item| async move {}); - (send_task, recv_task) - }; - - let jh = async_std::task::spawn(send_task); - async_std::task::block_on(recv_task); - jh.await.unwrap(); -} |