use std::cell::{Cell, UnsafeCell}; use std::cmp; use std::fmt; use std::iter::FromIterator; use std::marker::PhantomData; use std::mem::{self, ManuallyDrop, MaybeUninit}; use std::ptr; use std::sync::atomic::{self, AtomicIsize, AtomicPtr, AtomicUsize, Ordering}; use std::sync::Arc; use crate::epoch::{self, Atomic, Owned}; use crate::utils::{Backoff, CachePadded}; // Minimum buffer capacity. const MIN_CAP: usize = 64; // Maximum number of tasks that can be stolen in `steal_batch()` and `steal_batch_and_pop()`. const MAX_BATCH: usize = 32; // If a buffer of at least this size is retired, thread-local garbage is flushed so that it gets // deallocated as soon as possible. const FLUSH_THRESHOLD_BYTES: usize = 1 << 10; /// A buffer that holds tasks in a worker queue. /// /// This is just a pointer to the buffer and its length - dropping an instance of this struct will /// *not* deallocate the buffer. struct Buffer { /// Pointer to the allocated memory. ptr: *mut T, /// Capacity of the buffer. Always a power of two. cap: usize, } unsafe impl Send for Buffer {} impl Buffer { /// Allocates a new buffer with the specified capacity. fn alloc(cap: usize) -> Buffer { debug_assert_eq!(cap, cap.next_power_of_two()); let mut v = ManuallyDrop::new(Vec::with_capacity(cap)); let ptr = v.as_mut_ptr(); Buffer { ptr, cap } } /// Deallocates the buffer. unsafe fn dealloc(self) { drop(Vec::from_raw_parts(self.ptr, 0, self.cap)); } /// Returns a pointer to the task at the specified `index`. unsafe fn at(&self, index: isize) -> *mut T { // `self.cap` is always a power of two. // We do all the loads at `MaybeUninit` because we might realize, after loading, that we // don't actually have the right to access this memory. self.ptr.offset(index & (self.cap - 1) as isize) } /// Writes `task` into the specified `index`. /// /// This method might be concurrently called with another `read` at the same index, which is /// technically speaking a data race and therefore UB. 
We should use an atomic store here, but /// that would be more expensive and difficult to implement generically for all types `T`. /// Hence, as a hack, we use a volatile write instead. unsafe fn write(&self, index: isize, task: MaybeUninit) { ptr::write_volatile(self.at(index).cast::>(), task) } /// Reads a task from the specified `index`. /// /// This method might be concurrently called with another `write` at the same index, which is /// technically speaking a data race and therefore UB. We should use an atomic load here, but /// that would be more expensive and difficult to implement generically for all types `T`. /// Hence, as a hack, we use a volatile load instead. unsafe fn read(&self, index: isize) -> MaybeUninit { ptr::read_volatile(self.at(index).cast::>()) } } impl Clone for Buffer { fn clone(&self) -> Buffer { *self } } impl Copy for Buffer {} /// Internal queue data shared between the worker and stealers. /// /// The implementation is based on the following work: /// /// 1. [Chase and Lev. Dynamic circular work-stealing deque. SPAA 2005.][chase-lev] /// 2. [Le, Pop, Cohen, and Nardelli. Correct and efficient work-stealing for weak memory models. /// PPoPP 2013.][weak-mem] /// 3. [Norris and Demsky. CDSchecker: checking concurrent data structures written with C/C++ /// atomics. OOPSLA 2013.][checker] /// /// [chase-lev]: https://dl.acm.org/citation.cfm?id=1073974 /// [weak-mem]: https://dl.acm.org/citation.cfm?id=2442524 /// [checker]: https://dl.acm.org/citation.cfm?id=2509514 struct Inner { /// The front index. front: AtomicIsize, /// The back index. back: AtomicIsize, /// The underlying buffer. buffer: CachePadded>>, } impl Drop for Inner { fn drop(&mut self) { // Load the back index, front index, and buffer. let b = *self.back.get_mut(); let f = *self.front.get_mut(); unsafe { let buffer = self.buffer.load(Ordering::Relaxed, epoch::unprotected()); // Go through the buffer from front to back and drop all tasks in the queue. 
let mut i = f; while i != b { buffer.deref().at(i).drop_in_place(); i = i.wrapping_add(1); } // Free the memory allocated by the buffer. buffer.into_owned().into_box().dealloc(); } } } /// Worker queue flavor: FIFO or LIFO. #[derive(Clone, Copy, Debug, Eq, PartialEq)] enum Flavor { /// The first-in first-out flavor. Fifo, /// The last-in first-out flavor. Lifo, } /// A worker queue. /// /// This is a FIFO or LIFO queue that is owned by a single thread, but other threads may steal /// tasks from it. Task schedulers typically create a single worker queue per thread. /// /// # Examples /// /// A FIFO worker: /// /// ``` /// use crossbeam_deque::{Steal, Worker}; /// /// let w = Worker::new_fifo(); /// let s = w.stealer(); /// /// w.push(1); /// w.push(2); /// w.push(3); /// /// assert_eq!(s.steal(), Steal::Success(1)); /// assert_eq!(w.pop(), Some(2)); /// assert_eq!(w.pop(), Some(3)); /// ``` /// /// A LIFO worker: /// /// ``` /// use crossbeam_deque::{Steal, Worker}; /// /// let w = Worker::new_lifo(); /// let s = w.stealer(); /// /// w.push(1); /// w.push(2); /// w.push(3); /// /// assert_eq!(s.steal(), Steal::Success(1)); /// assert_eq!(w.pop(), Some(3)); /// assert_eq!(w.pop(), Some(2)); /// ``` pub struct Worker { /// A reference to the inner representation of the queue. inner: Arc>>, /// A copy of `inner.buffer` for quick access. buffer: Cell>, /// The flavor of the queue. flavor: Flavor, /// Indicates that the worker cannot be shared among threads. _marker: PhantomData<*mut ()>, // !Send + !Sync } unsafe impl Send for Worker {} impl Worker { /// Creates a FIFO worker queue. /// /// Tasks are pushed and popped from opposite ends. 
/// /// # Examples /// /// ``` /// use crossbeam_deque::Worker; /// /// let w = Worker::::new_fifo(); /// ``` pub fn new_fifo() -> Worker { let buffer = Buffer::alloc(MIN_CAP); let inner = Arc::new(CachePadded::new(Inner { front: AtomicIsize::new(0), back: AtomicIsize::new(0), buffer: CachePadded::new(Atomic::new(buffer)), })); Worker { inner, buffer: Cell::new(buffer), flavor: Flavor::Fifo, _marker: PhantomData, } } /// Creates a LIFO worker queue. /// /// Tasks are pushed and popped from the same end. /// /// # Examples /// /// ``` /// use crossbeam_deque::Worker; /// /// let w = Worker::::new_lifo(); /// ``` pub fn new_lifo() -> Worker { let buffer = Buffer::alloc(MIN_CAP); let inner = Arc::new(CachePadded::new(Inner { front: AtomicIsize::new(0), back: AtomicIsize::new(0), buffer: CachePadded::new(Atomic::new(buffer)), })); Worker { inner, buffer: Cell::new(buffer), flavor: Flavor::Lifo, _marker: PhantomData, } } /// Creates a stealer for this queue. /// /// The returned stealer can be shared among threads and cloned. /// /// # Examples /// /// ``` /// use crossbeam_deque::Worker; /// /// let w = Worker::::new_lifo(); /// let s = w.stealer(); /// ``` pub fn stealer(&self) -> Stealer { Stealer { inner: self.inner.clone(), flavor: self.flavor, } } /// Resizes the internal buffer to the new capacity of `new_cap`. #[cold] unsafe fn resize(&self, new_cap: usize) { // Load the back index, front index, and buffer. let b = self.inner.back.load(Ordering::Relaxed); let f = self.inner.front.load(Ordering::Relaxed); let buffer = self.buffer.get(); // Allocate a new buffer and copy data from the old buffer to the new one. let new = Buffer::alloc(new_cap); let mut i = f; while i != b { ptr::copy_nonoverlapping(buffer.at(i), new.at(i), 1); i = i.wrapping_add(1); } let guard = &epoch::pin(); // Replace the old buffer with the new one. 
self.buffer.replace(new); let old = self.inner .buffer .swap(Owned::new(new).into_shared(guard), Ordering::Release, guard); // Destroy the old buffer later. guard.defer_unchecked(move || old.into_owned().into_box().dealloc()); // If the buffer is very large, then flush the thread-local garbage in order to deallocate // it as soon as possible. if mem::size_of::() * new_cap >= FLUSH_THRESHOLD_BYTES { guard.flush(); } } /// Reserves enough capacity so that `reserve_cap` tasks can be pushed without growing the /// buffer. fn reserve(&self, reserve_cap: usize) { if reserve_cap > 0 { // Compute the current length. let b = self.inner.back.load(Ordering::Relaxed); let f = self.inner.front.load(Ordering::SeqCst); let len = b.wrapping_sub(f) as usize; // The current capacity. let cap = self.buffer.get().cap; // Is there enough capacity to push `reserve_cap` tasks? if cap - len < reserve_cap { // Keep doubling the capacity as much as is needed. let mut new_cap = cap * 2; while new_cap - len < reserve_cap { new_cap *= 2; } // Resize the buffer. unsafe { self.resize(new_cap); } } } } /// Returns `true` if the queue is empty. /// /// ``` /// use crossbeam_deque::Worker; /// /// let w = Worker::new_lifo(); /// /// assert!(w.is_empty()); /// w.push(1); /// assert!(!w.is_empty()); /// ``` pub fn is_empty(&self) -> bool { let b = self.inner.back.load(Ordering::Relaxed); let f = self.inner.front.load(Ordering::SeqCst); b.wrapping_sub(f) <= 0 } /// Returns the number of tasks in the deque. /// /// ``` /// use crossbeam_deque::Worker; /// /// let w = Worker::new_lifo(); /// /// assert_eq!(w.len(), 0); /// w.push(1); /// assert_eq!(w.len(), 1); /// w.push(1); /// assert_eq!(w.len(), 2); /// ``` pub fn len(&self) -> usize { let b = self.inner.back.load(Ordering::Relaxed); let f = self.inner.front.load(Ordering::SeqCst); b.wrapping_sub(f).max(0) as usize } /// Pushes a task into the queue. 
/// /// # Examples /// /// ``` /// use crossbeam_deque::Worker; /// /// let w = Worker::new_lifo(); /// w.push(1); /// w.push(2); /// ``` pub fn push(&self, task: T) { // Load the back index, front index, and buffer. let b = self.inner.back.load(Ordering::Relaxed); let f = self.inner.front.load(Ordering::Acquire); let mut buffer = self.buffer.get(); // Calculate the length of the queue. let len = b.wrapping_sub(f); // Is the queue full? if len >= buffer.cap as isize { // Yes. Grow the underlying buffer. unsafe { self.resize(2 * buffer.cap); } buffer = self.buffer.get(); } // Write `task` into the slot. unsafe { buffer.write(b, MaybeUninit::new(task)); } atomic::fence(Ordering::Release); // Increment the back index. // // This ordering could be `Relaxed`, but then thread sanitizer would falsely report data // races because it doesn't understand fences. self.inner.back.store(b.wrapping_add(1), Ordering::Release); } /// Pops a task from the queue. /// /// # Examples /// /// ``` /// use crossbeam_deque::Worker; /// /// let w = Worker::new_fifo(); /// w.push(1); /// w.push(2); /// /// assert_eq!(w.pop(), Some(1)); /// assert_eq!(w.pop(), Some(2)); /// assert_eq!(w.pop(), None); /// ``` pub fn pop(&self) -> Option { // Load the back and front index. let b = self.inner.back.load(Ordering::Relaxed); let f = self.inner.front.load(Ordering::Relaxed); // Calculate the length of the queue. let len = b.wrapping_sub(f); // Is the queue empty? if len <= 0 { return None; } match self.flavor { // Pop from the front of the queue. Flavor::Fifo => { // Try incrementing the front index to pop the task. let f = self.inner.front.fetch_add(1, Ordering::SeqCst); let new_f = f.wrapping_add(1); if b.wrapping_sub(new_f) < 0 { self.inner.front.store(f, Ordering::Relaxed); return None; } unsafe { // Read the popped task. let buffer = self.buffer.get(); let task = buffer.read(f).assume_init(); // Shrink the buffer if `len - 1` is less than one fourth of the capacity. 
if buffer.cap > MIN_CAP && len <= buffer.cap as isize / 4 { self.resize(buffer.cap / 2); } Some(task) } } // Pop from the back of the queue. Flavor::Lifo => { // Decrement the back index. let b = b.wrapping_sub(1); self.inner.back.store(b, Ordering::Relaxed); atomic::fence(Ordering::SeqCst); // Load the front index. let f = self.inner.front.load(Ordering::Relaxed); // Compute the length after the back index was decremented. let len = b.wrapping_sub(f); if len < 0 { // The queue is empty. Restore the back index to the original task. self.inner.back.store(b.wrapping_add(1), Ordering::Relaxed); None } else { // Read the task to be popped. let buffer = self.buffer.get(); let mut task = unsafe { Some(buffer.read(b)) }; // Are we popping the last task from the queue? if len == 0 { // Try incrementing the front index. if self .inner .front .compare_exchange( f, f.wrapping_add(1), Ordering::SeqCst, Ordering::Relaxed, ) .is_err() { // Failed. We didn't pop anything. Reset to `None`. task.take(); } // Restore the back index to the original task. self.inner.back.store(b.wrapping_add(1), Ordering::Relaxed); } else { // Shrink the buffer if `len` is less than one fourth of the capacity. if buffer.cap > MIN_CAP && len < buffer.cap as isize / 4 { unsafe { self.resize(buffer.cap / 2); } } } task.map(|t| unsafe { t.assume_init() }) } } } } } impl fmt::Debug for Worker { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.pad("Worker { .. }") } } /// A stealer handle of a worker queue. /// /// Stealers can be shared among threads. /// /// Task schedulers typically have a single worker queue per worker thread. 
/// /// # Examples /// /// ``` /// use crossbeam_deque::{Steal, Worker}; /// /// let w = Worker::new_lifo(); /// w.push(1); /// w.push(2); /// /// let s = w.stealer(); /// assert_eq!(s.steal(), Steal::Success(1)); /// assert_eq!(s.steal(), Steal::Success(2)); /// assert_eq!(s.steal(), Steal::Empty); /// ``` pub struct Stealer { /// A reference to the inner representation of the queue. inner: Arc>>, /// The flavor of the queue. flavor: Flavor, } unsafe impl Send for Stealer {} unsafe impl Sync for Stealer {} impl Stealer { /// Returns `true` if the queue is empty. /// /// ``` /// use crossbeam_deque::Worker; /// /// let w = Worker::new_lifo(); /// let s = w.stealer(); /// /// assert!(s.is_empty()); /// w.push(1); /// assert!(!s.is_empty()); /// ``` pub fn is_empty(&self) -> bool { let f = self.inner.front.load(Ordering::Acquire); atomic::fence(Ordering::SeqCst); let b = self.inner.back.load(Ordering::Acquire); b.wrapping_sub(f) <= 0 } /// Returns the number of tasks in the deque. /// /// ``` /// use crossbeam_deque::Worker; /// /// let w = Worker::new_lifo(); /// let s = w.stealer(); /// /// assert_eq!(s.len(), 0); /// w.push(1); /// assert_eq!(s.len(), 1); /// w.push(2); /// assert_eq!(s.len(), 2); /// ``` pub fn len(&self) -> usize { let f = self.inner.front.load(Ordering::Acquire); atomic::fence(Ordering::SeqCst); let b = self.inner.back.load(Ordering::Acquire); b.wrapping_sub(f).max(0) as usize } /// Steals a task from the queue. /// /// # Examples /// /// ``` /// use crossbeam_deque::{Steal, Worker}; /// /// let w = Worker::new_lifo(); /// w.push(1); /// w.push(2); /// /// let s = w.stealer(); /// assert_eq!(s.steal(), Steal::Success(1)); /// assert_eq!(s.steal(), Steal::Success(2)); /// ``` pub fn steal(&self) -> Steal { // Load the front index. let f = self.inner.front.load(Ordering::Acquire); // A SeqCst fence is needed here. // // If the current thread is already pinned (reentrantly), we must manually issue the // fence. 
Otherwise, the following pinning will issue the fence anyway, so we don't // have to. if epoch::is_pinned() { atomic::fence(Ordering::SeqCst); } let guard = &epoch::pin(); // Load the back index. let b = self.inner.back.load(Ordering::Acquire); // Is the queue empty? if b.wrapping_sub(f) <= 0 { return Steal::Empty; } // Load the buffer and read the task at the front. let buffer = self.inner.buffer.load(Ordering::Acquire, guard); let task = unsafe { buffer.deref().read(f) }; // Try incrementing the front index to steal the task. // If the buffer has been swapped or the increment fails, we retry. if self.inner.buffer.load(Ordering::Acquire, guard) != buffer || self .inner .front .compare_exchange(f, f.wrapping_add(1), Ordering::SeqCst, Ordering::Relaxed) .is_err() { // We didn't steal this task, forget it. return Steal::Retry; } // Return the stolen task. Steal::Success(unsafe { task.assume_init() }) } /// Steals a batch of tasks and pushes them into another worker. /// /// How many tasks exactly will be stolen is not specified. That said, this method will try to /// steal around half of the tasks in the queue, but also not more than some constant limit. /// /// # Examples /// /// ``` /// use crossbeam_deque::Worker; /// /// let w1 = Worker::new_fifo(); /// w1.push(1); /// w1.push(2); /// w1.push(3); /// w1.push(4); /// /// let s = w1.stealer(); /// let w2 = Worker::new_fifo(); /// /// let _ = s.steal_batch(&w2); /// assert_eq!(w2.pop(), Some(1)); /// assert_eq!(w2.pop(), Some(2)); /// ``` pub fn steal_batch(&self, dest: &Worker) -> Steal<()> { self.steal_batch_with_limit(dest, MAX_BATCH) } /// Steals no more than `limit` of tasks and pushes them into another worker. /// /// How many tasks exactly will be stolen is not specified. That said, this method will try to /// steal around half of the tasks in the queue, but also not more than the given limit. 
/// /// # Examples /// /// ``` /// use crossbeam_deque::Worker; /// /// let w1 = Worker::new_fifo(); /// w1.push(1); /// w1.push(2); /// w1.push(3); /// w1.push(4); /// w1.push(5); /// w1.push(6); /// /// let s = w1.stealer(); /// let w2 = Worker::new_fifo(); /// /// let _ = s.steal_batch_with_limit(&w2, 2); /// assert_eq!(w2.pop(), Some(1)); /// assert_eq!(w2.pop(), Some(2)); /// assert_eq!(w2.pop(), None); /// /// w1.push(7); /// w1.push(8); /// // Setting a large limit does not guarantee that all elements will be popped. In this case, /// // half of the elements are currently popped, but the number of popped elements is considered /// // an implementation detail that may be changed in the future. /// let _ = s.steal_batch_with_limit(&w2, std::usize::MAX); /// assert_eq!(w2.len(), 3); /// ``` pub fn steal_batch_with_limit(&self, dest: &Worker, limit: usize) -> Steal<()> { assert!(limit > 0); if Arc::ptr_eq(&self.inner, &dest.inner) { if dest.is_empty() { return Steal::Empty; } else { return Steal::Success(()); } } // Load the front index. let mut f = self.inner.front.load(Ordering::Acquire); // A SeqCst fence is needed here. // // If the current thread is already pinned (reentrantly), we must manually issue the // fence. Otherwise, the following pinning will issue the fence anyway, so we don't // have to. if epoch::is_pinned() { atomic::fence(Ordering::SeqCst); } let guard = &epoch::pin(); // Load the back index. let b = self.inner.back.load(Ordering::Acquire); // Is the queue empty? let len = b.wrapping_sub(f); if len <= 0 { return Steal::Empty; } // Reserve capacity for the stolen batch. let batch_size = cmp::min((len as usize + 1) / 2, limit); dest.reserve(batch_size); let mut batch_size = batch_size as isize; // Get the destination buffer and back index. let dest_buffer = dest.buffer.get(); let mut dest_b = dest.inner.back.load(Ordering::Relaxed); // Load the buffer. 
let buffer = self.inner.buffer.load(Ordering::Acquire, guard); match self.flavor { // Steal a batch of tasks from the front at once. Flavor::Fifo => { // Copy the batch from the source to the destination buffer. match dest.flavor { Flavor::Fifo => { for i in 0..batch_size { unsafe { let task = buffer.deref().read(f.wrapping_add(i)); dest_buffer.write(dest_b.wrapping_add(i), task); } } } Flavor::Lifo => { for i in 0..batch_size { unsafe { let task = buffer.deref().read(f.wrapping_add(i)); dest_buffer.write(dest_b.wrapping_add(batch_size - 1 - i), task); } } } } // Try incrementing the front index to steal the batch. // If the buffer has been swapped or the increment fails, we retry. if self.inner.buffer.load(Ordering::Acquire, guard) != buffer || self .inner .front .compare_exchange( f, f.wrapping_add(batch_size), Ordering::SeqCst, Ordering::Relaxed, ) .is_err() { return Steal::Retry; } dest_b = dest_b.wrapping_add(batch_size); } // Steal a batch of tasks from the front one by one. Flavor::Lifo => { // This loop may modify the batch_size, which triggers a clippy lint warning. // Use a new variable to avoid the warning, and to make it clear we aren't // modifying the loop exit condition during iteration. let original_batch_size = batch_size; for i in 0..original_batch_size { // If this is not the first steal, check whether the queue is empty. if i > 0 { // We've already got the current front index. Now execute the fence to // synchronize with other threads. atomic::fence(Ordering::SeqCst); // Load the back index. let b = self.inner.back.load(Ordering::Acquire); // Is the queue empty? if b.wrapping_sub(f) <= 0 { batch_size = i; break; } } // Read the task at the front. let task = unsafe { buffer.deref().read(f) }; // Try incrementing the front index to steal the task. // If the buffer has been swapped or the increment fails, we retry. 
if self.inner.buffer.load(Ordering::Acquire, guard) != buffer || self .inner .front .compare_exchange( f, f.wrapping_add(1), Ordering::SeqCst, Ordering::Relaxed, ) .is_err() { // We didn't steal this task, forget it and break from the loop. batch_size = i; break; } // Write the stolen task into the destination buffer. unsafe { dest_buffer.write(dest_b, task); } // Move the source front index and the destination back index one step forward. f = f.wrapping_add(1); dest_b = dest_b.wrapping_add(1); } // If we didn't steal anything, the operation needs to be retried. if batch_size == 0 { return Steal::Retry; } // If stealing into a FIFO queue, stolen tasks need to be reversed. if dest.flavor == Flavor::Fifo { for i in 0..batch_size / 2 { unsafe { let i1 = dest_b.wrapping_sub(batch_size - i); let i2 = dest_b.wrapping_sub(i + 1); let t1 = dest_buffer.read(i1); let t2 = dest_buffer.read(i2); dest_buffer.write(i1, t2); dest_buffer.write(i2, t1); } } } } } atomic::fence(Ordering::Release); // Update the back index in the destination queue. // // This ordering could be `Relaxed`, but then thread sanitizer would falsely report data // races because it doesn't understand fences. dest.inner.back.store(dest_b, Ordering::Release); // Return with success. Steal::Success(()) } /// Steals a batch of tasks, pushes them into another worker, and pops a task from that worker. /// /// How many tasks exactly will be stolen is not specified. That said, this method will try to /// steal around half of the tasks in the queue, but also not more than some constant limit. 
/// /// # Examples /// /// ``` /// use crossbeam_deque::{Steal, Worker}; /// /// let w1 = Worker::new_fifo(); /// w1.push(1); /// w1.push(2); /// w1.push(3); /// w1.push(4); /// /// let s = w1.stealer(); /// let w2 = Worker::new_fifo(); /// /// assert_eq!(s.steal_batch_and_pop(&w2), Steal::Success(1)); /// assert_eq!(w2.pop(), Some(2)); /// ``` pub fn steal_batch_and_pop(&self, dest: &Worker) -> Steal { self.steal_batch_with_limit_and_pop(dest, MAX_BATCH) } /// Steals no more than `limit` of tasks, pushes them into another worker, and pops a task from /// that worker. /// /// How many tasks exactly will be stolen is not specified. That said, this method will try to /// steal around half of the tasks in the queue, but also not more than the given limit. /// /// # Examples /// /// ``` /// use crossbeam_deque::{Steal, Worker}; /// /// let w1 = Worker::new_fifo(); /// w1.push(1); /// w1.push(2); /// w1.push(3); /// w1.push(4); /// w1.push(5); /// w1.push(6); /// /// let s = w1.stealer(); /// let w2 = Worker::new_fifo(); /// /// assert_eq!(s.steal_batch_with_limit_and_pop(&w2, 2), Steal::Success(1)); /// assert_eq!(w2.pop(), Some(2)); /// assert_eq!(w2.pop(), None); /// /// w1.push(7); /// w1.push(8); /// // Setting a large limit does not guarantee that all elements will be popped. In this case, /// // half of the elements are currently popped, but the number of popped elements is considered /// // an implementation detail that may be changed in the future. /// assert_eq!(s.steal_batch_with_limit_and_pop(&w2, std::usize::MAX), Steal::Success(3)); /// assert_eq!(w2.pop(), Some(4)); /// assert_eq!(w2.pop(), Some(5)); /// assert_eq!(w2.pop(), None); /// ``` pub fn steal_batch_with_limit_and_pop(&self, dest: &Worker, limit: usize) -> Steal { assert!(limit > 0); if Arc::ptr_eq(&self.inner, &dest.inner) { match dest.pop() { None => return Steal::Empty, Some(task) => return Steal::Success(task), } } // Load the front index. 
let mut f = self.inner.front.load(Ordering::Acquire); // A SeqCst fence is needed here. // // If the current thread is already pinned (reentrantly), we must manually issue the // fence. Otherwise, the following pinning will issue the fence anyway, so we don't // have to. if epoch::is_pinned() { atomic::fence(Ordering::SeqCst); } let guard = &epoch::pin(); // Load the back index. let b = self.inner.back.load(Ordering::Acquire); // Is the queue empty? let len = b.wrapping_sub(f); if len <= 0 { return Steal::Empty; } // Reserve capacity for the stolen batch. let batch_size = cmp::min((len as usize - 1) / 2, limit - 1); dest.reserve(batch_size); let mut batch_size = batch_size as isize; // Get the destination buffer and back index. let dest_buffer = dest.buffer.get(); let mut dest_b = dest.inner.back.load(Ordering::Relaxed); // Load the buffer let buffer = self.inner.buffer.load(Ordering::Acquire, guard); // Read the task at the front. let mut task = unsafe { buffer.deref().read(f) }; match self.flavor { // Steal a batch of tasks from the front at once. Flavor::Fifo => { // Copy the batch from the source to the destination buffer. match dest.flavor { Flavor::Fifo => { for i in 0..batch_size { unsafe { let task = buffer.deref().read(f.wrapping_add(i + 1)); dest_buffer.write(dest_b.wrapping_add(i), task); } } } Flavor::Lifo => { for i in 0..batch_size { unsafe { let task = buffer.deref().read(f.wrapping_add(i + 1)); dest_buffer.write(dest_b.wrapping_add(batch_size - 1 - i), task); } } } } // Try incrementing the front index to steal the task. // If the buffer has been swapped or the increment fails, we retry. if self.inner.buffer.load(Ordering::Acquire, guard) != buffer || self .inner .front .compare_exchange( f, f.wrapping_add(batch_size + 1), Ordering::SeqCst, Ordering::Relaxed, ) .is_err() { // We didn't steal this task, forget it. return Steal::Retry; } dest_b = dest_b.wrapping_add(batch_size); } // Steal a batch of tasks from the front one by one. 
Flavor::Lifo => { // Try incrementing the front index to steal the task. if self .inner .front .compare_exchange(f, f.wrapping_add(1), Ordering::SeqCst, Ordering::Relaxed) .is_err() { // We didn't steal this task, forget it. return Steal::Retry; } // Move the front index one step forward. f = f.wrapping_add(1); // Repeat the same procedure for the batch steals. // // This loop may modify the batch_size, which triggers a clippy lint warning. // Use a new variable to avoid the warning, and to make it clear we aren't // modifying the loop exit condition during iteration. let original_batch_size = batch_size; for i in 0..original_batch_size { // We've already got the current front index. Now execute the fence to // synchronize with other threads. atomic::fence(Ordering::SeqCst); // Load the back index. let b = self.inner.back.load(Ordering::Acquire); // Is the queue empty? if b.wrapping_sub(f) <= 0 { batch_size = i; break; } // Read the task at the front. let tmp = unsafe { buffer.deref().read(f) }; // Try incrementing the front index to steal the task. // If the buffer has been swapped or the increment fails, we retry. if self.inner.buffer.load(Ordering::Acquire, guard) != buffer || self .inner .front .compare_exchange( f, f.wrapping_add(1), Ordering::SeqCst, Ordering::Relaxed, ) .is_err() { // We didn't steal this task, forget it and break from the loop. batch_size = i; break; } // Write the previously stolen task into the destination buffer. unsafe { dest_buffer.write(dest_b, mem::replace(&mut task, tmp)); } // Move the source front index and the destination back index one step forward. f = f.wrapping_add(1); dest_b = dest_b.wrapping_add(1); } // If stealing into a FIFO queue, stolen tasks need to be reversed. 
if dest.flavor == Flavor::Fifo { for i in 0..batch_size / 2 { unsafe { let i1 = dest_b.wrapping_sub(batch_size - i); let i2 = dest_b.wrapping_sub(i + 1); let t1 = dest_buffer.read(i1); let t2 = dest_buffer.read(i2); dest_buffer.write(i1, t2); dest_buffer.write(i2, t1); } } } } } atomic::fence(Ordering::Release); // Update the back index in the destination queue. // // This ordering could be `Relaxed`, but then thread sanitizer would falsely report data // races because it doesn't understand fences. dest.inner.back.store(dest_b, Ordering::Release); // Return with success. Steal::Success(unsafe { task.assume_init() }) } } impl Clone for Stealer { fn clone(&self) -> Stealer { Stealer { inner: self.inner.clone(), flavor: self.flavor, } } } impl fmt::Debug for Stealer { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.pad("Stealer { .. }") } } // Bits indicating the state of a slot: // * If a task has been written into the slot, `WRITE` is set. // * If a task has been read from the slot, `READ` is set. // * If the block is being destroyed, `DESTROY` is set. const WRITE: usize = 1; const READ: usize = 2; const DESTROY: usize = 4; // Each block covers one "lap" of indices. const LAP: usize = 64; // The maximum number of values a block can hold. const BLOCK_CAP: usize = LAP - 1; // How many lower bits are reserved for metadata. const SHIFT: usize = 1; // Indicates that the block is not the last one. const HAS_NEXT: usize = 1; /// A slot in a block. struct Slot { /// The task. task: UnsafeCell>, /// The state of the slot. state: AtomicUsize, } impl Slot { const UNINIT: Self = Self { task: UnsafeCell::new(MaybeUninit::uninit()), state: AtomicUsize::new(0), }; /// Waits until a task is written into the slot. fn wait_write(&self) { let backoff = Backoff::new(); while self.state.load(Ordering::Acquire) & WRITE == 0 { backoff.snooze(); } } } /// A block in a linked list. /// /// Each block in the list can hold up to `BLOCK_CAP` values. 
struct Block { /// The next block in the linked list. next: AtomicPtr>, /// Slots for values. slots: [Slot; BLOCK_CAP], } impl Block { /// Creates an empty block that starts at `start_index`. fn new() -> Block { Self { next: AtomicPtr::new(ptr::null_mut()), slots: [Slot::UNINIT; BLOCK_CAP], } } /// Waits until the next pointer is set. fn wait_next(&self) -> *mut Block { let backoff = Backoff::new(); loop { let next = self.next.load(Ordering::Acquire); if !next.is_null() { return next; } backoff.snooze(); } } /// Sets the `DESTROY` bit in slots starting from `start` and destroys the block. unsafe fn destroy(this: *mut Block, count: usize) { // It is not necessary to set the `DESTROY` bit in the last slot because that slot has // begun destruction of the block. for i in (0..count).rev() { let slot = (*this).slots.get_unchecked(i); // Mark the `DESTROY` bit if a thread is still using the slot. if slot.state.load(Ordering::Acquire) & READ == 0 && slot.state.fetch_or(DESTROY, Ordering::AcqRel) & READ == 0 { // If a thread is still using the slot, it will continue destruction of the block. return; } } // No thread is using the block, now it is safe to destroy it. drop(Box::from_raw(this)); } } /// A position in a queue. struct Position { /// The index in the queue. index: AtomicUsize, /// The block in the linked list. block: AtomicPtr>, } /// An injector queue. /// /// This is a FIFO queue that can be shared among multiple threads. Task schedulers typically have /// a single injector queue, which is the entry point for new tasks. /// /// # Examples /// /// ``` /// use crossbeam_deque::{Injector, Steal}; /// /// let q = Injector::new(); /// q.push(1); /// q.push(2); /// /// assert_eq!(q.steal(), Steal::Success(1)); /// assert_eq!(q.steal(), Steal::Success(2)); /// assert_eq!(q.steal(), Steal::Empty); /// ``` pub struct Injector { /// The head of the queue. head: CachePadded>, /// The tail of the queue. 
tail: CachePadded>, /// Indicates that dropping a `Injector` may drop values of type `T`. _marker: PhantomData, } unsafe impl Send for Injector {} unsafe impl Sync for Injector {} impl Default for Injector { fn default() -> Self { let block = Box::into_raw(Box::new(Block::::new())); Self { head: CachePadded::new(Position { block: AtomicPtr::new(block), index: AtomicUsize::new(0), }), tail: CachePadded::new(Position { block: AtomicPtr::new(block), index: AtomicUsize::new(0), }), _marker: PhantomData, } } } impl Injector { /// Creates a new injector queue. /// /// # Examples /// /// ``` /// use crossbeam_deque::Injector; /// /// let q = Injector::::new(); /// ``` pub fn new() -> Injector { Self::default() } /// Pushes a task into the queue. /// /// # Examples /// /// ``` /// use crossbeam_deque::Injector; /// /// let w = Injector::new(); /// w.push(1); /// w.push(2); /// ``` pub fn push(&self, task: T) { let backoff = Backoff::new(); let mut tail = self.tail.index.load(Ordering::Acquire); let mut block = self.tail.block.load(Ordering::Acquire); let mut next_block = None; loop { // Calculate the offset of the index into the block. let offset = (tail >> SHIFT) % LAP; // If we reached the end of the block, wait until the next one is installed. if offset == BLOCK_CAP { backoff.snooze(); tail = self.tail.index.load(Ordering::Acquire); block = self.tail.block.load(Ordering::Acquire); continue; } // If we're going to have to install the next block, allocate it in advance in order to // make the wait for other threads as short as possible. if offset + 1 == BLOCK_CAP && next_block.is_none() { next_block = Some(Box::new(Block::::new())); } let new_tail = tail + (1 << SHIFT); // Try advancing the tail forward. match self.tail.index.compare_exchange_weak( tail, new_tail, Ordering::SeqCst, Ordering::Acquire, ) { Ok(_) => unsafe { // If we've reached the end of the block, install the next one. 
if offset + 1 == BLOCK_CAP { let next_block = Box::into_raw(next_block.unwrap()); let next_index = new_tail.wrapping_add(1 << SHIFT); self.tail.block.store(next_block, Ordering::Release); self.tail.index.store(next_index, Ordering::Release); (*block).next.store(next_block, Ordering::Release); } // Write the task into the slot. let slot = (*block).slots.get_unchecked(offset); slot.task.get().write(MaybeUninit::new(task)); slot.state.fetch_or(WRITE, Ordering::Release); return; }, Err(t) => { tail = t; block = self.tail.block.load(Ordering::Acquire); backoff.spin(); } } } } /// Steals a task from the queue. /// /// # Examples /// /// ``` /// use crossbeam_deque::{Injector, Steal}; /// /// let q = Injector::new(); /// q.push(1); /// q.push(2); /// /// assert_eq!(q.steal(), Steal::Success(1)); /// assert_eq!(q.steal(), Steal::Success(2)); /// assert_eq!(q.steal(), Steal::Empty); /// ``` pub fn steal(&self) -> Steal { let mut head; let mut block; let mut offset; let backoff = Backoff::new(); loop { head = self.head.index.load(Ordering::Acquire); block = self.head.block.load(Ordering::Acquire); // Calculate the offset of the index into the block. offset = (head >> SHIFT) % LAP; // If we reached the end of the block, wait until the next one is installed. if offset == BLOCK_CAP { backoff.snooze(); } else { break; } } let mut new_head = head + (1 << SHIFT); if new_head & HAS_NEXT == 0 { atomic::fence(Ordering::SeqCst); let tail = self.tail.index.load(Ordering::Relaxed); // If the tail equals the head, that means the queue is empty. if head >> SHIFT == tail >> SHIFT { return Steal::Empty; } // If head and tail are not in the same block, set `HAS_NEXT` in head. if (head >> SHIFT) / LAP != (tail >> SHIFT) / LAP { new_head |= HAS_NEXT; } } // Try moving the head index forward. if self .head .index .compare_exchange_weak(head, new_head, Ordering::SeqCst, Ordering::Acquire) .is_err() { return Steal::Retry; } unsafe { // If we've reached the end of the block, move to the next one. 
if offset + 1 == BLOCK_CAP { let next = (*block).wait_next(); let mut next_index = (new_head & !HAS_NEXT).wrapping_add(1 << SHIFT); if !(*next).next.load(Ordering::Relaxed).is_null() { next_index |= HAS_NEXT; } self.head.block.store(next, Ordering::Release); self.head.index.store(next_index, Ordering::Release); } // Read the task. let slot = (*block).slots.get_unchecked(offset); slot.wait_write(); let task = slot.task.get().read().assume_init(); // Destroy the block if we've reached the end, or if another thread wanted to destroy // but couldn't because we were busy reading from the slot. if (offset + 1 == BLOCK_CAP) || (slot.state.fetch_or(READ, Ordering::AcqRel) & DESTROY != 0) { Block::destroy(block, offset); } Steal::Success(task) } } /// Steals a batch of tasks and pushes them into a worker. /// /// How many tasks exactly will be stolen is not specified. That said, this method will try to /// steal around half of the tasks in the queue, but also not more than some constant limit. /// /// # Examples /// /// ``` /// use crossbeam_deque::{Injector, Worker}; /// /// let q = Injector::new(); /// q.push(1); /// q.push(2); /// q.push(3); /// q.push(4); /// /// let w = Worker::new_fifo(); /// let _ = q.steal_batch(&w); /// assert_eq!(w.pop(), Some(1)); /// assert_eq!(w.pop(), Some(2)); /// ``` pub fn steal_batch(&self, dest: &Worker) -> Steal<()> { self.steal_batch_with_limit(dest, MAX_BATCH) } /// Steals no more than of tasks and pushes them into a worker. /// /// How many tasks exactly will be stolen is not specified. That said, this method will try to /// steal around half of the tasks in the queue, but also not more than some constant limit. 
/// /// # Examples /// /// ``` /// use crossbeam_deque::{Injector, Worker}; /// /// let q = Injector::new(); /// q.push(1); /// q.push(2); /// q.push(3); /// q.push(4); /// q.push(5); /// q.push(6); /// /// let w = Worker::new_fifo(); /// let _ = q.steal_batch_with_limit(&w, 2); /// assert_eq!(w.pop(), Some(1)); /// assert_eq!(w.pop(), Some(2)); /// assert_eq!(w.pop(), None); /// /// q.push(7); /// q.push(8); /// // Setting a large limit does not guarantee that all elements will be popped. In this case, /// // half of the elements are currently popped, but the number of popped elements is considered /// // an implementation detail that may be changed in the future. /// let _ = q.steal_batch_with_limit(&w, std::usize::MAX); /// assert_eq!(w.len(), 3); /// ``` pub fn steal_batch_with_limit(&self, dest: &Worker, limit: usize) -> Steal<()> { assert!(limit > 0); let mut head; let mut block; let mut offset; let backoff = Backoff::new(); loop { head = self.head.index.load(Ordering::Acquire); block = self.head.block.load(Ordering::Acquire); // Calculate the offset of the index into the block. offset = (head >> SHIFT) % LAP; // If we reached the end of the block, wait until the next one is installed. if offset == BLOCK_CAP { backoff.snooze(); } else { break; } } let mut new_head = head; let advance; if new_head & HAS_NEXT == 0 { atomic::fence(Ordering::SeqCst); let tail = self.tail.index.load(Ordering::Relaxed); // If the tail equals the head, that means the queue is empty. if head >> SHIFT == tail >> SHIFT { return Steal::Empty; } // If head and tail are not in the same block, set `HAS_NEXT` in head. Also, calculate // the right batch size to steal. if (head >> SHIFT) / LAP != (tail >> SHIFT) / LAP { new_head |= HAS_NEXT; // We can steal all tasks till the end of the block. advance = (BLOCK_CAP - offset).min(limit); } else { let len = (tail - head) >> SHIFT; // Steal half of the available tasks. 
advance = ((len + 1) / 2).min(limit); } } else { // We can steal all tasks till the end of the block. advance = (BLOCK_CAP - offset).min(limit); } new_head += advance << SHIFT; let new_offset = offset + advance; // Try moving the head index forward. if self .head .index .compare_exchange_weak(head, new_head, Ordering::SeqCst, Ordering::Acquire) .is_err() { return Steal::Retry; } // Reserve capacity for the stolen batch. let batch_size = new_offset - offset; dest.reserve(batch_size); // Get the destination buffer and back index. let dest_buffer = dest.buffer.get(); let dest_b = dest.inner.back.load(Ordering::Relaxed); unsafe { // If we've reached the end of the block, move to the next one. if new_offset == BLOCK_CAP { let next = (*block).wait_next(); let mut next_index = (new_head & !HAS_NEXT).wrapping_add(1 << SHIFT); if !(*next).next.load(Ordering::Relaxed).is_null() { next_index |= HAS_NEXT; } self.head.block.store(next, Ordering::Release); self.head.index.store(next_index, Ordering::Release); } // Copy values from the injector into the destination queue. match dest.flavor { Flavor::Fifo => { for i in 0..batch_size { // Read the task. let slot = (*block).slots.get_unchecked(offset + i); slot.wait_write(); let task = slot.task.get().read(); // Write it into the destination queue. dest_buffer.write(dest_b.wrapping_add(i as isize), task); } } Flavor::Lifo => { for i in 0..batch_size { // Read the task. let slot = (*block).slots.get_unchecked(offset + i); slot.wait_write(); let task = slot.task.get().read(); // Write it into the destination queue. dest_buffer.write(dest_b.wrapping_add((batch_size - 1 - i) as isize), task); } } } atomic::fence(Ordering::Release); // Update the back index in the destination queue. // // This ordering could be `Relaxed`, but then thread sanitizer would falsely report // data races because it doesn't understand fences. 
dest.inner .back .store(dest_b.wrapping_add(batch_size as isize), Ordering::Release); // Destroy the block if we've reached the end, or if another thread wanted to destroy // but couldn't because we were busy reading from the slot. if new_offset == BLOCK_CAP { Block::destroy(block, offset); } else { for i in offset..new_offset { let slot = (*block).slots.get_unchecked(i); if slot.state.fetch_or(READ, Ordering::AcqRel) & DESTROY != 0 { Block::destroy(block, offset); break; } } } Steal::Success(()) } } /// Steals a batch of tasks, pushes them into a worker, and pops a task from that worker. /// /// How many tasks exactly will be stolen is not specified. That said, this method will try to /// steal around half of the tasks in the queue, but also not more than some constant limit. /// /// # Examples /// /// ``` /// use crossbeam_deque::{Injector, Steal, Worker}; /// /// let q = Injector::new(); /// q.push(1); /// q.push(2); /// q.push(3); /// q.push(4); /// /// let w = Worker::new_fifo(); /// assert_eq!(q.steal_batch_and_pop(&w), Steal::Success(1)); /// assert_eq!(w.pop(), Some(2)); /// ``` pub fn steal_batch_and_pop(&self, dest: &Worker) -> Steal { // TODO: we use `MAX_BATCH + 1` as the hard limit for Injecter as the performance is slightly // better, but we may change it in the future to be compatible with the same method in Stealer. self.steal_batch_with_limit_and_pop(dest, MAX_BATCH + 1) } /// Steals no more than `limit` of tasks, pushes them into a worker, and pops a task from that worker. /// /// How many tasks exactly will be stolen is not specified. That said, this method will try to /// steal around half of the tasks in the queue, but also not more than the given limit. 
/// /// # Examples /// /// ``` /// use crossbeam_deque::{Injector, Steal, Worker}; /// /// let q = Injector::new(); /// q.push(1); /// q.push(2); /// q.push(3); /// q.push(4); /// q.push(5); /// q.push(6); /// /// let w = Worker::new_fifo(); /// assert_eq!(q.steal_batch_with_limit_and_pop(&w, 2), Steal::Success(1)); /// assert_eq!(w.pop(), Some(2)); /// assert_eq!(w.pop(), None); /// /// q.push(7); /// // Setting a large limit does not guarantee that all elements will be popped. In this case, /// // half of the elements are currently popped, but the number of popped elements is considered /// // an implementation detail that may be changed in the future. /// assert_eq!(q.steal_batch_with_limit_and_pop(&w, std::usize::MAX), Steal::Success(3)); /// assert_eq!(w.pop(), Some(4)); /// assert_eq!(w.pop(), Some(5)); /// assert_eq!(w.pop(), None); /// ``` pub fn steal_batch_with_limit_and_pop(&self, dest: &Worker, limit: usize) -> Steal { assert!(limit > 0); let mut head; let mut block; let mut offset; let backoff = Backoff::new(); loop { head = self.head.index.load(Ordering::Acquire); block = self.head.block.load(Ordering::Acquire); // Calculate the offset of the index into the block. offset = (head >> SHIFT) % LAP; // If we reached the end of the block, wait until the next one is installed. if offset == BLOCK_CAP { backoff.snooze(); } else { break; } } let mut new_head = head; let advance; if new_head & HAS_NEXT == 0 { atomic::fence(Ordering::SeqCst); let tail = self.tail.index.load(Ordering::Relaxed); // If the tail equals the head, that means the queue is empty. if head >> SHIFT == tail >> SHIFT { return Steal::Empty; } // If head and tail are not in the same block, set `HAS_NEXT` in head. if (head >> SHIFT) / LAP != (tail >> SHIFT) / LAP { new_head |= HAS_NEXT; // We can steal all tasks till the end of the block. advance = (BLOCK_CAP - offset).min(limit); } else { let len = (tail - head) >> SHIFT; // Steal half of the available tasks. 
advance = ((len + 1) / 2).min(limit); } } else { // We can steal all tasks till the end of the block. advance = (BLOCK_CAP - offset).min(limit); } new_head += advance << SHIFT; let new_offset = offset + advance; // Try moving the head index forward. if self .head .index .compare_exchange_weak(head, new_head, Ordering::SeqCst, Ordering::Acquire) .is_err() { return Steal::Retry; } // Reserve capacity for the stolen batch. let batch_size = new_offset - offset - 1; dest.reserve(batch_size); // Get the destination buffer and back index. let dest_buffer = dest.buffer.get(); let dest_b = dest.inner.back.load(Ordering::Relaxed); unsafe { // If we've reached the end of the block, move to the next one. if new_offset == BLOCK_CAP { let next = (*block).wait_next(); let mut next_index = (new_head & !HAS_NEXT).wrapping_add(1 << SHIFT); if !(*next).next.load(Ordering::Relaxed).is_null() { next_index |= HAS_NEXT; } self.head.block.store(next, Ordering::Release); self.head.index.store(next_index, Ordering::Release); } // Read the task. let slot = (*block).slots.get_unchecked(offset); slot.wait_write(); let task = slot.task.get().read(); match dest.flavor { Flavor::Fifo => { // Copy values from the injector into the destination queue. for i in 0..batch_size { // Read the task. let slot = (*block).slots.get_unchecked(offset + i + 1); slot.wait_write(); let task = slot.task.get().read(); // Write it into the destination queue. dest_buffer.write(dest_b.wrapping_add(i as isize), task); } } Flavor::Lifo => { // Copy values from the injector into the destination queue. for i in 0..batch_size { // Read the task. let slot = (*block).slots.get_unchecked(offset + i + 1); slot.wait_write(); let task = slot.task.get().read(); // Write it into the destination queue. dest_buffer.write(dest_b.wrapping_add((batch_size - 1 - i) as isize), task); } } } atomic::fence(Ordering::Release); // Update the back index in the destination queue. 
// // This ordering could be `Relaxed`, but then thread sanitizer would falsely report // data races because it doesn't understand fences. dest.inner .back .store(dest_b.wrapping_add(batch_size as isize), Ordering::Release); // Destroy the block if we've reached the end, or if another thread wanted to destroy // but couldn't because we were busy reading from the slot. if new_offset == BLOCK_CAP { Block::destroy(block, offset); } else { for i in offset..new_offset { let slot = (*block).slots.get_unchecked(i); if slot.state.fetch_or(READ, Ordering::AcqRel) & DESTROY != 0 { Block::destroy(block, offset); break; } } } Steal::Success(task.assume_init()) } } /// Returns `true` if the queue is empty. /// /// # Examples /// /// ``` /// use crossbeam_deque::Injector; /// /// let q = Injector::new(); /// /// assert!(q.is_empty()); /// q.push(1); /// assert!(!q.is_empty()); /// ``` pub fn is_empty(&self) -> bool { let head = self.head.index.load(Ordering::SeqCst); let tail = self.tail.index.load(Ordering::SeqCst); head >> SHIFT == tail >> SHIFT } /// Returns the number of tasks in the queue. /// /// # Examples /// /// ``` /// use crossbeam_deque::Injector; /// /// let q = Injector::new(); /// /// assert_eq!(q.len(), 0); /// q.push(1); /// assert_eq!(q.len(), 1); /// q.push(1); /// assert_eq!(q.len(), 2); /// ``` pub fn len(&self) -> usize { loop { // Load the tail index, then load the head index. let mut tail = self.tail.index.load(Ordering::SeqCst); let mut head = self.head.index.load(Ordering::SeqCst); // If the tail index didn't change, we've got consistent indices to work with. if self.tail.index.load(Ordering::SeqCst) == tail { // Erase the lower bits. tail &= !((1 << SHIFT) - 1); head &= !((1 << SHIFT) - 1); // Fix up indices if they fall onto block ends. 
if (tail >> SHIFT) & (LAP - 1) == LAP - 1 { tail = tail.wrapping_add(1 << SHIFT); } if (head >> SHIFT) & (LAP - 1) == LAP - 1 { head = head.wrapping_add(1 << SHIFT); } // Rotate indices so that head falls into the first block. let lap = (head >> SHIFT) / LAP; tail = tail.wrapping_sub((lap * LAP) << SHIFT); head = head.wrapping_sub((lap * LAP) << SHIFT); // Remove the lower bits. tail >>= SHIFT; head >>= SHIFT; // Return the difference minus the number of blocks between tail and head. return tail - head - tail / LAP; } } } } impl Drop for Injector { fn drop(&mut self) { let mut head = *self.head.index.get_mut(); let mut tail = *self.tail.index.get_mut(); let mut block = *self.head.block.get_mut(); // Erase the lower bits. head &= !((1 << SHIFT) - 1); tail &= !((1 << SHIFT) - 1); unsafe { // Drop all values between `head` and `tail` and deallocate the heap-allocated blocks. while head != tail { let offset = (head >> SHIFT) % LAP; if offset < BLOCK_CAP { // Drop the task in the slot. let slot = (*block).slots.get_unchecked(offset); let p = &mut *slot.task.get(); p.as_mut_ptr().drop_in_place(); } else { // Deallocate the block and move to the next one. let next = *(*block).next.get_mut(); drop(Box::from_raw(block)); block = next; } head = head.wrapping_add(1 << SHIFT); } // Deallocate the last remaining block. drop(Box::from_raw(block)); } } } impl fmt::Debug for Injector { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.pad("Worker { .. }") } } /// Possible outcomes of a steal operation. 
///
/// # Examples
///
/// There are lots of ways to chain results of steal operations together:
///
/// ```
/// use crossbeam_deque::Steal::{self, Empty, Retry, Success};
///
/// let collect = |v: Vec<Steal<i32>>| v.into_iter().collect::<Steal<i32>>();
///
/// assert_eq!(collect(vec![Empty, Empty, Empty]), Empty);
/// assert_eq!(collect(vec![Empty, Retry, Empty]), Retry);
/// assert_eq!(collect(vec![Retry, Success(1), Empty]), Success(1));
///
/// assert_eq!(collect(vec![Empty, Empty]).or_else(|| Retry), Retry);
/// assert_eq!(collect(vec![Retry, Empty]).or_else(|| Success(1)), Success(1));
/// ```
#[must_use]
#[derive(PartialEq, Eq, Copy, Clone)]
pub enum Steal<T> {
    /// The queue was empty at the time of stealing.
    Empty,

    /// At least one task was successfully stolen.
    Success(T),

    /// The steal operation needs to be retried.
    Retry,
}

impl<T> Steal<T> {
    /// Returns `true` if the queue was empty at the time of stealing.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_deque::Steal::{Empty, Retry, Success};
    ///
    /// assert!(!Success(7).is_empty());
    /// assert!(!Retry::<i32>.is_empty());
    ///
    /// assert!(Empty::<i32>.is_empty());
    /// ```
    pub fn is_empty(&self) -> bool {
        match self {
            Steal::Empty => true,
            Steal::Success(_) | Steal::Retry => false,
        }
    }

    /// Returns `true` if at least one task was stolen.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_deque::Steal::{Empty, Retry, Success};
    ///
    /// assert!(!Empty::<i32>.is_success());
    /// assert!(!Retry::<i32>.is_success());
    ///
    /// assert!(Success(7).is_success());
    /// ```
    pub fn is_success(&self) -> bool {
        match self {
            Steal::Success(_) => true,
            Steal::Empty | Steal::Retry => false,
        }
    }

    /// Returns `true` if the steal operation needs to be retried.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_deque::Steal::{Empty, Retry, Success};
    ///
    /// assert!(!Empty::<i32>.is_retry());
    /// assert!(!Success(7).is_retry());
    ///
    /// assert!(Retry::<i32>.is_retry());
    /// ```
    pub fn is_retry(&self) -> bool {
        match self {
            Steal::Retry => true,
            Steal::Empty | Steal::Success(_) => false,
        }
    }

    /// Returns the result of the operation, if successful.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_deque::Steal::{Empty, Retry, Success};
    ///
    /// assert_eq!(Empty::<i32>.success(), None);
    /// assert_eq!(Retry::<i32>.success(), None);
    ///
    /// assert_eq!(Success(7).success(), Some(7));
    /// ```
    pub fn success(self) -> Option<T> {
        match self {
            Steal::Success(res) => Some(res),
            Steal::Empty | Steal::Retry => None,
        }
    }

    /// If no task was stolen, attempts another steal operation.
    ///
    /// Returns this steal result if it is `Success`. Otherwise, closure `f` is invoked and then:
    ///
    /// * If the second steal resulted in `Success`, it is returned.
    /// * If both steals were unsuccessful but any resulted in `Retry`, then `Retry` is returned.
    /// * If both resulted in `Empty`, then `Empty` is returned.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_deque::Steal::{Empty, Retry, Success};
    ///
    /// assert_eq!(Success(1).or_else(|| Success(2)), Success(1));
    /// assert_eq!(Retry.or_else(|| Success(2)), Success(2));
    ///
    /// assert_eq!(Retry.or_else(|| Empty), Retry::<i32>);
    /// assert_eq!(Empty.or_else(|| Retry), Retry::<i32>);
    ///
    /// assert_eq!(Empty.or_else(|| Empty), Empty::<i32>);
    /// ```
    pub fn or_else<F>(self, f: F) -> Steal<T>
    where
        F: FnOnce() -> Steal<T>,
    {
        match self {
            // Nothing was observed the first time, so the second attempt decides the outcome.
            Steal::Empty => f(),
            // `f` is not invoked when the first attempt already succeeded.
            Steal::Success(_) => self,
            // A pending retry is only overridden by an actual success.
            Steal::Retry => {
                if let Steal::Success(res) = f() {
                    Steal::Success(res)
                } else {
                    Steal::Retry
                }
            }
        }
    }
}

impl<T> fmt::Debug for Steal<T> {
    /// Prints the variant name; a stolen task is shown as `..` so `T` need not be `Debug`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Steal::Empty => f.pad("Empty"),
            Steal::Success(_) => f.pad("Success(..)"),
            Steal::Retry => f.pad("Retry"),
        }
    }
}

impl<T> FromIterator<Steal<T>> for Steal<T> {
    /// Consumes items until a `Success` is found and returns it.
    ///
    /// If no `Success` was found, but there was at least one `Retry`, then returns `Retry`.
    /// Otherwise, `Empty` is returned.
    fn from_iter<I>(iter: I) -> Steal<T>
    where
        I: IntoIterator<Item = Steal<T>>,
    {
        let mut retry = false;
        for s in iter {
            match &s {
                Steal::Empty => {}
                // Stop at the first success; remaining items are left unconsumed.
                Steal::Success(_) => return s,
                Steal::Retry => retry = true,
            }
        }

        if retry {
            Steal::Retry
        } else {
            Steal::Empty
        }
    }
}