aboutsummaryrefslogtreecommitdiff
path: root/vendor/flume/tests/async.rs
diff options
context:
space:
mode:
authorValentin Popov <valentin@popov.link>2024-01-08 00:21:28 +0300
committerValentin Popov <valentin@popov.link>2024-01-08 00:21:28 +0300
commit1b6a04ca5504955c571d1c97504fb45ea0befee4 (patch)
tree7579f518b23313e8a9748a88ab6173d5e030b227 /vendor/flume/tests/async.rs
parent5ecd8cf2cba827454317368b68571df0d13d7842 (diff)
downloadfparkan-1b6a04ca5504955c571d1c97504fb45ea0befee4.tar.xz
fparkan-1b6a04ca5504955c571d1c97504fb45ea0befee4.zip
Initial vendor packages
Signed-off-by: Valentin Popov <valentin@popov.link>
Diffstat (limited to 'vendor/flume/tests/async.rs')
-rw-r--r--vendor/flume/tests/async.rs276
1 files changed, 276 insertions, 0 deletions
diff --git a/vendor/flume/tests/async.rs b/vendor/flume/tests/async.rs
new file mode 100644
index 0000000..6c2c7f2
--- /dev/null
+++ b/vendor/flume/tests/async.rs
@@ -0,0 +1,276 @@
+#[cfg(feature = "async")]
+use {
+ flume::*,
+ futures::{stream::FuturesUnordered, StreamExt, TryFutureExt, Future},
+ futures::task::{Context, Waker, Poll},
+ async_std::prelude::FutureExt,
+ std::{time::Duration, sync::{atomic::{AtomicUsize, Ordering}, Arc}},
+};
+
+#[cfg(feature = "async")]
+#[test]
+fn r#async_recv() {
+ let (tx, rx) = unbounded();
+
+ let t = std::thread::spawn(move || {
+ std::thread::sleep(std::time::Duration::from_millis(250));
+ tx.send(42u32).unwrap();
+ });
+
+ async_std::task::block_on(async {
+ assert_eq!(rx.recv_async().await.unwrap(), 42);
+ });
+
+ t.join().unwrap();
+}
+
+#[cfg(feature = "async")]
+#[test]
+fn r#async_send() {
+ let (tx, rx) = bounded(1);
+
+ let t = std::thread::spawn(move || {
+ std::thread::sleep(std::time::Duration::from_millis(250));
+ assert_eq!(rx.recv(), Ok(42));
+ });
+
+ async_std::task::block_on(async {
+ tx.send_async(42u32).await.unwrap();
+ });
+
+ t.join().unwrap();
+}
+
+#[cfg(feature = "async")]
+#[test]
+fn r#async_recv_disconnect() {
+ let (tx, rx) = bounded::<i32>(0);
+
+ let t = std::thread::spawn(move || {
+ std::thread::sleep(std::time::Duration::from_millis(250));
+ drop(tx)
+ });
+
+ async_std::task::block_on(async {
+ assert_eq!(rx.recv_async().await, Err(RecvError::Disconnected));
+ });
+
+ t.join().unwrap();
+}
+
+#[cfg(feature = "async")]
+#[test]
+fn r#async_send_disconnect() {
+ let (tx, rx) = bounded(0);
+
+ let t = std::thread::spawn(move || {
+ std::thread::sleep(std::time::Duration::from_millis(250));
+ drop(rx)
+ });
+
+ async_std::task::block_on(async {
+ assert_eq!(tx.send_async(42u32).await, Err(SendError(42)));
+ });
+
+ t.join().unwrap();
+}
+
+#[cfg(feature = "async")]
+#[test]
+fn r#async_recv_drop_recv() {
+ let (tx, rx) = bounded::<i32>(10);
+
+ let recv_fut = rx.recv_async();
+
+ async_std::task::block_on(async {
+ let res = async_std::future::timeout(std::time::Duration::from_millis(500), rx.recv_async()).await;
+ assert!(res.is_err());
+ });
+
+ let rx2 = rx.clone();
+ let t = std::thread::spawn(move || {
+ async_std::task::block_on(async {
+ rx2.recv_async().await
+ })
+ });
+
+ std::thread::sleep(std::time::Duration::from_millis(500));
+
+ tx.send(42).unwrap();
+
+ drop(recv_fut);
+
+ assert_eq!(t.join().unwrap(), Ok(42))
+}
+
+#[cfg(feature = "async")]
+#[async_std::test]
+async fn r#async_send_1_million_no_drop_or_reorder() {
+ #[derive(Debug)]
+ enum Message {
+ Increment {
+ old: u64,
+ },
+ ReturnCount,
+ }
+
+ let (tx, rx) = unbounded();
+
+ let t = async_std::task::spawn(async move {
+ let mut count = 0u64;
+
+ while let Ok(Message::Increment { old }) = rx.recv_async().await {
+ assert_eq!(old, count);
+ count += 1;
+ }
+
+ count
+ });
+
+ for next in 0..1_000_000 {
+ tx.send(Message::Increment { old: next }).unwrap();
+ }
+
+ tx.send(Message::ReturnCount).unwrap();
+
+ let count = t.await;
+ assert_eq!(count, 1_000_000)
+}
+
+#[cfg(feature = "async")]
+#[async_std::test]
+async fn parallel_async_receivers() {
+ let (tx, rx) = flume::unbounded();
+ let send_fut = async move {
+ let n_sends: usize = 100000;
+ for _ in 0..n_sends {
+ tx.send_async(()).await.unwrap();
+ }
+ };
+
+ async_std::task::spawn(
+ send_fut
+ .timeout(Duration::from_secs(5))
+ .map_err(|_| panic!("Send timed out!"))
+ );
+
+ let mut futures_unordered = (0..250)
+ .map(|_| async {
+ while let Ok(()) = rx.recv_async().await
+ /* rx.recv() is OK */
+ {}
+ })
+ .collect::<FuturesUnordered<_>>();
+
+ let recv_fut = async {
+ while futures_unordered.next().await.is_some() {}
+ };
+
+ recv_fut
+ .timeout(Duration::from_secs(5))
+ .map_err(|_| panic!("Receive timed out!"))
+ .await
+ .unwrap();
+
+ println!("recv end");
+}
+
+#[cfg(feature = "async")]
+#[test]
+fn change_waker() {
+ let (tx, rx) = flume::bounded(1);
+ tx.send(()).unwrap();
+
+ struct DebugWaker(Arc<AtomicUsize>, Waker);
+
+ impl DebugWaker {
+ fn new() -> Self {
+ let woken = Arc::new(AtomicUsize::new(0));
+ let woken_cloned = woken.clone();
+ let waker = waker_fn::waker_fn(move || {
+ woken.fetch_add(1, Ordering::SeqCst);
+ });
+ DebugWaker(woken_cloned, waker)
+ }
+
+ fn woken(&self) -> usize {
+ self.0.load(Ordering::SeqCst)
+ }
+
+ fn ctx(&self) -> Context {
+ Context::from_waker(&self.1)
+ }
+ }
+
+ // Check that the waker is correctly updated when sending tasks change their wakers
+ {
+ let send_fut = tx.send_async(());
+ futures::pin_mut!(send_fut);
+
+ let (waker1, waker2) = (DebugWaker::new(), DebugWaker::new());
+
+ // Set the waker to waker1
+ assert_eq!(send_fut.as_mut().poll(&mut waker1.ctx()), Poll::Pending);
+
+ // Change the waker to waker2
+ assert_eq!(send_fut.poll(&mut waker2.ctx()), Poll::Pending);
+
+ // Wake the future
+ rx.recv().unwrap();
+
+ // Check that waker2 was woken and waker1 was not
+ assert_eq!(waker1.woken(), 0);
+ assert_eq!(waker2.woken(), 1);
+ }
+
+ // Check that the waker is correctly updated when receiving tasks change their wakers
+ {
+ rx.recv().unwrap();
+ let recv_fut = rx.recv_async();
+ futures::pin_mut!(recv_fut);
+
+ let (waker1, waker2) = (DebugWaker::new(), DebugWaker::new());
+
+ // Set the waker to waker1
+ assert_eq!(recv_fut.as_mut().poll(&mut waker1.ctx()), Poll::Pending);
+
+ // Change the waker to waker2
+ assert_eq!(recv_fut.poll(&mut waker2.ctx()), Poll::Pending);
+
+ // Wake the future
+ tx.send(()).unwrap();
+
+ // Check that waker2 was woken and waker1 was not
+ assert_eq!(waker1.woken(), 0);
+ assert_eq!(waker2.woken(), 1);
+ }
+}
+
+#[cfg(feature = "async")]
+#[test]
+fn spsc_single_threaded_value_ordering() {
+ async fn test() {
+ let (tx, rx) = flume::bounded(4);
+ tokio::select! {
+ _ = producer(tx) => {},
+ _ = consumer(rx) => {},
+ }
+ }
+
+ async fn producer(tx: flume::Sender<usize>) {
+ for i in 0..100 {
+ tx.send_async(i).await.unwrap();
+ }
+ }
+
+ async fn consumer(rx: flume::Receiver<usize>) {
+ let mut expected = 0;
+ while let Ok(value) = rx.recv_async().await {
+ assert_eq!(value, expected);
+ expected += 1;
+ }
+ }
+
+ let rt = tokio::runtime::Builder::new_current_thread().build().unwrap();
+ rt.block_on(test());
+}