diff options
author | Valentin Popov <valentin@popov.link> | 2024-01-08 00:21:28 +0300 |
---|---|---|
committer | Valentin Popov <valentin@popov.link> | 2024-01-08 00:21:28 +0300 |
commit | 1b6a04ca5504955c571d1c97504fb45ea0befee4 (patch) | |
tree | 7579f518b23313e8a9748a88ab6173d5e030b227 /vendor/flume/tests/async.rs | |
parent | 5ecd8cf2cba827454317368b68571df0d13d7842 (diff) | |
download | fparkan-1b6a04ca5504955c571d1c97504fb45ea0befee4.tar.xz fparkan-1b6a04ca5504955c571d1c97504fb45ea0befee4.zip |
Initial vendor packages
Signed-off-by: Valentin Popov <valentin@popov.link>
Diffstat (limited to 'vendor/flume/tests/async.rs')
-rw-r--r-- | vendor/flume/tests/async.rs | 276 |
1 files changed, 276 insertions, 0 deletions
diff --git a/vendor/flume/tests/async.rs b/vendor/flume/tests/async.rs new file mode 100644 index 0000000..6c2c7f2 --- /dev/null +++ b/vendor/flume/tests/async.rs @@ -0,0 +1,276 @@ +#[cfg(feature = "async")] +use { + flume::*, + futures::{stream::FuturesUnordered, StreamExt, TryFutureExt, Future}, + futures::task::{Context, Waker, Poll}, + async_std::prelude::FutureExt, + std::{time::Duration, sync::{atomic::{AtomicUsize, Ordering}, Arc}}, +}; + +#[cfg(feature = "async")] +#[test] +fn r#async_recv() { + let (tx, rx) = unbounded(); + + let t = std::thread::spawn(move || { + std::thread::sleep(std::time::Duration::from_millis(250)); + tx.send(42u32).unwrap(); + }); + + async_std::task::block_on(async { + assert_eq!(rx.recv_async().await.unwrap(), 42); + }); + + t.join().unwrap(); +} + +#[cfg(feature = "async")] +#[test] +fn r#async_send() { + let (tx, rx) = bounded(1); + + let t = std::thread::spawn(move || { + std::thread::sleep(std::time::Duration::from_millis(250)); + assert_eq!(rx.recv(), Ok(42)); + }); + + async_std::task::block_on(async { + tx.send_async(42u32).await.unwrap(); + }); + + t.join().unwrap(); +} + +#[cfg(feature = "async")] +#[test] +fn r#async_recv_disconnect() { + let (tx, rx) = bounded::<i32>(0); + + let t = std::thread::spawn(move || { + std::thread::sleep(std::time::Duration::from_millis(250)); + drop(tx) + }); + + async_std::task::block_on(async { + assert_eq!(rx.recv_async().await, Err(RecvError::Disconnected)); + }); + + t.join().unwrap(); +} + +#[cfg(feature = "async")] +#[test] +fn r#async_send_disconnect() { + let (tx, rx) = bounded(0); + + let t = std::thread::spawn(move || { + std::thread::sleep(std::time::Duration::from_millis(250)); + drop(rx) + }); + + async_std::task::block_on(async { + assert_eq!(tx.send_async(42u32).await, Err(SendError(42))); + }); + + t.join().unwrap(); +} + +#[cfg(feature = "async")] +#[test] +fn r#async_recv_drop_recv() { + let (tx, rx) = bounded::<i32>(10); + + let recv_fut = rx.recv_async(); + + async_std::task::block_on(async { + let res = async_std::future::timeout(std::time::Duration::from_millis(500), rx.recv_async()).await; + assert!(res.is_err()); + }); + + let rx2 = rx.clone(); + let t = std::thread::spawn(move || { + async_std::task::block_on(async { + rx2.recv_async().await + }) + }); + + std::thread::sleep(std::time::Duration::from_millis(500)); + + tx.send(42).unwrap(); + + drop(recv_fut); + + assert_eq!(t.join().unwrap(), Ok(42)) +} + +#[cfg(feature = "async")] +#[async_std::test] +async fn r#async_send_1_million_no_drop_or_reorder() { + #[derive(Debug)] + enum Message { + Increment { + old: u64, + }, + ReturnCount, + } + + let (tx, rx) = unbounded(); + + let t = async_std::task::spawn(async move { + let mut count = 0u64; + + while let Ok(Message::Increment { old }) = rx.recv_async().await { + assert_eq!(old, count); + count += 1; + } + + count + }); + + for next in 0..1_000_000 { + tx.send(Message::Increment { old: next }).unwrap(); + } + + tx.send(Message::ReturnCount).unwrap(); + + let count = t.await; + assert_eq!(count, 1_000_000) +} + +#[cfg(feature = "async")] +#[async_std::test] +async fn parallel_async_receivers() { + let (tx, rx) = flume::unbounded(); + let send_fut = async move { + let n_sends: usize = 100000; + for _ in 0..n_sends { + tx.send_async(()).await.unwrap(); + } + }; + + async_std::task::spawn( + send_fut + .timeout(Duration::from_secs(5)) + .map_err(|_| panic!("Send timed out!")) + ); + + let mut futures_unordered = (0..250) + .map(|_| async { + while let Ok(()) = rx.recv_async().await + /* rx.recv() is OK */ + {} + }) + .collect::<FuturesUnordered<_>>(); + + let recv_fut = async { + while futures_unordered.next().await.is_some() {} + }; + + recv_fut + .timeout(Duration::from_secs(5)) + .map_err(|_| panic!("Receive timed out!")) + .await + .unwrap(); + + println!("recv end"); +} + +#[cfg(feature = "async")] +#[test] +fn change_waker() { + let (tx, rx) = flume::bounded(1); + tx.send(()).unwrap(); + + struct DebugWaker(Arc<AtomicUsize>, Waker); + + impl DebugWaker { + fn new() -> Self { + let woken = Arc::new(AtomicUsize::new(0)); + let woken_cloned = woken.clone(); + let waker = waker_fn::waker_fn(move || { + woken.fetch_add(1, Ordering::SeqCst); + }); + DebugWaker(woken_cloned, waker) + } + + fn woken(&self) -> usize { + self.0.load(Ordering::SeqCst) + } + + fn ctx(&self) -> Context { + Context::from_waker(&self.1) + } + } + + // Check that the waker is correctly updated when sending tasks change their wakers + { + let send_fut = tx.send_async(()); + futures::pin_mut!(send_fut); + + let (waker1, waker2) = (DebugWaker::new(), DebugWaker::new()); + + // Set the waker to waker1 + assert_eq!(send_fut.as_mut().poll(&mut waker1.ctx()), Poll::Pending); + + // Change the waker to waker2 + assert_eq!(send_fut.poll(&mut waker2.ctx()), Poll::Pending); + + // Wake the future + rx.recv().unwrap(); + + // Check that waker2 was woken and waker1 was not + assert_eq!(waker1.woken(), 0); + assert_eq!(waker2.woken(), 1); + } + + // Check that the waker is correctly updated when receiving tasks change their wakers + { + rx.recv().unwrap(); + let recv_fut = rx.recv_async(); + futures::pin_mut!(recv_fut); + + let (waker1, waker2) = (DebugWaker::new(), DebugWaker::new()); + + // Set the waker to waker1 + assert_eq!(recv_fut.as_mut().poll(&mut waker1.ctx()), Poll::Pending); + + // Change the waker to waker2 + assert_eq!(recv_fut.poll(&mut waker2.ctx()), Poll::Pending); + + // Wake the future + tx.send(()).unwrap(); + + // Check that waker2 was woken and waker1 was not + assert_eq!(waker1.woken(), 0); + assert_eq!(waker2.woken(), 1); + } +} + +#[cfg(feature = "async")] +#[test] +fn spsc_single_threaded_value_ordering() { + async fn test() { + let (tx, rx) = flume::bounded(4); + tokio::select! { + _ = producer(tx) => {}, + _ = consumer(rx) => {}, + } + } + + async fn producer(tx: flume::Sender<usize>) { + for i in 0..100 { + tx.send_async(i).await.unwrap(); + } + } + + async fn consumer(rx: flume::Receiver<usize>) { + let mut expected = 0; + while let Ok(value) = rx.recv_async().await { + assert_eq!(value, expected); + expected += 1; + } + } + + let rt = tokio::runtime::Builder::new_current_thread().build().unwrap(); + rt.block_on(test()); +} |