aboutsummaryrefslogtreecommitdiff
path: root/vendor/flume/tests/stream.rs
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/flume/tests/stream.rs')
-rw-r--r--vendor/flume/tests/stream.rs255
1 files changed, 255 insertions, 0 deletions
diff --git a/vendor/flume/tests/stream.rs b/vendor/flume/tests/stream.rs
new file mode 100644
index 0000000..e3b32cd
--- /dev/null
+++ b/vendor/flume/tests/stream.rs
@@ -0,0 +1,255 @@
+#[cfg(feature = "async")]
+use {
+ flume::*,
+ futures::{stream::FuturesUnordered, StreamExt, TryFutureExt},
+ async_std::prelude::FutureExt,
+ std::time::Duration,
+};
+use futures::{stream, Stream};
+
+#[cfg(feature = "async")]
+#[test]
+fn stream_recv() {
+ let (tx, rx) = unbounded();
+
+ let t = std::thread::spawn(move || {
+ std::thread::sleep(std::time::Duration::from_millis(250));
+ tx.send(42u32).unwrap();
+ println!("sent");
+ });
+
+ async_std::task::block_on(async {
+ println!("receiving...");
+ let x = rx.stream().next().await;
+ println!("received");
+ assert_eq!(x, Some(42));
+ });
+
+ t.join().unwrap();
+}
+
+#[cfg(feature = "async")]
+#[test]
+fn stream_recv_disconnect() {
+ let (tx, rx) = bounded::<i32>(0);
+
+ let t = std::thread::spawn(move || {
+ tx.send(42);
+ std::thread::sleep(std::time::Duration::from_millis(250));
+ drop(tx)
+ });
+
+ async_std::task::block_on(async {
+ let mut stream = rx.into_stream();
+ assert_eq!(stream.next().await, Some(42));
+ assert_eq!(stream.next().await, None);
+ });
+
+ t.join().unwrap();
+}
+
+#[cfg(feature = "async")]
+#[test]
+fn stream_recv_drop_recv() {
+ let (tx, rx) = bounded::<i32>(10);
+
+ let rx2 = rx.clone();
+ let mut stream = rx.into_stream();
+
+ async_std::task::block_on(async {
+ let res = async_std::future::timeout(
+ std::time::Duration::from_millis(500),
+ stream.next()
+ ).await;
+
+ assert!(res.is_err());
+ });
+
+ let t = std::thread::spawn(move || {
+ async_std::task::block_on(async {
+ rx2.stream().next().await
+ })
+ });
+
+ std::thread::sleep(std::time::Duration::from_millis(500));
+
+ tx.send(42).unwrap();
+
+ drop(stream);
+
+ assert_eq!(t.join().unwrap(), Some(42))
+}
+
+#[cfg(feature = "async")]
+#[test]
+fn r#stream_drop_send_disconnect() {
+ let (tx, rx) = bounded::<i32>(1);
+
+ let t = std::thread::spawn(move || {
+ std::thread::sleep(std::time::Duration::from_millis(250));
+ drop(tx);
+ });
+
+ async_std::task::block_on(async {
+ let mut stream = rx.into_stream();
+ assert_eq!(stream.next().await, None);
+ });
+
+ t.join().unwrap();
+}
+
+#[cfg(feature = "async")]
+#[async_std::test]
+async fn stream_send_1_million_no_drop_or_reorder() {
+ #[derive(Debug)]
+ enum Message {
+ Increment {
+ old: u64,
+ },
+ ReturnCount,
+ }
+
+ let (tx, rx) = unbounded();
+
+ let t = async_std::task::spawn(async move {
+ let mut count = 0u64;
+ let mut stream = rx.into_stream();
+
+ while let Some(Message::Increment { old }) = stream.next().await {
+ assert_eq!(old, count);
+ count += 1;
+ }
+
+ count
+ });
+
+ for next in 0..1_000_000 {
+ tx.send(Message::Increment { old: next }).unwrap();
+ }
+
+ tx.send(Message::ReturnCount).unwrap();
+
+ let count = t.await;
+ assert_eq!(count, 1_000_000)
+}
+
+#[cfg(feature = "async")]
+#[async_std::test]
+async fn parallel_streams_and_async_recv() {
+ let (tx, rx) = flume::unbounded();
+ let rx = &rx;
+ let send_fut = async move {
+ let n_sends: usize = 100000;
+ for _ in 0..n_sends {
+ tx.send_async(()).await.unwrap();
+ }
+ };
+
+ async_std::task::spawn(
+ send_fut
+ .timeout(Duration::from_secs(5))
+ .map_err(|_| panic!("Send timed out!"))
+ );
+
+ let mut futures_unordered = (0..250)
+ .map(|n| async move {
+ if n % 2 == 0 {
+ let mut stream = rx.stream();
+ while let Some(()) = stream.next().await {}
+ } else {
+ while let Ok(()) = rx.recv_async().await {}
+ }
+
+ })
+ .collect::<FuturesUnordered<_>>();
+
+ let recv_fut = async {
+ while futures_unordered.next().await.is_some() {}
+ };
+
+ recv_fut
+ .timeout(Duration::from_secs(5))
+ .map_err(|_| panic!("Receive timed out!"))
+ .await
+ .unwrap();
+}
+
+#[cfg(feature = "async")]
+#[test]
+fn stream_no_double_wake() {
+ use std::sync::atomic::{AtomicUsize, Ordering};
+ use std::sync::Arc;
+ use std::pin::Pin;
+ use std::task::Context;
+ use futures::task::{waker, ArcWake};
+ use futures::Stream;
+
+ let count = Arc::new(AtomicUsize::new(0));
+
+ // all this waker does is count how many times it is called
+ struct CounterWaker {
+ count: Arc<AtomicUsize>,
+ }
+
+ impl ArcWake for CounterWaker {
+ fn wake_by_ref(arc_self: &Arc<Self>) {
+ arc_self.count.fetch_add(1, Ordering::SeqCst);
+ }
+ }
+
+ // create waker and context
+ let w = CounterWaker {
+ count: count.clone(),
+ };
+ let w = waker(Arc::new(w));
+ let cx = &mut Context::from_waker(&w);
+
+ // create unbounded channel
+ let (tx, rx) = unbounded::<()>();
+ let mut stream = rx.stream();
+
+ // register waker with stream
+ let _ = Pin::new(&mut stream).poll_next(cx);
+
+ // send multiple items
+ tx.send(()).unwrap();
+ tx.send(()).unwrap();
+ tx.send(()).unwrap();
+
+ // verify that stream is only woken up once.
+ assert_eq!(count.load(Ordering::SeqCst), 1);
+}
+
+#[cfg(feature = "async")]
+#[async_std::test]
+async fn stream_forward_issue_55() { // https://github.com/zesterer/flume/issues/55
+ fn dummy_stream() -> impl Stream<Item = usize> {
+ stream::unfold(0, |count| async move {
+ if count < 1000 {
+ Some((count, count + 1))
+ } else {
+ None
+ }
+ })
+ }
+
+ let (send_task, recv_task) = {
+ use futures::SinkExt;
+ let (tx, rx) = flume::bounded(100);
+
+ let send_task = dummy_stream()
+ .map(|i| Ok(i))
+ .forward(tx.into_sink().sink_map_err(|e| {
+ panic!("send error:{:#?}", e)
+ }));
+
+ let recv_task = rx
+ .into_stream()
+ .for_each(|item| async move {});
+ (send_task, recv_task)
+ };
+
+ let jh = async_std::task::spawn(send_task);
+ async_std::task::block_on(recv_task);
+ jh.await.unwrap();
+}