author | Valentin Popov <valentin@popov.link> | 2024-01-08 00:21:28 +0300
committer | Valentin Popov <valentin@popov.link> | 2024-01-08 00:21:28 +0300
commit | 1b6a04ca5504955c571d1c97504fb45ea0befee4 (patch)
tree | 7579f518b23313e8a9748a88ab6173d5e030b227 /vendor/flume/tests/mpsc.rs
parent | 5ecd8cf2cba827454317368b68571df0d13d7842 (diff)
download | fparkan-1b6a04ca5504955c571d1c97504fb45ea0befee4.tar.xz fparkan-1b6a04ca5504955c571d1c97504fb45ea0befee4.zip
Initial vendor packages
Signed-off-by: Valentin Popov <valentin@popov.link>
Diffstat (limited to 'vendor/flume/tests/mpsc.rs')
-rw-r--r-- | vendor/flume/tests/mpsc.rs | 2095
1 file changed, 2095 insertions, 0 deletions
diff --git a/vendor/flume/tests/mpsc.rs b/vendor/flume/tests/mpsc.rs new file mode 100644 index 0000000..213b9d8 --- /dev/null +++ b/vendor/flume/tests/mpsc.rs @@ -0,0 +1,2095 @@ +//! Tests copied from `std::sync::mpsc`. +//! +//! This is a copy of tests for the `std::sync::mpsc` channels from the standard library, but +//! modified to work with `crossbeam-channel` instead. +//! +//! Minor tweaks were needed to make the tests compile: +//! +//! - Replace `box` syntax with `Box::new`. +//! - Replace all uses of `Select` with `select!`. +//! - Change the imports. +//! - Join all spawned threads. +//! - Removed assertion from oneshot_multi_thread_send_close_stress tests. +//! +//! Source: +//! - https://github.com/rust-lang/rust/tree/master/src/libstd/sync/mpsc +//! +//! Copyright & License: +//! - Copyright 2013-2014 The Rust Project Developers +//! - Apache License, Version 2.0 or MIT license, at your option +//! - https://github.com/rust-lang/rust/blob/master/COPYRIGHT +//! - https://www.rust-lang.org/en-US/legal.html + +#[macro_use] +extern crate crossbeam_channel as cc; + +use std::sync::mpsc::{RecvError, RecvTimeoutError, TryRecvError}; +use std::sync::mpsc::{SendError, TrySendError}; +use std::thread::JoinHandle; +use std::time::Duration; + +pub struct Sender<T> { + pub inner: cc::Sender<T>, +} + +impl<T> Sender<T> { + pub fn send(&self, t: T) -> Result<(), SendError<T>> { + self.inner.send(t).map_err(|cc::SendError(m)| SendError(m)) + } +} + +impl<T> Clone for Sender<T> { + fn clone(&self) -> Sender<T> { + Sender { + inner: self.inner.clone(), + } + } +} + +pub struct SyncSender<T> { + pub inner: cc::Sender<T>, +} + +impl<T> SyncSender<T> { + pub fn send(&self, t: T) -> Result<(), SendError<T>> { + self.inner.send(t).map_err(|cc::SendError(m)| SendError(m)) + } + + pub fn try_send(&self, t: T) -> Result<(), TrySendError<T>> { + self.inner.try_send(t).map_err(|err| match err { + cc::TrySendError::Full(m) => TrySendError::Full(m), + cc::TrySendError::Disconnected(m) => TrySendError::Disconnected(m), + }) + } +} + +impl<T> Clone for SyncSender<T> { + fn clone(&self) -> SyncSender<T> { + SyncSender { + inner: self.inner.clone(), + } + } +} + +pub struct Receiver<T> { + pub inner: cc::Receiver<T>, +} + +impl<T> Receiver<T> { + pub fn try_recv(&self) -> Result<T, TryRecvError> { + self.inner.try_recv().map_err(|err| match err { + cc::TryRecvError::Empty => TryRecvError::Empty, + cc::TryRecvError::Disconnected => TryRecvError::Disconnected, + }) + } + + pub fn recv(&self) -> Result<T, RecvError> { + self.inner.recv().map_err(|_| RecvError) + } + + pub fn recv_timeout(&self, timeout: Duration) -> Result<T, RecvTimeoutError> { + self.inner.recv_timeout(timeout).map_err(|err| match err { + cc::RecvTimeoutError::Timeout => RecvTimeoutError::Timeout, + cc::RecvTimeoutError::Disconnected => RecvTimeoutError::Disconnected, + }) + } + + pub fn iter(&self) -> Iter<T> { + Iter { inner: self } + } + + pub fn try_iter(&self) -> TryIter<T> { + TryIter { inner: self } + } +} + +impl<'a, T> IntoIterator for &'a Receiver<T> { + type Item = T; + type IntoIter = Iter<'a, T>; + + fn into_iter(self) -> Iter<'a, T> { + self.iter() + } +} + +impl<T> IntoIterator for Receiver<T> { + type Item = T; + type IntoIter = IntoIter<T>; + + fn into_iter(self) -> IntoIter<T> { + IntoIter { inner: self } + } +} + +pub struct TryIter<'a, T: 'a> { + inner: &'a Receiver<T>, +} + +impl<'a, T> Iterator for TryIter<'a, T> { + type Item = T; + + fn next(&mut self) -> Option<T> { + self.inner.try_recv().ok() + } +} + +pub struct 
Iter<'a, T: 'a> { + inner: &'a Receiver<T>, +} + +impl<'a, T> Iterator for Iter<'a, T> { + type Item = T; + + fn next(&mut self) -> Option<T> { + self.inner.recv().ok() + } +} + +pub struct IntoIter<T> { + inner: Receiver<T>, +} + +impl<T> Iterator for IntoIter<T> { + type Item = T; + + fn next(&mut self) -> Option<T> { + self.inner.recv().ok() + } +} + +pub fn channel<T>() -> (Sender<T>, Receiver<T>) { + let (s, r) = cc::unbounded(); + let s = Sender { inner: s }; + let r = Receiver { inner: r }; + (s, r) +} + +pub fn sync_channel<T>(bound: usize) -> (SyncSender<T>, Receiver<T>) { + let (s, r) = cc::bounded(bound); + let s = SyncSender { inner: s }; + let r = Receiver { inner: r }; + (s, r) +} + +macro_rules! select { + ( + $($name:pat = $rx:ident.$meth:ident() => $code:expr),+ + ) => ({ + crossbeam_channel_internal! { + $( + recv(($rx).inner) -> res => { + let $name = res.map_err(|_| ::std::sync::mpsc::RecvError); + $code + } + )+ + } + }) +} + +// Source: https://github.com/rust-lang/rust/blob/master/src/libstd/sync/mpsc/mod.rs +mod channel_tests { + use super::*; + + use std::env; + use std::thread; + use std::time::{Duration, Instant}; + + pub fn stress_factor() -> usize { + match env::var("RUST_TEST_STRESS") { + Ok(val) => val.parse().unwrap(), + Err(..) => 1, + } + } + + #[test] + fn smoke() { + let (tx, rx) = channel::<i32>(); + tx.send(1).unwrap(); + assert_eq!(rx.recv().unwrap(), 1); + } + + #[test] + fn drop_full() { + let (tx, _rx) = channel::<Box<isize>>(); + tx.send(Box::new(1)).unwrap(); + } + + #[test] + fn drop_full_shared() { + let (tx, _rx) = channel::<Box<isize>>(); + drop(tx.clone()); + drop(tx.clone()); + tx.send(Box::new(1)).unwrap(); + } + + #[test] + fn smoke_shared() { + let (tx, rx) = channel::<i32>(); + tx.send(1).unwrap(); + assert_eq!(rx.recv().unwrap(), 1); + let tx = tx.clone(); + tx.send(1).unwrap(); + assert_eq!(rx.recv().unwrap(), 1); + } + + #[test] + fn smoke_threads() { + let (tx, rx) = channel::<i32>(); + let t = thread::spawn(move || { + tx.send(1).unwrap(); + }); + assert_eq!(rx.recv().unwrap(), 1); + t.join().unwrap(); + } + + #[test] + fn smoke_port_gone() { + let (tx, rx) = channel::<i32>(); + drop(rx); + assert!(tx.send(1).is_err()); + } + + #[test] + fn smoke_shared_port_gone() { + let (tx, rx) = channel::<i32>(); + drop(rx); + assert!(tx.send(1).is_err()) + } + + #[test] + fn smoke_shared_port_gone2() { + let (tx, rx) = channel::<i32>(); + drop(rx); + let tx2 = tx.clone(); + drop(tx); + assert!(tx2.send(1).is_err()); + } + + #[test] + fn port_gone_concurrent() { + let (tx, rx) = channel::<i32>(); + let t = thread::spawn(move || { + rx.recv().unwrap(); + }); + while tx.send(1).is_ok() {} + t.join().unwrap(); + } + + #[test] + fn port_gone_concurrent_shared() { + let (tx, rx) = channel::<i32>(); + let tx2 = tx.clone(); + let t = thread::spawn(move || { + rx.recv().unwrap(); + }); + while tx.send(1).is_ok() && tx2.send(1).is_ok() {} + t.join().unwrap(); + } + + #[test] + fn smoke_chan_gone() { + let (tx, rx) = channel::<i32>(); + drop(tx); + assert!(rx.recv().is_err()); + } + + #[test] + fn smoke_chan_gone_shared() { + let (tx, rx) = channel::<()>(); + let tx2 = tx.clone(); + drop(tx); + drop(tx2); + assert!(rx.recv().is_err()); + } + + #[test] + fn chan_gone_concurrent() { + let (tx, rx) = channel::<i32>(); + let t = thread::spawn(move || { + tx.send(1).unwrap(); + tx.send(1).unwrap(); + }); + while rx.recv().is_ok() {} + t.join().unwrap(); + } + + #[test] + fn stress() { + let (tx, rx) = channel::<i32>(); + let t = thread::spawn(move || { + for 
_ in 0..10000 { + tx.send(1).unwrap(); + } + }); + for _ in 0..10000 { + assert_eq!(rx.recv().unwrap(), 1); + } + t.join().ok().unwrap(); + } + + #[test] + fn stress_shared() { + const AMT: u32 = 10000; + const NTHREADS: u32 = 8; + let (tx, rx) = channel::<i32>(); + + let t = thread::spawn(move || { + for _ in 0..AMT * NTHREADS { + assert_eq!(rx.recv().unwrap(), 1); + } + match rx.try_recv() { + Ok(..) => panic!(), + _ => {} + } + }); + + let mut ts = Vec::with_capacity(NTHREADS as usize); + for _ in 0..NTHREADS { + let tx = tx.clone(); + let t = thread::spawn(move || { + for _ in 0..AMT { + tx.send(1).unwrap(); + } + }); + ts.push(t); + } + drop(tx); + t.join().ok().unwrap(); + for t in ts { + t.join().unwrap(); + } + } + + #[test] + fn send_from_outside_runtime() { + let (tx1, rx1) = channel::<()>(); + let (tx2, rx2) = channel::<i32>(); + let t1 = thread::spawn(move || { + tx1.send(()).unwrap(); + for _ in 0..40 { + assert_eq!(rx2.recv().unwrap(), 1); + } + }); + rx1.recv().unwrap(); + let t2 = thread::spawn(move || { + for _ in 0..40 { + tx2.send(1).unwrap(); + } + }); + t1.join().ok().unwrap(); + t2.join().ok().unwrap(); + } + + #[test] + fn recv_from_outside_runtime() { + let (tx, rx) = channel::<i32>(); + let t = thread::spawn(move || { + for _ in 0..40 { + assert_eq!(rx.recv().unwrap(), 1); + } + }); + for _ in 0..40 { + tx.send(1).unwrap(); + } + t.join().ok().unwrap(); + } + + #[test] + fn no_runtime() { + let (tx1, rx1) = channel::<i32>(); + let (tx2, rx2) = channel::<i32>(); + let t1 = thread::spawn(move || { + assert_eq!(rx1.recv().unwrap(), 1); + tx2.send(2).unwrap(); + }); + let t2 = thread::spawn(move || { + tx1.send(1).unwrap(); + assert_eq!(rx2.recv().unwrap(), 2); + }); + t1.join().ok().unwrap(); + t2.join().ok().unwrap(); + } + + #[test] + fn oneshot_single_thread_close_port_first() { + // Simple test of closing without sending + let (_tx, rx) = channel::<i32>(); + drop(rx); + } + + #[test] + fn oneshot_single_thread_close_chan_first() { + // Simple test of closing without sending + let (tx, _rx) = channel::<i32>(); + drop(tx); + } + + #[test] + fn oneshot_single_thread_send_port_close() { + // Testing that the sender cleans up the payload if receiver is closed + let (tx, rx) = channel::<Box<i32>>(); + drop(rx); + assert!(tx.send(Box::new(0)).is_err()); + } + + #[test] + fn oneshot_single_thread_recv_chan_close() { + let (tx, rx) = channel::<i32>(); + drop(tx); + assert_eq!(rx.recv(), Err(RecvError)); + } + + #[test] + fn oneshot_single_thread_send_then_recv() { + let (tx, rx) = channel::<Box<i32>>(); + tx.send(Box::new(10)).unwrap(); + assert!(*rx.recv().unwrap() == 10); + } + + #[test] + fn oneshot_single_thread_try_send_open() { + let (tx, rx) = channel::<i32>(); + assert!(tx.send(10).is_ok()); + assert!(rx.recv().unwrap() == 10); + } + + #[test] + fn oneshot_single_thread_try_send_closed() { + let (tx, rx) = channel::<i32>(); + drop(rx); + assert!(tx.send(10).is_err()); + } + + #[test] + fn oneshot_single_thread_try_recv_open() { + let (tx, rx) = channel::<i32>(); + tx.send(10).unwrap(); + assert!(rx.recv() == Ok(10)); + } + + #[test] + fn oneshot_single_thread_try_recv_closed() { + let (tx, rx) = channel::<i32>(); + drop(tx); + assert!(rx.recv().is_err()); + } + + #[test] + fn oneshot_single_thread_peek_data() { + let (tx, rx) = channel::<i32>(); + assert_eq!(rx.try_recv(), Err(TryRecvError::Empty)); + tx.send(10).unwrap(); + assert_eq!(rx.try_recv(), Ok(10)); + } + + #[test] + fn oneshot_single_thread_peek_close() { + let (tx, rx) = channel::<i32>(); + drop(tx); + 
assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected)); + assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected)); + } + + #[test] + fn oneshot_single_thread_peek_open() { + let (_tx, rx) = channel::<i32>(); + assert_eq!(rx.try_recv(), Err(TryRecvError::Empty)); + } + + #[test] + fn oneshot_multi_task_recv_then_send() { + let (tx, rx) = channel::<Box<i32>>(); + let t = thread::spawn(move || { + assert!(*rx.recv().unwrap() == 10); + }); + + tx.send(Box::new(10)).unwrap(); + t.join().unwrap(); + } + + #[test] + fn oneshot_multi_task_recv_then_close() { + let (tx, rx) = channel::<Box<i32>>(); + let t = thread::spawn(move || { + drop(tx); + }); + thread::spawn(move || { + assert_eq!(rx.recv(), Err(RecvError)); + }) + .join() + .unwrap(); + t.join().unwrap(); + } + + #[test] + fn oneshot_multi_thread_close_stress() { + let stress_factor = stress_factor(); + let mut ts = Vec::with_capacity(stress_factor); + for _ in 0..stress_factor { + let (tx, rx) = channel::<i32>(); + let t = thread::spawn(move || { + drop(rx); + }); + ts.push(t); + drop(tx); + } + for t in ts { + t.join().unwrap(); + } + } + + #[test] + fn oneshot_multi_thread_send_close_stress() { + let stress_factor = stress_factor(); + let mut ts = Vec::with_capacity(2 * stress_factor); + for _ in 0..stress_factor { + let (tx, rx) = channel::<i32>(); + let t = thread::spawn(move || { + drop(rx); + }); + ts.push(t); + thread::spawn(move || { + let _ = tx.send(1); + }) + .join() + .unwrap(); + } + for t in ts { + t.join().unwrap(); + } + } + + #[test] + fn oneshot_multi_thread_recv_close_stress() { + let stress_factor = stress_factor(); + let mut ts = Vec::with_capacity(2 * stress_factor); + for _ in 0..stress_factor { + let (tx, rx) = channel::<i32>(); + let t = thread::spawn(move || { + thread::spawn(move || { + assert_eq!(rx.recv(), Err(RecvError)); + }) + .join() + .unwrap(); + }); + ts.push(t); + let t2 = thread::spawn(move || { + let t = thread::spawn(move || { + drop(tx); + }); + t.join().unwrap(); + }); + ts.push(t2); + } + for t in ts { + t.join().unwrap(); + } + } + + #[test] + fn oneshot_multi_thread_send_recv_stress() { + let stress_factor = stress_factor(); + let mut ts = Vec::with_capacity(stress_factor); + for _ in 0..stress_factor { + let (tx, rx) = channel::<Box<isize>>(); + let t = thread::spawn(move || { + tx.send(Box::new(10)).unwrap(); + }); + ts.push(t); + assert!(*rx.recv().unwrap() == 10); + } + for t in ts { + t.join().unwrap(); + } + } + + #[test] + fn stream_send_recv_stress() { + let stress_factor = stress_factor(); + let mut ts = Vec::with_capacity(2 * stress_factor); + for _ in 0..stress_factor { + let (tx, rx) = channel(); + + if let Some(t) = send(tx, 0) { + ts.push(t); + } + if let Some(t2) = recv(rx, 0) { + ts.push(t2); + } + + fn send(tx: Sender<Box<i32>>, i: i32) -> Option<JoinHandle<()>> { + if i == 10 { + return None; + } + + Some(thread::spawn(move || { + tx.send(Box::new(i)).unwrap(); + send(tx, i + 1); + })) + } + + fn recv(rx: Receiver<Box<i32>>, i: i32) -> Option<JoinHandle<()>> { + if i == 10 { + return None; + } + + Some(thread::spawn(move || { + assert!(*rx.recv().unwrap() == i); + recv(rx, i + 1); + })) + } + } + for t in ts { + t.join().unwrap(); + } + } + + #[test] + fn oneshot_single_thread_recv_timeout() { + let (tx, rx) = channel(); + tx.send(()).unwrap(); + assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Ok(())); + assert_eq!( + rx.recv_timeout(Duration::from_millis(1)), + Err(RecvTimeoutError::Timeout) + ); + tx.send(()).unwrap(); + 
assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Ok(())); + } + + #[test] + fn stress_recv_timeout_two_threads() { + let (tx, rx) = channel(); + let stress = stress_factor() + 100; + let timeout = Duration::from_millis(100); + + let t = thread::spawn(move || { + for i in 0..stress { + if i % 2 == 0 { + thread::sleep(timeout * 2); + } + tx.send(1usize).unwrap(); + } + }); + + let mut recv_count = 0; + loop { + match rx.recv_timeout(timeout) { + Ok(n) => { + assert_eq!(n, 1usize); + recv_count += 1; + } + Err(RecvTimeoutError::Timeout) => continue, + Err(RecvTimeoutError::Disconnected) => break, + } + } + + assert_eq!(recv_count, stress); + t.join().unwrap() + } + + #[test] + fn recv_timeout_upgrade() { + let (tx, rx) = channel::<()>(); + let timeout = Duration::from_millis(1); + let _tx_clone = tx.clone(); + + let start = Instant::now(); + assert_eq!(rx.recv_timeout(timeout), Err(RecvTimeoutError::Timeout)); + assert!(Instant::now() >= start + timeout); + } + + #[test] + fn stress_recv_timeout_shared() { + let (tx, rx) = channel(); + let stress = stress_factor() + 100; + + let mut ts = Vec::with_capacity(stress); + for i in 0..stress { + let tx = tx.clone(); + let t = thread::spawn(move || { + thread::sleep(Duration::from_millis(i as u64 * 10)); + tx.send(1usize).unwrap(); + }); + ts.push(t); + } + + drop(tx); + + let mut recv_count = 0; + loop { + match rx.recv_timeout(Duration::from_millis(10)) { + Ok(n) => { + assert_eq!(n, 1usize); + recv_count += 1; + } + Err(RecvTimeoutError::Timeout) => continue, + Err(RecvTimeoutError::Disconnected) => break, + } + } + + assert_eq!(recv_count, stress); + for t in ts { + t.join().unwrap(); + } + } + + #[test] + fn recv_a_lot() { + // Regression test that we don't run out of stack in scheduler context + let (tx, rx) = channel(); + for _ in 0..10000 { + tx.send(()).unwrap(); + } + for _ in 0..10000 { + rx.recv().unwrap(); + } + } + + #[test] + fn shared_recv_timeout() { + let (tx, rx) = channel(); + let total = 5; + let mut ts = Vec::with_capacity(total); + for _ in 0..total { + let tx = tx.clone(); + let t = thread::spawn(move || { + tx.send(()).unwrap(); + }); + ts.push(t); + } + + for _ in 0..total { + rx.recv().unwrap(); + } + + assert_eq!( + rx.recv_timeout(Duration::from_millis(1)), + Err(RecvTimeoutError::Timeout) + ); + tx.send(()).unwrap(); + assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Ok(())); + for t in ts { + t.join().unwrap(); + } + } + + #[test] + fn shared_chan_stress() { + let (tx, rx) = channel(); + let total = stress_factor() + 100; + let mut ts = Vec::with_capacity(total); + for _ in 0..total { + let tx = tx.clone(); + let t = thread::spawn(move || { + tx.send(()).unwrap(); + }); + ts.push(t); + } + + for _ in 0..total { + rx.recv().unwrap(); + } + for t in ts { + t.join().unwrap(); + } + } + + #[test] + fn test_nested_recv_iter() { + let (tx, rx) = channel::<i32>(); + let (total_tx, total_rx) = channel::<i32>(); + + let t = thread::spawn(move || { + let mut acc = 0; + for x in rx.iter() { + acc += x; + } + total_tx.send(acc).unwrap(); + }); + + tx.send(3).unwrap(); + tx.send(1).unwrap(); + tx.send(2).unwrap(); + drop(tx); + assert_eq!(total_rx.recv().unwrap(), 6); + t.join().unwrap(); + } + + #[test] + fn test_recv_iter_break() { + let (tx, rx) = channel::<i32>(); + let (count_tx, count_rx) = channel(); + + let t = thread::spawn(move || { + let mut count = 0; + for x in rx.iter() { + if count >= 3 { + break; + } else { + count += x; + } + } + count_tx.send(count).unwrap(); + }); + + tx.send(2).unwrap(); + 
tx.send(2).unwrap(); + tx.send(2).unwrap(); + let _ = tx.send(2); + drop(tx); + assert_eq!(count_rx.recv().unwrap(), 4); + t.join().unwrap(); + } + + #[test] + fn test_recv_try_iter() { + let (request_tx, request_rx) = channel(); + let (response_tx, response_rx) = channel(); + + // Request `x`s until we have `6`. + let t = thread::spawn(move || { + let mut count = 0; + loop { + for x in response_rx.try_iter() { + count += x; + if count == 6 { + return count; + } + } + request_tx.send(()).unwrap(); + } + }); + + for _ in request_rx.iter() { + if response_tx.send(2).is_err() { + break; + } + } + + assert_eq!(t.join().unwrap(), 6); + } + + #[test] + fn test_recv_into_iter_owned() { + let mut iter = { + let (tx, rx) = channel::<i32>(); + tx.send(1).unwrap(); + tx.send(2).unwrap(); + + rx.into_iter() + }; + assert_eq!(iter.next().unwrap(), 1); + assert_eq!(iter.next().unwrap(), 2); + assert_eq!(iter.next().is_none(), true); + } + + #[test] + fn test_recv_into_iter_borrowed() { + let (tx, rx) = channel::<i32>(); + tx.send(1).unwrap(); + tx.send(2).unwrap(); + drop(tx); + let mut iter = (&rx).into_iter(); + assert_eq!(iter.next().unwrap(), 1); + assert_eq!(iter.next().unwrap(), 2); + assert_eq!(iter.next().is_none(), true); + } + + #[test] + fn try_recv_states() { + let (tx1, rx1) = channel::<i32>(); + let (tx2, rx2) = channel::<()>(); + let (tx3, rx3) = channel::<()>(); + let t = thread::spawn(move || { + rx2.recv().unwrap(); + tx1.send(1).unwrap(); + tx3.send(()).unwrap(); + rx2.recv().unwrap(); + drop(tx1); + tx3.send(()).unwrap(); + }); + + assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty)); + tx2.send(()).unwrap(); + rx3.recv().unwrap(); + assert_eq!(rx1.try_recv(), Ok(1)); + assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty)); + tx2.send(()).unwrap(); + rx3.recv().unwrap(); + assert_eq!(rx1.try_recv(), Err(TryRecvError::Disconnected)); + t.join().unwrap(); + } + + // This bug used to end up in a livelock inside of the Receiver destructor + // because the internal state of the Shared packet was corrupted + #[test] + fn destroy_upgraded_shared_port_when_sender_still_active() { + let (tx, rx) = channel(); + let (tx2, rx2) = channel(); + let t = thread::spawn(move || { + rx.recv().unwrap(); // wait on a oneshot + drop(rx); // destroy a shared + tx2.send(()).unwrap(); + }); + // make sure the other thread has gone to sleep + for _ in 0..5000 { + thread::yield_now(); + } + + // upgrade to a shared chan and send a message + let tx2 = tx.clone(); + drop(tx); + tx2.send(()).unwrap(); + + // wait for the child thread to exit before we exit + rx2.recv().unwrap(); + t.join().unwrap(); + } + + #[test] + fn issue_32114() { + let (tx, _) = channel(); + let _ = tx.send(123); + assert_eq!(tx.send(123), Err(SendError(123))); + } +} + +// Source: https://github.com/rust-lang/rust/blob/master/src/libstd/sync/mpsc/mod.rs +mod sync_channel_tests { + use super::*; + + use std::env; + use std::thread; + use std::time::Duration; + + pub fn stress_factor() -> usize { + match env::var("RUST_TEST_STRESS") { + Ok(val) => val.parse().unwrap(), + Err(..) 
=> 1, + } + } + + #[test] + fn smoke() { + let (tx, rx) = sync_channel::<i32>(1); + tx.send(1).unwrap(); + assert_eq!(rx.recv().unwrap(), 1); + } + + #[test] + fn drop_full() { + let (tx, _rx) = sync_channel::<Box<isize>>(1); + tx.send(Box::new(1)).unwrap(); + } + + #[test] + fn smoke_shared() { + let (tx, rx) = sync_channel::<i32>(1); + tx.send(1).unwrap(); + assert_eq!(rx.recv().unwrap(), 1); + let tx = tx.clone(); + tx.send(1).unwrap(); + assert_eq!(rx.recv().unwrap(), 1); + } + + #[test] + fn recv_timeout() { + let (tx, rx) = sync_channel::<i32>(1); + assert_eq!( + rx.recv_timeout(Duration::from_millis(1)), + Err(RecvTimeoutError::Timeout) + ); + tx.send(1).unwrap(); + assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Ok(1)); + } + + #[test] + fn smoke_threads() { + let (tx, rx) = sync_channel::<i32>(0); + let t = thread::spawn(move || { + tx.send(1).unwrap(); + }); + assert_eq!(rx.recv().unwrap(), 1); + t.join().unwrap(); + } + + #[test] + fn smoke_port_gone() { + let (tx, rx) = sync_channel::<i32>(0); + drop(rx); + assert!(tx.send(1).is_err()); + } + + #[test] + fn smoke_shared_port_gone2() { + let (tx, rx) = sync_channel::<i32>(0); + drop(rx); + let tx2 = tx.clone(); + drop(tx); + assert!(tx2.send(1).is_err()); + } + + #[test] + fn port_gone_concurrent() { + let (tx, rx) = sync_channel::<i32>(0); + let t = thread::spawn(move || { + rx.recv().unwrap(); + }); + while tx.send(1).is_ok() {} + t.join().unwrap(); + } + + #[test] + fn port_gone_concurrent_shared() { + let (tx, rx) = sync_channel::<i32>(0); + let tx2 = tx.clone(); + let t = thread::spawn(move || { + rx.recv().unwrap(); + }); + while tx.send(1).is_ok() && tx2.send(1).is_ok() {} + t.join().unwrap(); + } + + #[test] + fn smoke_chan_gone() { + let (tx, rx) = sync_channel::<i32>(0); + drop(tx); + assert!(rx.recv().is_err()); + } + + #[test] + fn smoke_chan_gone_shared() { + let (tx, rx) = sync_channel::<()>(0); + let tx2 = tx.clone(); + drop(tx); + drop(tx2); + assert!(rx.recv().is_err()); + } + + #[test] + fn chan_gone_concurrent() { + let (tx, rx) = sync_channel::<i32>(0); + let t = thread::spawn(move || { + tx.send(1).unwrap(); + tx.send(1).unwrap(); + }); + while rx.recv().is_ok() {} + t.join().unwrap(); + } + + #[test] + fn stress() { + let (tx, rx) = sync_channel::<i32>(0); + let t = thread::spawn(move || { + for _ in 0..10000 { + tx.send(1).unwrap(); + } + }); + for _ in 0..10000 { + assert_eq!(rx.recv().unwrap(), 1); + } + t.join().unwrap(); + } + + #[test] + fn stress_recv_timeout_two_threads() { + let (tx, rx) = sync_channel::<i32>(0); + + let t = thread::spawn(move || { + for _ in 0..10000 { + tx.send(1).unwrap(); + } + }); + + let mut recv_count = 0; + loop { + match rx.recv_timeout(Duration::from_millis(1)) { + Ok(v) => { + assert_eq!(v, 1); + recv_count += 1; + } + Err(RecvTimeoutError::Timeout) => continue, + Err(RecvTimeoutError::Disconnected) => break, + } + } + + assert_eq!(recv_count, 10000); + t.join().unwrap(); + } + + #[test] + fn stress_recv_timeout_shared() { + const AMT: u32 = 1000; + const NTHREADS: u32 = 8; + let (tx, rx) = sync_channel::<i32>(0); + let (dtx, drx) = sync_channel::<()>(0); + + let t = thread::spawn(move || { + let mut recv_count = 0; + loop { + match rx.recv_timeout(Duration::from_millis(10)) { + Ok(v) => { + assert_eq!(v, 1); + recv_count += 1; + } + Err(RecvTimeoutError::Timeout) => continue, + Err(RecvTimeoutError::Disconnected) => break, + } + } + + assert_eq!(recv_count, AMT * NTHREADS); + assert!(rx.try_recv().is_err()); + + dtx.send(()).unwrap(); + }); + + let mut ts = 
Vec::with_capacity(NTHREADS as usize); + for _ in 0..NTHREADS { + let tx = tx.clone(); + let t = thread::spawn(move || { + for _ in 0..AMT { + tx.send(1).unwrap(); + } + }); + ts.push(t); + } + + drop(tx); + + drx.recv().unwrap(); + for t in ts { + t.join().unwrap(); + } + t.join().unwrap(); + } + + #[test] + fn stress_shared() { + const AMT: u32 = 1000; + const NTHREADS: u32 = 8; + let (tx, rx) = sync_channel::<i32>(0); + let (dtx, drx) = sync_channel::<()>(0); + + let t = thread::spawn(move || { + for _ in 0..AMT * NTHREADS { + assert_eq!(rx.recv().unwrap(), 1); + } + match rx.try_recv() { + Ok(..) => panic!(), + _ => {} + } + dtx.send(()).unwrap(); + }); + + let mut ts = Vec::with_capacity(NTHREADS as usize); + for _ in 0..NTHREADS { + let tx = tx.clone(); + let t = thread::spawn(move || { + for _ in 0..AMT { + tx.send(1).unwrap(); + } + }); + ts.push(t); + } + drop(tx); + drx.recv().unwrap(); + for t in ts { + t.join().unwrap(); + } + t.join().unwrap(); + } + + #[test] + fn oneshot_single_thread_close_port_first() { + // Simple test of closing without sending + let (_tx, rx) = sync_channel::<i32>(0); + drop(rx); + } + + #[test] + fn oneshot_single_thread_close_chan_first() { + // Simple test of closing without sending + let (tx, _rx) = sync_channel::<i32>(0); + drop(tx); + } + + #[test] + fn oneshot_single_thread_send_port_close() { + // Testing that the sender cleans up the payload if receiver is closed + let (tx, rx) = sync_channel::<Box<i32>>(0); + drop(rx); + assert!(tx.send(Box::new(0)).is_err()); + } + + #[test] + fn oneshot_single_thread_recv_chan_close() { + let (tx, rx) = sync_channel::<i32>(0); + drop(tx); + assert_eq!(rx.recv(), Err(RecvError)); + } + + #[test] + fn oneshot_single_thread_send_then_recv() { + let (tx, rx) = sync_channel::<Box<i32>>(1); + tx.send(Box::new(10)).unwrap(); + assert!(*rx.recv().unwrap() == 10); + } + + #[test] + fn oneshot_single_thread_try_send_open() { + let (tx, rx) = sync_channel::<i32>(1); + assert_eq!(tx.try_send(10), Ok(())); + assert!(rx.recv().unwrap() == 10); + } + + #[test] + fn oneshot_single_thread_try_send_closed() { + let (tx, rx) = sync_channel::<i32>(0); + drop(rx); + assert_eq!(tx.try_send(10), Err(TrySendError::Disconnected(10))); + } + + #[test] + fn oneshot_single_thread_try_send_closed2() { + let (tx, _rx) = sync_channel::<i32>(0); + assert_eq!(tx.try_send(10), Err(TrySendError::Full(10))); + } + + #[test] + fn oneshot_single_thread_try_recv_open() { + let (tx, rx) = sync_channel::<i32>(1); + tx.send(10).unwrap(); + assert!(rx.recv() == Ok(10)); + } + + #[test] + fn oneshot_single_thread_try_recv_closed() { + let (tx, rx) = sync_channel::<i32>(0); + drop(tx); + assert!(rx.recv().is_err()); + } + + #[test] + fn oneshot_single_thread_try_recv_closed_with_data() { + let (tx, rx) = sync_channel::<i32>(1); + tx.send(10).unwrap(); + drop(tx); + assert_eq!(rx.try_recv(), Ok(10)); + assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected)); + } + + #[test] + fn oneshot_single_thread_peek_data() { + let (tx, rx) = sync_channel::<i32>(1); + assert_eq!(rx.try_recv(), Err(TryRecvError::Empty)); + tx.send(10).unwrap(); + assert_eq!(rx.try_recv(), Ok(10)); + } + + #[test] + fn oneshot_single_thread_peek_close() { + let (tx, rx) = sync_channel::<i32>(0); + drop(tx); + assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected)); + assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected)); + } + + #[test] + fn oneshot_single_thread_peek_open() { + let (_tx, rx) = sync_channel::<i32>(0); + assert_eq!(rx.try_recv(), Err(TryRecvError::Empty)); 
+ } + + #[test] + fn oneshot_multi_task_recv_then_send() { + let (tx, rx) = sync_channel::<Box<i32>>(0); + let t = thread::spawn(move || { + assert!(*rx.recv().unwrap() == 10); + }); + + tx.send(Box::new(10)).unwrap(); + t.join().unwrap(); + } + + #[test] + fn oneshot_multi_task_recv_then_close() { + let (tx, rx) = sync_channel::<Box<i32>>(0); + let t = thread::spawn(move || { + drop(tx); + }); + thread::spawn(move || { + assert_eq!(rx.recv(), Err(RecvError)); + }) + .join() + .unwrap(); + t.join().unwrap(); + } + + #[test] + fn oneshot_multi_thread_close_stress() { + let stress_factor = stress_factor(); + let mut ts = Vec::with_capacity(stress_factor); + for _ in 0..stress_factor { + let (tx, rx) = sync_channel::<i32>(0); + let t = thread::spawn(move || { + drop(rx); + }); + ts.push(t); + drop(tx); + } + for t in ts { + t.join().unwrap(); + } + } + + #[test] + fn oneshot_multi_thread_send_close_stress() { + let stress_factor = stress_factor(); + let mut ts = Vec::with_capacity(stress_factor); + for _ in 0..stress_factor { + let (tx, rx) = sync_channel::<i32>(0); + let t = thread::spawn(move || { + drop(rx); + }); + ts.push(t); + thread::spawn(move || { + let _ = tx.send(1); + }) + .join() + .unwrap(); + } + for t in ts { + t.join().unwrap(); + } + } + + #[test] + fn oneshot_multi_thread_recv_close_stress() { + let stress_factor = stress_factor(); + let mut ts = Vec::with_capacity(2 * stress_factor); + for _ in 0..stress_factor { + let (tx, rx) = sync_channel::<i32>(0); + let t = thread::spawn(move || { + thread::spawn(move || { + assert_eq!(rx.recv(), Err(RecvError)); + }) + .join() + .unwrap(); + }); + ts.push(t); + let t2 = thread::spawn(move || { + thread::spawn(move || { + drop(tx); + }); + }); + ts.push(t2); + } + for t in ts { + t.join().unwrap(); + } + } + + #[test] + fn oneshot_multi_thread_send_recv_stress() { + let stress_factor = stress_factor(); + let mut ts = Vec::with_capacity(stress_factor); + for _ in 0..stress_factor { + let (tx, rx) = sync_channel::<Box<i32>>(0); + let t = thread::spawn(move || { + tx.send(Box::new(10)).unwrap(); + }); + ts.push(t); + assert!(*rx.recv().unwrap() == 10); + } + for t in ts { + t.join().unwrap(); + } + } + + #[test] + fn stream_send_recv_stress() { + let stress_factor = stress_factor(); + let mut ts = Vec::with_capacity(2 * stress_factor); + for _ in 0..stress_factor { + let (tx, rx) = sync_channel::<Box<i32>>(0); + + if let Some(t) = send(tx, 0) { + ts.push(t); + } + if let Some(t) = recv(rx, 0) { + ts.push(t); + } + + fn send(tx: SyncSender<Box<i32>>, i: i32) -> Option<JoinHandle<()>> { + if i == 10 { + return None; + } + + Some(thread::spawn(move || { + tx.send(Box::new(i)).unwrap(); + send(tx, i + 1); + })) + } + + fn recv(rx: Receiver<Box<i32>>, i: i32) -> Option<JoinHandle<()>> { + if i == 10 { + return None; + } + + Some(thread::spawn(move || { + assert!(*rx.recv().unwrap() == i); + recv(rx, i + 1); + })) + } + } + for t in ts { + t.join().unwrap(); + } + } + + #[test] + fn recv_a_lot() { + // Regression test that we don't run out of stack in scheduler context + let (tx, rx) = sync_channel(10000); + for _ in 0..10000 { + tx.send(()).unwrap(); + } + for _ in 0..10000 { + rx.recv().unwrap(); + } + } + + #[test] + fn shared_chan_stress() { + let (tx, rx) = sync_channel(0); + let total = stress_factor() + 100; + let mut ts = Vec::with_capacity(total); + for _ in 0..total { + let tx = tx.clone(); + let t = thread::spawn(move || { + tx.send(()).unwrap(); + }); + ts.push(t); + } + + for _ in 0..total { + rx.recv().unwrap(); + } + for t in 
ts { + t.join().unwrap(); + } + } + + #[test] + fn test_nested_recv_iter() { + let (tx, rx) = sync_channel::<i32>(0); + let (total_tx, total_rx) = sync_channel::<i32>(0); + + let t = thread::spawn(move || { + let mut acc = 0; + for x in rx.iter() { + acc += x; + } + total_tx.send(acc).unwrap(); + }); + + tx.send(3).unwrap(); + tx.send(1).unwrap(); + tx.send(2).unwrap(); + drop(tx); + assert_eq!(total_rx.recv().unwrap(), 6); + t.join().unwrap(); + } + + #[test] + fn test_recv_iter_break() { + let (tx, rx) = sync_channel::<i32>(0); + let (count_tx, count_rx) = sync_channel(0); + + let t = thread::spawn(move || { + let mut count = 0; + for x in rx.iter() { + if count >= 3 { + break; + } else { + count += x; + } + } + count_tx.send(count).unwrap(); + }); + + tx.send(2).unwrap(); + tx.send(2).unwrap(); + tx.send(2).unwrap(); + let _ = tx.try_send(2); + drop(tx); + assert_eq!(count_rx.recv().unwrap(), 4); + t.join().unwrap(); + } + + #[test] + fn try_recv_states() { + let (tx1, rx1) = sync_channel::<i32>(1); + let (tx2, rx2) = sync_channel::<()>(1); + let (tx3, rx3) = sync_channel::<()>(1); + let t = thread::spawn(move || { + rx2.recv().unwrap(); + tx1.send(1).unwrap(); + tx3.send(()).unwrap(); + rx2.recv().unwrap(); + drop(tx1); + tx3.send(()).unwrap(); + }); + + assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty)); + tx2.send(()).unwrap(); + rx3.recv().unwrap(); + assert_eq!(rx1.try_recv(), Ok(1)); + assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty)); + tx2.send(()).unwrap(); + rx3.recv().unwrap(); + assert_eq!(rx1.try_recv(), Err(TryRecvError::Disconnected)); + t.join().unwrap(); + } + + // This bug used to end up in a livelock inside of the Receiver destructor + // because the internal state of the Shared packet was corrupted + #[test] + fn destroy_upgraded_shared_port_when_sender_still_active() { + let (tx, rx) = sync_channel::<()>(0); + let (tx2, rx2) = sync_channel::<()>(0); + let t = thread::spawn(move || { + rx.recv().unwrap(); // wait on a oneshot + drop(rx); // destroy a shared + tx2.send(()).unwrap(); + }); + // make sure the other thread has gone to sleep + for _ in 0..5000 { + thread::yield_now(); + } + + // upgrade to a shared chan and send a message + let tx2 = tx.clone(); + drop(tx); + tx2.send(()).unwrap(); + + // wait for the child thread to exit before we exit + rx2.recv().unwrap(); + t.join().unwrap(); + } + + #[test] + fn send1() { + let (tx, rx) = sync_channel::<i32>(0); + let t = thread::spawn(move || { + rx.recv().unwrap(); + }); + assert_eq!(tx.send(1), Ok(())); + t.join().unwrap(); + } + + #[test] + fn send2() { + let (tx, rx) = sync_channel::<i32>(0); + let t = thread::spawn(move || { + drop(rx); + }); + assert!(tx.send(1).is_err()); + t.join().unwrap(); + } + + #[test] + fn send3() { + let (tx, rx) = sync_channel::<i32>(1); + assert_eq!(tx.send(1), Ok(())); + let t = thread::spawn(move || { + drop(rx); + }); + assert!(tx.send(1).is_err()); + t.join().unwrap(); + } + + #[test] + fn send4() { + let (tx, rx) = sync_channel::<i32>(0); + let tx2 = tx.clone(); + let (done, donerx) = channel(); + let done2 = done.clone(); + let t = thread::spawn(move || { + assert!(tx.send(1).is_err()); + done.send(()).unwrap(); + }); + let t2 = thread::spawn(move || { + assert!(tx2.send(2).is_err()); + done2.send(()).unwrap(); + }); + drop(rx); + donerx.recv().unwrap(); + donerx.recv().unwrap(); + t.join().unwrap(); + t2.join().unwrap(); + } + + #[test] + fn try_send1() { + let (tx, _rx) = sync_channel::<i32>(0); + assert_eq!(tx.try_send(1), Err(TrySendError::Full(1))); + } + + #[test] + 
fn try_send2() { + let (tx, _rx) = sync_channel::<i32>(1); + assert_eq!(tx.try_send(1), Ok(())); + assert_eq!(tx.try_send(1), Err(TrySendError::Full(1))); + } + + #[test] + fn try_send3() { + let (tx, rx) = sync_channel::<i32>(1); + assert_eq!(tx.try_send(1), Ok(())); + drop(rx); + assert_eq!(tx.try_send(1), Err(TrySendError::Disconnected(1))); + } + + #[test] + fn issue_15761() { + fn repro() { + let (tx1, rx1) = sync_channel::<()>(3); + let (tx2, rx2) = sync_channel::<()>(3); + + let _t = thread::spawn(move || { + rx1.recv().unwrap(); + tx2.try_send(()).unwrap(); + }); + + tx1.try_send(()).unwrap(); + rx2.recv().unwrap(); + } + + for _ in 0..100 { + repro() + } + } +} + +// Source: https://github.com/rust-lang/rust/blob/master/src/libstd/sync/mpsc/select.rs +mod select_tests { + use super::*; + + use std::thread; + + #[test] + fn smoke() { + let (tx1, rx1) = channel::<i32>(); + let (tx2, rx2) = channel::<i32>(); + tx1.send(1).unwrap(); + select! { + foo = rx1.recv() => { assert_eq!(foo.unwrap(), 1); }, + _bar = rx2.recv() => { panic!() } + } + tx2.send(2).unwrap(); + select! { + _foo = rx1.recv() => { panic!() }, + bar = rx2.recv() => { assert_eq!(bar.unwrap(), 2) } + } + drop(tx1); + select! { + foo = rx1.recv() => { assert!(foo.is_err()); }, + _bar = rx2.recv() => { panic!() } + } + drop(tx2); + select! { + bar = rx2.recv() => { assert!(bar.is_err()); } + } + } + + #[test] + fn smoke2() { + let (_tx1, rx1) = channel::<i32>(); + let (_tx2, rx2) = channel::<i32>(); + let (_tx3, rx3) = channel::<i32>(); + let (_tx4, rx4) = channel::<i32>(); + let (tx5, rx5) = channel::<i32>(); + tx5.send(4).unwrap(); + select! { + _foo = rx1.recv() => { panic!("1") }, + _foo = rx2.recv() => { panic!("2") }, + _foo = rx3.recv() => { panic!("3") }, + _foo = rx4.recv() => { panic!("4") }, + foo = rx5.recv() => { assert_eq!(foo.unwrap(), 4); } + } + } + + #[test] + fn closed() { + let (_tx1, rx1) = channel::<i32>(); + let (tx2, rx2) = channel::<i32>(); + drop(tx2); + + select! { + _a1 = rx1.recv() => { panic!() }, + a2 = rx2.recv() => { assert!(a2.is_err()); } + } + } + + #[test] + fn unblocks() { + let (tx1, rx1) = channel::<i32>(); + let (_tx2, rx2) = channel::<i32>(); + let (tx3, rx3) = channel::<i32>(); + + let t = thread::spawn(move || { + for _ in 0..20 { + thread::yield_now(); + } + tx1.send(1).unwrap(); + rx3.recv().unwrap(); + for _ in 0..20 { + thread::yield_now(); + } + }); + + select! { + a = rx1.recv() => { assert_eq!(a.unwrap(), 1); }, + _b = rx2.recv() => { panic!() } + } + tx3.send(1).unwrap(); + select! { + a = rx1.recv() => { assert!(a.is_err()) }, + _b = rx2.recv() => { panic!() } + } + t.join().unwrap(); + } + + #[test] + fn both_ready() { + let (tx1, rx1) = channel::<i32>(); + let (tx2, rx2) = channel::<i32>(); + let (tx3, rx3) = channel::<()>(); + + let t = thread::spawn(move || { + for _ in 0..20 { + thread::yield_now(); + } + tx1.send(1).unwrap(); + tx2.send(2).unwrap(); + rx3.recv().unwrap(); + }); + + select! { + a = rx1.recv() => { assert_eq!(a.unwrap(), 1); }, + a = rx2.recv() => { assert_eq!(a.unwrap(), 2); } + } + select! 
{ + a = rx1.recv() => { assert_eq!(a.unwrap(), 1); }, + a = rx2.recv() => { assert_eq!(a.unwrap(), 2); } + } + assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty)); + assert_eq!(rx2.try_recv(), Err(TryRecvError::Empty)); + tx3.send(()).unwrap(); + t.join().unwrap(); + } + + #[test] + fn stress() { + const AMT: i32 = 10000; + let (tx1, rx1) = channel::<i32>(); + let (tx2, rx2) = channel::<i32>(); + let (tx3, rx3) = channel::<()>(); + + let t = thread::spawn(move || { + for i in 0..AMT { + if i % 2 == 0 { + tx1.send(i).unwrap(); + } else { + tx2.send(i).unwrap(); + } + rx3.recv().unwrap(); + } + }); + + for i in 0..AMT { + select! { + i1 = rx1.recv() => { assert!(i % 2 == 0 && i == i1.unwrap()); }, + i2 = rx2.recv() => { assert!(i % 2 == 1 && i == i2.unwrap()); } + } + tx3.send(()).unwrap(); + } + t.join().unwrap(); + } + + #[allow(unused_must_use)] + #[test] + fn cloning() { + let (tx1, rx1) = channel::<i32>(); + let (_tx2, rx2) = channel::<i32>(); + let (tx3, rx3) = channel::<()>(); + + let t = thread::spawn(move || { + rx3.recv().unwrap(); + tx1.clone(); + assert_eq!(rx3.try_recv(), Err(TryRecvError::Empty)); + tx1.send(2).unwrap(); + rx3.recv().unwrap(); + }); + + tx3.send(()).unwrap(); + select! { + _i1 = rx1.recv() => {}, + _i2 = rx2.recv() => panic!() + } + tx3.send(()).unwrap(); + t.join().unwrap(); + } + + #[allow(unused_must_use)] + #[test] + fn cloning2() { + let (tx1, rx1) = channel::<i32>(); + let (_tx2, rx2) = channel::<i32>(); + let (tx3, rx3) = channel::<()>(); + + let t = thread::spawn(move || { + rx3.recv().unwrap(); + tx1.clone(); + assert_eq!(rx3.try_recv(), Err(TryRecvError::Empty)); + tx1.send(2).unwrap(); + rx3.recv().unwrap(); + }); + + tx3.send(()).unwrap(); + select! { + _i1 = rx1.recv() => {}, + _i2 = rx2.recv() => panic!() + } + tx3.send(()).unwrap(); + t.join().unwrap(); + } + + #[test] + fn cloning3() { + let (tx1, rx1) = channel::<()>(); + let (tx2, rx2) = channel::<()>(); + let (tx3, rx3) = channel::<()>(); + let t = thread::spawn(move || { + select! { + _ = rx1.recv() => panic!(), + _ = rx2.recv() => {} + } + tx3.send(()).unwrap(); + }); + + for _ in 0..1000 { + thread::yield_now(); + } + drop(tx1.clone()); + tx2.send(()).unwrap(); + rx3.recv().unwrap(); + t.join().unwrap(); + } + + #[test] + fn preflight1() { + let (tx, rx) = channel(); + tx.send(()).unwrap(); + select! { + _n = rx.recv() => {} + } + } + + #[test] + fn preflight2() { + let (tx, rx) = channel(); + tx.send(()).unwrap(); + tx.send(()).unwrap(); + select! { + _n = rx.recv() => {} + } + } + + #[test] + fn preflight3() { + let (tx, rx) = channel(); + drop(tx.clone()); + tx.send(()).unwrap(); + select! { + _n = rx.recv() => {} + } + } + + #[test] + fn preflight4() { + let (tx, rx) = channel(); + tx.send(()).unwrap(); + select! { + _ = rx.recv() => {} + } + } + + #[test] + fn preflight5() { + let (tx, rx) = channel(); + tx.send(()).unwrap(); + tx.send(()).unwrap(); + select! { + _ = rx.recv() => {} + } + } + + #[test] + fn preflight6() { + let (tx, rx) = channel(); + drop(tx.clone()); + tx.send(()).unwrap(); + select! { + _ = rx.recv() => {} + } + } + + #[test] + fn preflight7() { + let (tx, rx) = channel::<()>(); + drop(tx); + select! { + _ = rx.recv() => {} + } + } + + #[test] + fn preflight8() { + let (tx, rx) = channel(); + tx.send(()).unwrap(); + drop(tx); + rx.recv().unwrap(); + select! { + _ = rx.recv() => {} + } + } + + #[test] + fn preflight9() { + let (tx, rx) = channel(); + drop(tx.clone()); + tx.send(()).unwrap(); + drop(tx); + rx.recv().unwrap(); + select! 
{ + _ = rx.recv() => {} + } + } + + #[test] + fn oneshot_data_waiting() { + let (tx1, rx1) = channel(); + let (tx2, rx2) = channel(); + let t = thread::spawn(move || { + select! { + _n = rx1.recv() => {} + } + tx2.send(()).unwrap(); + }); + + for _ in 0..100 { + thread::yield_now() + } + tx1.send(()).unwrap(); + rx2.recv().unwrap(); + t.join().unwrap(); + } + + #[test] + fn stream_data_waiting() { + let (tx1, rx1) = channel(); + let (tx2, rx2) = channel(); + tx1.send(()).unwrap(); + tx1.send(()).unwrap(); + rx1.recv().unwrap(); + rx1.recv().unwrap(); + let t = thread::spawn(move || { + select! { + _n = rx1.recv() => {} + } + tx2.send(()).unwrap(); + }); + + for _ in 0..100 { + thread::yield_now() + } + tx1.send(()).unwrap(); + rx2.recv().unwrap(); + t.join().unwrap(); + } + + #[test] + fn shared_data_waiting() { + let (tx1, rx1) = channel(); + let (tx2, rx2) = channel(); + drop(tx1.clone()); + tx1.send(()).unwrap(); + rx1.recv().unwrap(); + let t = thread::spawn(move || { + select! { + _n = rx1.recv() => {} + } + tx2.send(()).unwrap(); + }); + + for _ in 0..100 { + thread::yield_now() + } + tx1.send(()).unwrap(); + rx2.recv().unwrap(); + t.join().unwrap(); + } + + #[test] + fn sync1() { + let (tx, rx) = sync_channel::<i32>(1); + tx.send(1).unwrap(); + select! { + n = rx.recv() => { assert_eq!(n.unwrap(), 1); } + } + } + + #[test] + fn sync2() { + let (tx, rx) = sync_channel::<i32>(0); + let t = thread::spawn(move || { + for _ in 0..100 { + thread::yield_now() + } + tx.send(1).unwrap(); + }); + select! { + n = rx.recv() => { assert_eq!(n.unwrap(), 1); } + } + t.join().unwrap(); + } + + #[test] + fn sync3() { + let (tx1, rx1) = sync_channel::<i32>(0); + let (tx2, rx2): (Sender<i32>, Receiver<i32>) = channel(); + let t = thread::spawn(move || { + tx1.send(1).unwrap(); + }); + let t2 = thread::spawn(move || { + tx2.send(2).unwrap(); + }); + select! { + n = rx1.recv() => { + let n = n.unwrap(); + assert_eq!(n, 1); + assert_eq!(rx2.recv().unwrap(), 2); + }, + n = rx2.recv() => { + let n = n.unwrap(); + assert_eq!(n, 2); + assert_eq!(rx1.recv().unwrap(), 1); + } + } + t.join().unwrap(); + t2.join().unwrap(); + } +} |