diff options
Diffstat (limited to 'vendor/crossbeam-deque')
-rw-r--r-- | vendor/crossbeam-deque/.cargo-checksum.json | 1 | ||||
-rw-r--r-- | vendor/crossbeam-deque/CHANGELOG.md | 133 | ||||
-rw-r--r-- | vendor/crossbeam-deque/Cargo.toml | 55 | ||||
-rw-r--r-- | vendor/crossbeam-deque/LICENSE-APACHE | 201 | ||||
-rw-r--r-- | vendor/crossbeam-deque/LICENSE-MIT | 27 | ||||
-rw-r--r-- | vendor/crossbeam-deque/README.md | 46 | ||||
-rw-r--r-- | vendor/crossbeam-deque/src/deque.rs | 2195 | ||||
-rw-r--r-- | vendor/crossbeam-deque/src/lib.rs | 110 | ||||
-rw-r--r-- | vendor/crossbeam-deque/tests/fifo.rs | 357 | ||||
-rw-r--r-- | vendor/crossbeam-deque/tests/injector.rs | 375 | ||||
-rw-r--r-- | vendor/crossbeam-deque/tests/lifo.rs | 359 | ||||
-rw-r--r-- | vendor/crossbeam-deque/tests/steal.rs | 212 |
12 files changed, 0 insertions, 4071 deletions
diff --git a/vendor/crossbeam-deque/.cargo-checksum.json b/vendor/crossbeam-deque/.cargo-checksum.json deleted file mode 100644 index 9c26d82..0000000 --- a/vendor/crossbeam-deque/.cargo-checksum.json +++ /dev/null @@ -1 +0,0 @@ -{"files":{"CHANGELOG.md":"23c4df815148a8d0cd366661313c629c115a022b3609d899665a1a079939ca46","Cargo.toml":"6f08856d95a287f5852ea0b333c2b44c0bfdd20a1430cae8632aaf6164a661a5","LICENSE-APACHE":"a60eea817514531668d7e00765731449fe14d059d3249e0bc93b36de45f759f2","LICENSE-MIT":"5734ed989dfca1f625b40281ee9f4530f91b2411ec01cb748223e7eb87e201ab","README.md":"86445da156ad68ea1d1f2dc49a3cef942ccc377ff56316aefe89732ded763aba","src/deque.rs":"34c093fdd7df55d7838f023d00e4fff7d8cffb9c04b6bbe167c7bc7b8fee57a0","src/lib.rs":"9f0581481691bc698176f369410726adf597d470b9d14e226a65f490d6aff8c6","tests/fifo.rs":"3d98e0d4ca7cfddf10708b71642cf1ff05543d067ad837e48401d63cc31c0a18","tests/injector.rs":"fb054ef9fcac5f12e08b7b3451f370b96ab7589d32ef5c02e25958a473c45519","tests/lifo.rs":"57abdb3fc5920a422f785ba308b658bdc5400947532eeffb799f2395a2061549","tests/steal.rs":"cdf588cc13eeb275ef1231eb18e3245faca7a2d054fa6527bfdba2a34bc8f7bf"},"package":"fca89a0e215bab21874660c67903c5f143333cab1da83d041c7ded6053774751"}
\ No newline at end of file diff --git a/vendor/crossbeam-deque/CHANGELOG.md b/vendor/crossbeam-deque/CHANGELOG.md deleted file mode 100644 index f763754..0000000 --- a/vendor/crossbeam-deque/CHANGELOG.md +++ /dev/null @@ -1,133 +0,0 @@ -# Version 0.8.4 - -- Bump the minimum supported Rust version to 1.61. (#1037) - -# Version 0.8.3 - -- Add `Stealer::{steal_batch_with_limit, steal_batch_with_limit_and_pop}` methods. (#903) -- Add `Injector::{steal_batch_with_limit, steal_batch_with_limit_and_pop}` methods. (#903) - -# Version 0.8.2 - -- Bump the minimum supported Rust version to 1.38. (#877) - -# Version 0.8.1 - -- Fix deque steal race condition. (#726) -- Add `Stealer::len` method. (#708) - -# Version 0.8.0 - -**Note:** This release has been yanked. See [GHSA-pqqp-xmhj-wgcw](https://github.com/crossbeam-rs/crossbeam/security/advisories/GHSA-pqqp-xmhj-wgcw) for details. - -- Bump the minimum supported Rust version to 1.36. -- Add `Worker::len()` and `Injector::len()` methods. -- Add `std` (enabled by default) feature for forward compatibility. - -# Version 0.7.4 - -- Fix deque steal race condition. - -# Version 0.7.3 - -**Note:** This release has been yanked. See [GHSA-pqqp-xmhj-wgcw](https://github.com/crossbeam-rs/crossbeam/security/advisories/GHSA-pqqp-xmhj-wgcw) for details. - -- Stop stealing from the same deque. (#448) -- Fix unsoundness issues by adopting `MaybeUninit`. (#458) - -# Version 0.7.2 - -**Note:** This release has been yanked. See [GHSA-pqqp-xmhj-wgcw](https://github.com/crossbeam-rs/crossbeam/security/advisories/GHSA-pqqp-xmhj-wgcw) for details. - -- Bump `crossbeam-epoch` to `0.8`. -- Bump `crossbeam-utils` to `0.7`. - -# Version 0.7.1 - -**Note:** This release has been yanked. See [GHSA-pqqp-xmhj-wgcw](https://github.com/crossbeam-rs/crossbeam/security/advisories/GHSA-pqqp-xmhj-wgcw) for details. - -- Bump the minimum required version of `crossbeam-utils`. - -# Version 0.7.0 - -**Note:** This release has been yanked. See [GHSA-pqqp-xmhj-wgcw](https://github.com/crossbeam-rs/crossbeam/security/advisories/GHSA-pqqp-xmhj-wgcw) for details. - -- Make `Worker::pop()` faster in the FIFO case. -- Replace `fifo()` nad `lifo()` with `Worker::new_fifo()` and `Worker::new_lifo()`. -- Add more batched steal methods. -- Introduce `Injector<T>`, a MPMC queue. -- Rename `Steal::Data` to `Steal::Success`. -- Add `Steal::or_else()` and implement `FromIterator` for `Steal`. -- Add `#[must_use]` to `Steal`. - -# Version 0.6.3 - -- Bump `crossbeam-epoch` to `0.7`. - -# Version 0.6.2 - -- Update `crosbeam-utils` to `0.6`. - -# Version 0.6.1 - -- Change a few `Relaxed` orderings to `Release` in order to fix false positives by tsan. - -# Version 0.6.0 - -- Add `Stealer::steal_many` for batched stealing. -- Change the return type of `pop` to `Pop<T>` so that spinning can be handled manually. - -# Version 0.5.2 - -- Update `crossbeam-utils` to `0.5.0`. - -# Version 0.5.1 - -- Minor optimizations. - -# Version 0.5.0 - -- Add two deque constructors : `fifo()` and `lifo()`. -- Update `rand` to `0.5.3`. -- Rename `Deque` to `Worker`. -- Return `Option<T>` from `Stealer::steal`. -- Remove methods `Deque::len` and `Stealer::len`. -- Remove method `Deque::stealer`. -- Remove method `Deque::steal`. - -# Version 0.4.1 - -- Update `crossbeam-epoch` to `0.5.0`. - -# Version 0.4.0 - -- Update `crossbeam-epoch` to `0.4.2`. -- Update `crossbeam-utils` to `0.4.0`. -- Require minimum Rust version 1.25. - -# Version 0.3.1 - -- Add `Deque::capacity`. -- Add `Deque::min_capacity`. -- Add `Deque::shrink_to_fit`. -- Update `crossbeam-epoch` to `0.3.0`. -- Support Rust 1.20. -- Shrink the buffer in `Deque::push` if necessary. - -# Version 0.3.0 - -- Update `crossbeam-epoch` to `0.4.0`. -- Drop support for Rust 1.13. - -# Version 0.2.0 - -- Update `crossbeam-epoch` to `0.3.0`. -- Support Rust 1.13. - -# Version 0.1.1 - -- Update `crossbeam-epoch` to `0.2.0`. - -# Version 0.1.0 - -- First implementation of the Chase-Lev deque. diff --git a/vendor/crossbeam-deque/Cargo.toml b/vendor/crossbeam-deque/Cargo.toml deleted file mode 100644 index e04086e..0000000 --- a/vendor/crossbeam-deque/Cargo.toml +++ /dev/null @@ -1,55 +0,0 @@ -# THIS FILE IS AUTOMATICALLY GENERATED BY CARGO -# -# When uploading crates to the registry Cargo will automatically -# "normalize" Cargo.toml files for maximal compatibility -# with all versions of Cargo and also rewrite `path` dependencies -# to registry (e.g., crates.io) dependencies. -# -# If you are reading this file be aware that the original Cargo.toml -# will likely look very different (and much more reasonable). -# See Cargo.toml.orig for the original contents. - -[package] -edition = "2018" -rust-version = "1.61" -name = "crossbeam-deque" -version = "0.8.4" -description = "Concurrent work-stealing deque" -homepage = "https://github.com/crossbeam-rs/crossbeam/tree/master/crossbeam-deque" -readme = "README.md" -keywords = [ - "chase-lev", - "lock-free", - "scheduler", - "scheduling", -] -categories = [ - "algorithms", - "concurrency", - "data-structures", -] -license = "MIT OR Apache-2.0" -repository = "https://github.com/crossbeam-rs/crossbeam" - -[dependencies.cfg-if] -version = "1" - -[dependencies.crossbeam-epoch] -version = "0.9.16" -optional = true -default-features = false - -[dependencies.crossbeam-utils] -version = "0.8.17" -optional = true -default-features = false - -[dev-dependencies.rand] -version = "0.8" - -[features] -default = ["std"] -std = [ - "crossbeam-epoch/std", - "crossbeam-utils/std", -] diff --git a/vendor/crossbeam-deque/LICENSE-APACHE b/vendor/crossbeam-deque/LICENSE-APACHE deleted file mode 100644 index 16fe87b..0000000 --- a/vendor/crossbeam-deque/LICENSE-APACHE +++ /dev/null @@ -1,201 +0,0 @@ - Apache License - Version 2.0, January 2004 - http://www.apache.org/licenses/ - -TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION - -1. Definitions. - - "License" shall mean the terms and conditions for use, reproduction, - and distribution as defined by Sections 1 through 9 of this document. - - "Licensor" shall mean the copyright owner or entity authorized by - the copyright owner that is granting the License. - - "Legal Entity" shall mean the union of the acting entity and all - other entities that control, are controlled by, or are under common - control with that entity. For the purposes of this definition, - "control" means (i) the power, direct or indirect, to cause the - direction or management of such entity, whether by contract or - otherwise, or (ii) ownership of fifty percent (50%) or more of the - outstanding shares, or (iii) beneficial ownership of such entity. - - "You" (or "Your") shall mean an individual or Legal Entity - exercising permissions granted by this License. - - "Source" form shall mean the preferred form for making modifications, - including but not limited to software source code, documentation - source, and configuration files. - - "Object" form shall mean any form resulting from mechanical - transformation or translation of a Source form, including but - not limited to compiled object code, generated documentation, - and conversions to other media types. - - "Work" shall mean the work of authorship, whether in Source or - Object form, made available under the License, as indicated by a - copyright notice that is included in or attached to the work - (an example is provided in the Appendix below). - - "Derivative Works" shall mean any work, whether in Source or Object - form, that is based on (or derived from) the Work and for which the - editorial revisions, annotations, elaborations, or other modifications - represent, as a whole, an original work of authorship. For the purposes - of this License, Derivative Works shall not include works that remain - separable from, or merely link (or bind by name) to the interfaces of, - the Work and Derivative Works thereof. - - "Contribution" shall mean any work of authorship, including - the original version of the Work and any modifications or additions - to that Work or Derivative Works thereof, that is intentionally - submitted to Licensor for inclusion in the Work by the copyright owner - or by an individual or Legal Entity authorized to submit on behalf of - the copyright owner. For the purposes of this definition, "submitted" - means any form of electronic, verbal, or written communication sent - to the Licensor or its representatives, including but not limited to - communication on electronic mailing lists, source code control systems, - and issue tracking systems that are managed by, or on behalf of, the - Licensor for the purpose of discussing and improving the Work, but - excluding communication that is conspicuously marked or otherwise - designated in writing by the copyright owner as "Not a Contribution." - - "Contributor" shall mean Licensor and any individual or Legal Entity - on behalf of whom a Contribution has been received by Licensor and - subsequently incorporated within the Work. - -2. Grant of Copyright License. Subject to the terms and conditions of - this License, each Contributor hereby grants to You a perpetual, - worldwide, non-exclusive, no-charge, royalty-free, irrevocable - copyright license to reproduce, prepare Derivative Works of, - publicly display, publicly perform, sublicense, and distribute the - Work and such Derivative Works in Source or Object form. - -3. Grant of Patent License. Subject to the terms and conditions of - this License, each Contributor hereby grants to You a perpetual, - worldwide, non-exclusive, no-charge, royalty-free, irrevocable - (except as stated in this section) patent license to make, have made, - use, offer to sell, sell, import, and otherwise transfer the Work, - where such license applies only to those patent claims licensable - by such Contributor that are necessarily infringed by their - Contribution(s) alone or by combination of their Contribution(s) - with the Work to which such Contribution(s) was submitted. If You - institute patent litigation against any entity (including a - cross-claim or counterclaim in a lawsuit) alleging that the Work - or a Contribution incorporated within the Work constitutes direct - or contributory patent infringement, then any patent licenses - granted to You under this License for that Work shall terminate - as of the date such litigation is filed. - -4. Redistribution. You may reproduce and distribute copies of the - Work or Derivative Works thereof in any medium, with or without - modifications, and in Source or Object form, provided that You - meet the following conditions: - - (a) You must give any other recipients of the Work or - Derivative Works a copy of this License; and - - (b) You must cause any modified files to carry prominent notices - stating that You changed the files; and - - (c) You must retain, in the Source form of any Derivative Works - that You distribute, all copyright, patent, trademark, and - attribution notices from the Source form of the Work, - excluding those notices that do not pertain to any part of - the Derivative Works; and - - (d) If the Work includes a "NOTICE" text file as part of its - distribution, then any Derivative Works that You distribute must - include a readable copy of the attribution notices contained - within such NOTICE file, excluding those notices that do not - pertain to any part of the Derivative Works, in at least one - of the following places: within a NOTICE text file distributed - as part of the Derivative Works; within the Source form or - documentation, if provided along with the Derivative Works; or, - within a display generated by the Derivative Works, if and - wherever such third-party notices normally appear. The contents - of the NOTICE file are for informational purposes only and - do not modify the License. You may add Your own attribution - notices within Derivative Works that You distribute, alongside - or as an addendum to the NOTICE text from the Work, provided - that such additional attribution notices cannot be construed - as modifying the License. - - You may add Your own copyright statement to Your modifications and - may provide additional or different license terms and conditions - for use, reproduction, or distribution of Your modifications, or - for any such Derivative Works as a whole, provided Your use, - reproduction, and distribution of the Work otherwise complies with - the conditions stated in this License. - -5. Submission of Contributions. Unless You explicitly state otherwise, - any Contribution intentionally submitted for inclusion in the Work - by You to the Licensor shall be under the terms and conditions of - this License, without any additional terms or conditions. - Notwithstanding the above, nothing herein shall supersede or modify - the terms of any separate license agreement you may have executed - with Licensor regarding such Contributions. - -6. Trademarks. This License does not grant permission to use the trade - names, trademarks, service marks, or product names of the Licensor, - except as required for reasonable and customary use in describing the - origin of the Work and reproducing the content of the NOTICE file. - -7. Disclaimer of Warranty. Unless required by applicable law or - agreed to in writing, Licensor provides the Work (and each - Contributor provides its Contributions) on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or - implied, including, without limitation, any warranties or conditions - of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A - PARTICULAR PURPOSE. You are solely responsible for determining the - appropriateness of using or redistributing the Work and assume any - risks associated with Your exercise of permissions under this License. - -8. Limitation of Liability. In no event and under no legal theory, - whether in tort (including negligence), contract, or otherwise, - unless required by applicable law (such as deliberate and grossly - negligent acts) or agreed to in writing, shall any Contributor be - liable to You for damages, including any direct, indirect, special, - incidental, or consequential damages of any character arising as a - result of this License or out of the use or inability to use the - Work (including but not limited to damages for loss of goodwill, - work stoppage, computer failure or malfunction, or any and all - other commercial damages or losses), even if such Contributor - has been advised of the possibility of such damages. - -9. Accepting Warranty or Additional Liability. While redistributing - the Work or Derivative Works thereof, You may choose to offer, - and charge a fee for, acceptance of support, warranty, indemnity, - or other liability obligations and/or rights consistent with this - License. However, in accepting such obligations, You may act only - on Your own behalf and on Your sole responsibility, not on behalf - of any other Contributor, and only if You agree to indemnify, - defend, and hold each Contributor harmless for any liability - incurred by, or claims asserted against, such Contributor by reason - of your accepting any such warranty or additional liability. - -END OF TERMS AND CONDITIONS - -APPENDIX: How to apply the Apache License to your work. - - To apply the Apache License to your work, attach the following - boilerplate notice, with the fields enclosed by brackets "[]" - replaced with your own identifying information. (Don't include - the brackets!) The text should be enclosed in the appropriate - comment syntax for the file format. We also recommend that a - file or class name and description of purpose be included on the - same "printed page" as the copyright notice for easier - identification within third-party archives. - -Copyright [yyyy] [name of copyright owner] - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. diff --git a/vendor/crossbeam-deque/LICENSE-MIT b/vendor/crossbeam-deque/LICENSE-MIT deleted file mode 100644 index 068d491..0000000 --- a/vendor/crossbeam-deque/LICENSE-MIT +++ /dev/null @@ -1,27 +0,0 @@ -The MIT License (MIT) - -Copyright (c) 2019 The Crossbeam Project Developers - -Permission is hereby granted, free of charge, to any -person obtaining a copy of this software and associated -documentation files (the "Software"), to deal in the -Software without restriction, including without -limitation the rights to use, copy, modify, merge, -publish, distribute, sublicense, and/or sell copies of -the Software, and to permit persons to whom the Software -is furnished to do so, subject to the following -conditions: - -The above copyright notice and this permission notice -shall be included in all copies or substantial portions -of the Software. - -THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF -ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED -TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A -PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT -SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY -CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION -OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR -IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER -DEALINGS IN THE SOFTWARE. diff --git a/vendor/crossbeam-deque/README.md b/vendor/crossbeam-deque/README.md deleted file mode 100644 index 4eae144..0000000 --- a/vendor/crossbeam-deque/README.md +++ /dev/null @@ -1,46 +0,0 @@ -# Crossbeam Deque - -[![Build Status](https://github.com/crossbeam-rs/crossbeam/workflows/CI/badge.svg)]( -https://github.com/crossbeam-rs/crossbeam/actions) -[![License](https://img.shields.io/badge/license-MIT_OR_Apache--2.0-blue.svg)]( -https://github.com/crossbeam-rs/crossbeam/tree/master/crossbeam-deque#license) -[![Cargo](https://img.shields.io/crates/v/crossbeam-deque.svg)]( -https://crates.io/crates/crossbeam-deque) -[![Documentation](https://docs.rs/crossbeam-deque/badge.svg)]( -https://docs.rs/crossbeam-deque) -[![Rust 1.61+](https://img.shields.io/badge/rust-1.61+-lightgray.svg)]( -https://www.rust-lang.org) -[![chat](https://img.shields.io/discord/569610676205781012.svg?logo=discord)](https://discord.com/invite/JXYwgWZ) - -This crate provides work-stealing deques, which are primarily intended for -building task schedulers. - -## Usage - -Add this to your `Cargo.toml`: - -```toml -[dependencies] -crossbeam-deque = "0.8" -``` - -## Compatibility - -Crossbeam Deque supports stable Rust releases going back at least six months, -and every time the minimum supported Rust version is increased, a new minor -version is released. Currently, the minimum supported Rust version is 1.61. - -## License - -Licensed under either of - - * Apache License, Version 2.0 ([LICENSE-APACHE](LICENSE-APACHE) or http://www.apache.org/licenses/LICENSE-2.0) - * MIT license ([LICENSE-MIT](LICENSE-MIT) or http://opensource.org/licenses/MIT) - -at your option. - -#### Contribution - -Unless you explicitly state otherwise, any contribution intentionally submitted -for inclusion in the work by you, as defined in the Apache-2.0 license, shall be -dual licensed as above, without any additional terms or conditions. diff --git a/vendor/crossbeam-deque/src/deque.rs b/vendor/crossbeam-deque/src/deque.rs deleted file mode 100644 index c37de2d..0000000 --- a/vendor/crossbeam-deque/src/deque.rs +++ /dev/null @@ -1,2195 +0,0 @@ -use std::cell::{Cell, UnsafeCell}; -use std::cmp; -use std::fmt; -use std::iter::FromIterator; -use std::marker::PhantomData; -use std::mem::{self, ManuallyDrop, MaybeUninit}; -use std::ptr; -use std::sync::atomic::{self, AtomicIsize, AtomicPtr, AtomicUsize, Ordering}; -use std::sync::Arc; - -use crate::epoch::{self, Atomic, Owned}; -use crate::utils::{Backoff, CachePadded}; - -// Minimum buffer capacity. -const MIN_CAP: usize = 64; -// Maximum number of tasks that can be stolen in `steal_batch()` and `steal_batch_and_pop()`. -const MAX_BATCH: usize = 32; -// If a buffer of at least this size is retired, thread-local garbage is flushed so that it gets -// deallocated as soon as possible. -const FLUSH_THRESHOLD_BYTES: usize = 1 << 10; - -/// A buffer that holds tasks in a worker queue. -/// -/// This is just a pointer to the buffer and its length - dropping an instance of this struct will -/// *not* deallocate the buffer. -struct Buffer<T> { - /// Pointer to the allocated memory. - ptr: *mut T, - - /// Capacity of the buffer. Always a power of two. - cap: usize, -} - -unsafe impl<T> Send for Buffer<T> {} - -impl<T> Buffer<T> { - /// Allocates a new buffer with the specified capacity. - fn alloc(cap: usize) -> Buffer<T> { - debug_assert_eq!(cap, cap.next_power_of_two()); - - let mut v = ManuallyDrop::new(Vec::with_capacity(cap)); - let ptr = v.as_mut_ptr(); - - Buffer { ptr, cap } - } - - /// Deallocates the buffer. - unsafe fn dealloc(self) { - drop(Vec::from_raw_parts(self.ptr, 0, self.cap)); - } - - /// Returns a pointer to the task at the specified `index`. - unsafe fn at(&self, index: isize) -> *mut T { - // `self.cap` is always a power of two. - // We do all the loads at `MaybeUninit` because we might realize, after loading, that we - // don't actually have the right to access this memory. - self.ptr.offset(index & (self.cap - 1) as isize) - } - - /// Writes `task` into the specified `index`. - /// - /// This method might be concurrently called with another `read` at the same index, which is - /// technically speaking a data race and therefore UB. We should use an atomic store here, but - /// that would be more expensive and difficult to implement generically for all types `T`. - /// Hence, as a hack, we use a volatile write instead. - unsafe fn write(&self, index: isize, task: MaybeUninit<T>) { - ptr::write_volatile(self.at(index).cast::<MaybeUninit<T>>(), task) - } - - /// Reads a task from the specified `index`. - /// - /// This method might be concurrently called with another `write` at the same index, which is - /// technically speaking a data race and therefore UB. We should use an atomic load here, but - /// that would be more expensive and difficult to implement generically for all types `T`. - /// Hence, as a hack, we use a volatile load instead. - unsafe fn read(&self, index: isize) -> MaybeUninit<T> { - ptr::read_volatile(self.at(index).cast::<MaybeUninit<T>>()) - } -} - -impl<T> Clone for Buffer<T> { - fn clone(&self) -> Buffer<T> { - *self - } -} - -impl<T> Copy for Buffer<T> {} - -/// Internal queue data shared between the worker and stealers. -/// -/// The implementation is based on the following work: -/// -/// 1. [Chase and Lev. Dynamic circular work-stealing deque. SPAA 2005.][chase-lev] -/// 2. [Le, Pop, Cohen, and Nardelli. Correct and efficient work-stealing for weak memory models. -/// PPoPP 2013.][weak-mem] -/// 3. [Norris and Demsky. CDSchecker: checking concurrent data structures written with C/C++ -/// atomics. OOPSLA 2013.][checker] -/// -/// [chase-lev]: https://dl.acm.org/citation.cfm?id=1073974 -/// [weak-mem]: https://dl.acm.org/citation.cfm?id=2442524 -/// [checker]: https://dl.acm.org/citation.cfm?id=2509514 -struct Inner<T> { - /// The front index. - front: AtomicIsize, - - /// The back index. - back: AtomicIsize, - - /// The underlying buffer. - buffer: CachePadded<Atomic<Buffer<T>>>, -} - -impl<T> Drop for Inner<T> { - fn drop(&mut self) { - // Load the back index, front index, and buffer. - let b = *self.back.get_mut(); - let f = *self.front.get_mut(); - - unsafe { - let buffer = self.buffer.load(Ordering::Relaxed, epoch::unprotected()); - - // Go through the buffer from front to back and drop all tasks in the queue. - let mut i = f; - while i != b { - buffer.deref().at(i).drop_in_place(); - i = i.wrapping_add(1); - } - - // Free the memory allocated by the buffer. - buffer.into_owned().into_box().dealloc(); - } - } -} - -/// Worker queue flavor: FIFO or LIFO. -#[derive(Clone, Copy, Debug, Eq, PartialEq)] -enum Flavor { - /// The first-in first-out flavor. - Fifo, - - /// The last-in first-out flavor. - Lifo, -} - -/// A worker queue. -/// -/// This is a FIFO or LIFO queue that is owned by a single thread, but other threads may steal -/// tasks from it. Task schedulers typically create a single worker queue per thread. -/// -/// # Examples -/// -/// A FIFO worker: -/// -/// ``` -/// use crossbeam_deque::{Steal, Worker}; -/// -/// let w = Worker::new_fifo(); -/// let s = w.stealer(); -/// -/// w.push(1); -/// w.push(2); -/// w.push(3); -/// -/// assert_eq!(s.steal(), Steal::Success(1)); -/// assert_eq!(w.pop(), Some(2)); -/// assert_eq!(w.pop(), Some(3)); -/// ``` -/// -/// A LIFO worker: -/// -/// ``` -/// use crossbeam_deque::{Steal, Worker}; -/// -/// let w = Worker::new_lifo(); -/// let s = w.stealer(); -/// -/// w.push(1); -/// w.push(2); -/// w.push(3); -/// -/// assert_eq!(s.steal(), Steal::Success(1)); -/// assert_eq!(w.pop(), Some(3)); -/// assert_eq!(w.pop(), Some(2)); -/// ``` -pub struct Worker<T> { - /// A reference to the inner representation of the queue. - inner: Arc<CachePadded<Inner<T>>>, - - /// A copy of `inner.buffer` for quick access. - buffer: Cell<Buffer<T>>, - - /// The flavor of the queue. - flavor: Flavor, - - /// Indicates that the worker cannot be shared among threads. - _marker: PhantomData<*mut ()>, // !Send + !Sync -} - -unsafe impl<T: Send> Send for Worker<T> {} - -impl<T> Worker<T> { - /// Creates a FIFO worker queue. - /// - /// Tasks are pushed and popped from opposite ends. - /// - /// # Examples - /// - /// ``` - /// use crossbeam_deque::Worker; - /// - /// let w = Worker::<i32>::new_fifo(); - /// ``` - pub fn new_fifo() -> Worker<T> { - let buffer = Buffer::alloc(MIN_CAP); - - let inner = Arc::new(CachePadded::new(Inner { - front: AtomicIsize::new(0), - back: AtomicIsize::new(0), - buffer: CachePadded::new(Atomic::new(buffer)), - })); - - Worker { - inner, - buffer: Cell::new(buffer), - flavor: Flavor::Fifo, - _marker: PhantomData, - } - } - - /// Creates a LIFO worker queue. - /// - /// Tasks are pushed and popped from the same end. - /// - /// # Examples - /// - /// ``` - /// use crossbeam_deque::Worker; - /// - /// let w = Worker::<i32>::new_lifo(); - /// ``` - pub fn new_lifo() -> Worker<T> { - let buffer = Buffer::alloc(MIN_CAP); - - let inner = Arc::new(CachePadded::new(Inner { - front: AtomicIsize::new(0), - back: AtomicIsize::new(0), - buffer: CachePadded::new(Atomic::new(buffer)), - })); - - Worker { - inner, - buffer: Cell::new(buffer), - flavor: Flavor::Lifo, - _marker: PhantomData, - } - } - - /// Creates a stealer for this queue. - /// - /// The returned stealer can be shared among threads and cloned. - /// - /// # Examples - /// - /// ``` - /// use crossbeam_deque::Worker; - /// - /// let w = Worker::<i32>::new_lifo(); - /// let s = w.stealer(); - /// ``` - pub fn stealer(&self) -> Stealer<T> { - Stealer { - inner: self.inner.clone(), - flavor: self.flavor, - } - } - - /// Resizes the internal buffer to the new capacity of `new_cap`. - #[cold] - unsafe fn resize(&self, new_cap: usize) { - // Load the back index, front index, and buffer. - let b = self.inner.back.load(Ordering::Relaxed); - let f = self.inner.front.load(Ordering::Relaxed); - let buffer = self.buffer.get(); - - // Allocate a new buffer and copy data from the old buffer to the new one. - let new = Buffer::alloc(new_cap); - let mut i = f; - while i != b { - ptr::copy_nonoverlapping(buffer.at(i), new.at(i), 1); - i = i.wrapping_add(1); - } - - let guard = &epoch::pin(); - - // Replace the old buffer with the new one. - self.buffer.replace(new); - let old = - self.inner - .buffer - .swap(Owned::new(new).into_shared(guard), Ordering::Release, guard); - - // Destroy the old buffer later. - guard.defer_unchecked(move || old.into_owned().into_box().dealloc()); - - // If the buffer is very large, then flush the thread-local garbage in order to deallocate - // it as soon as possible. - if mem::size_of::<T>() * new_cap >= FLUSH_THRESHOLD_BYTES { - guard.flush(); - } - } - - /// Reserves enough capacity so that `reserve_cap` tasks can be pushed without growing the - /// buffer. - fn reserve(&self, reserve_cap: usize) { - if reserve_cap > 0 { - // Compute the current length. - let b = self.inner.back.load(Ordering::Relaxed); - let f = self.inner.front.load(Ordering::SeqCst); - let len = b.wrapping_sub(f) as usize; - - // The current capacity. - let cap = self.buffer.get().cap; - - // Is there enough capacity to push `reserve_cap` tasks? - if cap - len < reserve_cap { - // Keep doubling the capacity as much as is needed. - let mut new_cap = cap * 2; - while new_cap - len < reserve_cap { - new_cap *= 2; - } - - // Resize the buffer. - unsafe { - self.resize(new_cap); - } - } - } - } - - /// Returns `true` if the queue is empty. - /// - /// ``` - /// use crossbeam_deque::Worker; - /// - /// let w = Worker::new_lifo(); - /// - /// assert!(w.is_empty()); - /// w.push(1); - /// assert!(!w.is_empty()); - /// ``` - pub fn is_empty(&self) -> bool { - let b = self.inner.back.load(Ordering::Relaxed); - let f = self.inner.front.load(Ordering::SeqCst); - b.wrapping_sub(f) <= 0 - } - - /// Returns the number of tasks in the deque. - /// - /// ``` - /// use crossbeam_deque::Worker; - /// - /// let w = Worker::new_lifo(); - /// - /// assert_eq!(w.len(), 0); - /// w.push(1); - /// assert_eq!(w.len(), 1); - /// w.push(1); - /// assert_eq!(w.len(), 2); - /// ``` - pub fn len(&self) -> usize { - let b = self.inner.back.load(Ordering::Relaxed); - let f = self.inner.front.load(Ordering::SeqCst); - b.wrapping_sub(f).max(0) as usize - } - - /// Pushes a task into the queue. - /// - /// # Examples - /// - /// ``` - /// use crossbeam_deque::Worker; - /// - /// let w = Worker::new_lifo(); - /// w.push(1); - /// w.push(2); - /// ``` - pub fn push(&self, task: T) { - // Load the back index, front index, and buffer. - let b = self.inner.back.load(Ordering::Relaxed); - let f = self.inner.front.load(Ordering::Acquire); - let mut buffer = self.buffer.get(); - - // Calculate the length of the queue. - let len = b.wrapping_sub(f); - - // Is the queue full? - if len >= buffer.cap as isize { - // Yes. Grow the underlying buffer. - unsafe { - self.resize(2 * buffer.cap); - } - buffer = self.buffer.get(); - } - - // Write `task` into the slot. - unsafe { - buffer.write(b, MaybeUninit::new(task)); - } - - atomic::fence(Ordering::Release); - - // Increment the back index. - // - // This ordering could be `Relaxed`, but then thread sanitizer would falsely report data - // races because it doesn't understand fences. - self.inner.back.store(b.wrapping_add(1), Ordering::Release); - } - - /// Pops a task from the queue. - /// - /// # Examples - /// - /// ``` - /// use crossbeam_deque::Worker; - /// - /// let w = Worker::new_fifo(); - /// w.push(1); - /// w.push(2); - /// - /// assert_eq!(w.pop(), Some(1)); - /// assert_eq!(w.pop(), Some(2)); - /// assert_eq!(w.pop(), None); - /// ``` - pub fn pop(&self) -> Option<T> { - // Load the back and front index. - let b = self.inner.back.load(Ordering::Relaxed); - let f = self.inner.front.load(Ordering::Relaxed); - - // Calculate the length of the queue. - let len = b.wrapping_sub(f); - - // Is the queue empty? - if len <= 0 { - return None; - } - - match self.flavor { - // Pop from the front of the queue. - Flavor::Fifo => { - // Try incrementing the front index to pop the task. - let f = self.inner.front.fetch_add(1, Ordering::SeqCst); - let new_f = f.wrapping_add(1); - - if b.wrapping_sub(new_f) < 0 { - self.inner.front.store(f, Ordering::Relaxed); - return None; - } - - unsafe { - // Read the popped task. - let buffer = self.buffer.get(); - let task = buffer.read(f).assume_init(); - - // Shrink the buffer if `len - 1` is less than one fourth of the capacity. - if buffer.cap > MIN_CAP && len <= buffer.cap as isize / 4 { - self.resize(buffer.cap / 2); - } - - Some(task) - } - } - - // Pop from the back of the queue. - Flavor::Lifo => { - // Decrement the back index. - let b = b.wrapping_sub(1); - self.inner.back.store(b, Ordering::Relaxed); - - atomic::fence(Ordering::SeqCst); - - // Load the front index. - let f = self.inner.front.load(Ordering::Relaxed); - - // Compute the length after the back index was decremented. - let len = b.wrapping_sub(f); - - if len < 0 { - // The queue is empty. Restore the back index to the original task. - self.inner.back.store(b.wrapping_add(1), Ordering::Relaxed); - None - } else { - // Read the task to be popped. - let buffer = self.buffer.get(); - let mut task = unsafe { Some(buffer.read(b)) }; - - // Are we popping the last task from the queue? - if len == 0 { - // Try incrementing the front index. - if self - .inner - .front - .compare_exchange( - f, - f.wrapping_add(1), - Ordering::SeqCst, - Ordering::Relaxed, - ) - .is_err() - { - // Failed. We didn't pop anything. Reset to `None`. - task.take(); - } - - // Restore the back index to the original task. - self.inner.back.store(b.wrapping_add(1), Ordering::Relaxed); - } else { - // Shrink the buffer if `len` is less than one fourth of the capacity. - if buffer.cap > MIN_CAP && len < buffer.cap as isize / 4 { - unsafe { - self.resize(buffer.cap / 2); - } - } - } - - task.map(|t| unsafe { t.assume_init() }) - } - } - } - } -} - -impl<T> fmt::Debug for Worker<T> { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.pad("Worker { .. }") - } -} - -/// A stealer handle of a worker queue. -/// -/// Stealers can be shared among threads. -/// -/// Task schedulers typically have a single worker queue per worker thread. -/// -/// # Examples -/// -/// ``` -/// use crossbeam_deque::{Steal, Worker}; -/// -/// let w = Worker::new_lifo(); -/// w.push(1); -/// w.push(2); -/// -/// let s = w.stealer(); -/// assert_eq!(s.steal(), Steal::Success(1)); -/// assert_eq!(s.steal(), Steal::Success(2)); -/// assert_eq!(s.steal(), Steal::Empty); -/// ``` -pub struct Stealer<T> { - /// A reference to the inner representation of the queue. - inner: Arc<CachePadded<Inner<T>>>, - - /// The flavor of the queue. - flavor: Flavor, -} - -unsafe impl<T: Send> Send for Stealer<T> {} -unsafe impl<T: Send> Sync for Stealer<T> {} - -impl<T> Stealer<T> { - /// Returns `true` if the queue is empty. - /// - /// ``` - /// use crossbeam_deque::Worker; - /// - /// let w = Worker::new_lifo(); - /// let s = w.stealer(); - /// - /// assert!(s.is_empty()); - /// w.push(1); - /// assert!(!s.is_empty()); - /// ``` - pub fn is_empty(&self) -> bool { - let f = self.inner.front.load(Ordering::Acquire); - atomic::fence(Ordering::SeqCst); - let b = self.inner.back.load(Ordering::Acquire); - b.wrapping_sub(f) <= 0 - } - - /// Returns the number of tasks in the deque. - /// - /// ``` - /// use crossbeam_deque::Worker; - /// - /// let w = Worker::new_lifo(); - /// let s = w.stealer(); - /// - /// assert_eq!(s.len(), 0); - /// w.push(1); - /// assert_eq!(s.len(), 1); - /// w.push(2); - /// assert_eq!(s.len(), 2); - /// ``` - pub fn len(&self) -> usize { - let f = self.inner.front.load(Ordering::Acquire); - atomic::fence(Ordering::SeqCst); - let b = self.inner.back.load(Ordering::Acquire); - b.wrapping_sub(f).max(0) as usize - } - - /// Steals a task from the queue. - /// - /// # Examples - /// - /// ``` - /// use crossbeam_deque::{Steal, Worker}; - /// - /// let w = Worker::new_lifo(); - /// w.push(1); - /// w.push(2); - /// - /// let s = w.stealer(); - /// assert_eq!(s.steal(), Steal::Success(1)); - /// assert_eq!(s.steal(), Steal::Success(2)); - /// ``` - pub fn steal(&self) -> Steal<T> { - // Load the front index. - let f = self.inner.front.load(Ordering::Acquire); - - // A SeqCst fence is needed here. - // - // If the current thread is already pinned (reentrantly), we must manually issue the - // fence. Otherwise, the following pinning will issue the fence anyway, so we don't - // have to. - if epoch::is_pinned() { - atomic::fence(Ordering::SeqCst); - } - - let guard = &epoch::pin(); - - // Load the back index. - let b = self.inner.back.load(Ordering::Acquire); - - // Is the queue empty? - if b.wrapping_sub(f) <= 0 { - return Steal::Empty; - } - - // Load the buffer and read the task at the front. - let buffer = self.inner.buffer.load(Ordering::Acquire, guard); - let task = unsafe { buffer.deref().read(f) }; - - // Try incrementing the front index to steal the task. - // If the buffer has been swapped or the increment fails, we retry. - if self.inner.buffer.load(Ordering::Acquire, guard) != buffer - || self - .inner - .front - .compare_exchange(f, f.wrapping_add(1), Ordering::SeqCst, Ordering::Relaxed) - .is_err() - { - // We didn't steal this task, forget it. - return Steal::Retry; - } - - // Return the stolen task. - Steal::Success(unsafe { task.assume_init() }) - } - - /// Steals a batch of tasks and pushes them into another worker. - /// - /// How many tasks exactly will be stolen is not specified. That said, this method will try to - /// steal around half of the tasks in the queue, but also not more than some constant limit. - /// - /// # Examples - /// - /// ``` - /// use crossbeam_deque::Worker; - /// - /// let w1 = Worker::new_fifo(); - /// w1.push(1); - /// w1.push(2); - /// w1.push(3); - /// w1.push(4); - /// - /// let s = w1.stealer(); - /// let w2 = Worker::new_fifo(); - /// - /// let _ = s.steal_batch(&w2); - /// assert_eq!(w2.pop(), Some(1)); - /// assert_eq!(w2.pop(), Some(2)); - /// ``` - pub fn steal_batch(&self, dest: &Worker<T>) -> Steal<()> { - self.steal_batch_with_limit(dest, MAX_BATCH) - } - - /// Steals no more than `limit` of tasks and pushes them into another worker. - /// - /// How many tasks exactly will be stolen is not specified. That said, this method will try to - /// steal around half of the tasks in the queue, but also not more than the given limit. - /// - /// # Examples - /// - /// ``` - /// use crossbeam_deque::Worker; - /// - /// let w1 = Worker::new_fifo(); - /// w1.push(1); - /// w1.push(2); - /// w1.push(3); - /// w1.push(4); - /// w1.push(5); - /// w1.push(6); - /// - /// let s = w1.stealer(); - /// let w2 = Worker::new_fifo(); - /// - /// let _ = s.steal_batch_with_limit(&w2, 2); - /// assert_eq!(w2.pop(), Some(1)); - /// assert_eq!(w2.pop(), Some(2)); - /// assert_eq!(w2.pop(), None); - /// - /// w1.push(7); - /// w1.push(8); - /// // Setting a large limit does not guarantee that all elements will be popped. In this case, - /// // half of the elements are currently popped, but the number of popped elements is considered - /// // an implementation detail that may be changed in the future. - /// let _ = s.steal_batch_with_limit(&w2, std::usize::MAX); - /// assert_eq!(w2.len(), 3); - /// ``` - pub fn steal_batch_with_limit(&self, dest: &Worker<T>, limit: usize) -> Steal<()> { - assert!(limit > 0); - if Arc::ptr_eq(&self.inner, &dest.inner) { - if dest.is_empty() { - return Steal::Empty; - } else { - return Steal::Success(()); - } - } - - // Load the front index. - let mut f = self.inner.front.load(Ordering::Acquire); - - // A SeqCst fence is needed here. - // - // If the current thread is already pinned (reentrantly), we must manually issue the - // fence. Otherwise, the following pinning will issue the fence anyway, so we don't - // have to. - if epoch::is_pinned() { - atomic::fence(Ordering::SeqCst); - } - - let guard = &epoch::pin(); - - // Load the back index. - let b = self.inner.back.load(Ordering::Acquire); - - // Is the queue empty? - let len = b.wrapping_sub(f); - if len <= 0 { - return Steal::Empty; - } - - // Reserve capacity for the stolen batch. - let batch_size = cmp::min((len as usize + 1) / 2, limit); - dest.reserve(batch_size); - let mut batch_size = batch_size as isize; - - // Get the destination buffer and back index. - let dest_buffer = dest.buffer.get(); - let mut dest_b = dest.inner.back.load(Ordering::Relaxed); - - // Load the buffer. - let buffer = self.inner.buffer.load(Ordering::Acquire, guard); - - match self.flavor { - // Steal a batch of tasks from the front at once. - Flavor::Fifo => { - // Copy the batch from the source to the destination buffer. - match dest.flavor { - Flavor::Fifo => { - for i in 0..batch_size { - unsafe { - let task = buffer.deref().read(f.wrapping_add(i)); - dest_buffer.write(dest_b.wrapping_add(i), task); - } - } - } - Flavor::Lifo => { - for i in 0..batch_size { - unsafe { - let task = buffer.deref().read(f.wrapping_add(i)); - dest_buffer.write(dest_b.wrapping_add(batch_size - 1 - i), task); - } - } - } - } - - // Try incrementing the front index to steal the batch. - // If the buffer has been swapped or the increment fails, we retry. - if self.inner.buffer.load(Ordering::Acquire, guard) != buffer - || self - .inner - .front - .compare_exchange( - f, - f.wrapping_add(batch_size), - Ordering::SeqCst, - Ordering::Relaxed, - ) - .is_err() - { - return Steal::Retry; - } - - dest_b = dest_b.wrapping_add(batch_size); - } - - // Steal a batch of tasks from the front one by one. - Flavor::Lifo => { - // This loop may modify the batch_size, which triggers a clippy lint warning. - // Use a new variable to avoid the warning, and to make it clear we aren't - // modifying the loop exit condition during iteration. - let original_batch_size = batch_size; - - for i in 0..original_batch_size { - // If this is not the first steal, check whether the queue is empty. - if i > 0 { - // We've already got the current front index. Now execute the fence to - // synchronize with other threads. - atomic::fence(Ordering::SeqCst); - - // Load the back index. - let b = self.inner.back.load(Ordering::Acquire); - - // Is the queue empty? - if b.wrapping_sub(f) <= 0 { - batch_size = i; - break; - } - } - - // Read the task at the front. - let task = unsafe { buffer.deref().read(f) }; - - // Try incrementing the front index to steal the task. - // If the buffer has been swapped or the increment fails, we retry. - if self.inner.buffer.load(Ordering::Acquire, guard) != buffer - || self - .inner - .front - .compare_exchange( - f, - f.wrapping_add(1), - Ordering::SeqCst, - Ordering::Relaxed, - ) - .is_err() - { - // We didn't steal this task, forget it and break from the loop. - batch_size = i; - break; - } - - // Write the stolen task into the destination buffer. - unsafe { - dest_buffer.write(dest_b, task); - } - - // Move the source front index and the destination back index one step forward. - f = f.wrapping_add(1); - dest_b = dest_b.wrapping_add(1); - } - - // If we didn't steal anything, the operation needs to be retried. - if batch_size == 0 { - return Steal::Retry; - } - - // If stealing into a FIFO queue, stolen tasks need to be reversed. - if dest.flavor == Flavor::Fifo { - for i in 0..batch_size / 2 { - unsafe { - let i1 = dest_b.wrapping_sub(batch_size - i); - let i2 = dest_b.wrapping_sub(i + 1); - let t1 = dest_buffer.read(i1); - let t2 = dest_buffer.read(i2); - dest_buffer.write(i1, t2); - dest_buffer.write(i2, t1); - } - } - } - } - } - - atomic::fence(Ordering::Release); - - // Update the back index in the destination queue. - // - // This ordering could be `Relaxed`, but then thread sanitizer would falsely report data - // races because it doesn't understand fences. - dest.inner.back.store(dest_b, Ordering::Release); - - // Return with success. - Steal::Success(()) - } - - /// Steals a batch of tasks, pushes them into another worker, and pops a task from that worker. - /// - /// How many tasks exactly will be stolen is not specified. That said, this method will try to - /// steal around half of the tasks in the queue, but also not more than some constant limit. - /// - /// # Examples - /// - /// ``` - /// use crossbeam_deque::{Steal, Worker}; - /// - /// let w1 = Worker::new_fifo(); - /// w1.push(1); - /// w1.push(2); - /// w1.push(3); - /// w1.push(4); - /// - /// let s = w1.stealer(); - /// let w2 = Worker::new_fifo(); - /// - /// assert_eq!(s.steal_batch_and_pop(&w2), Steal::Success(1)); - /// assert_eq!(w2.pop(), Some(2)); - /// ``` - pub fn steal_batch_and_pop(&self, dest: &Worker<T>) -> Steal<T> { - self.steal_batch_with_limit_and_pop(dest, MAX_BATCH) - } - - /// Steals no more than `limit` of tasks, pushes them into another worker, and pops a task from - /// that worker. - /// - /// How many tasks exactly will be stolen is not specified. That said, this method will try to - /// steal around half of the tasks in the queue, but also not more than the given limit. - /// - /// # Examples - /// - /// ``` - /// use crossbeam_deque::{Steal, Worker}; - /// - /// let w1 = Worker::new_fifo(); - /// w1.push(1); - /// w1.push(2); - /// w1.push(3); - /// w1.push(4); - /// w1.push(5); - /// w1.push(6); - /// - /// let s = w1.stealer(); - /// let w2 = Worker::new_fifo(); - /// - /// assert_eq!(s.steal_batch_with_limit_and_pop(&w2, 2), Steal::Success(1)); - /// assert_eq!(w2.pop(), Some(2)); - /// assert_eq!(w2.pop(), None); - /// - /// w1.push(7); - /// w1.push(8); - /// // Setting a large limit does not guarantee that all elements will be popped. In this case, - /// // half of the elements are currently popped, but the number of popped elements is considered - /// // an implementation detail that may be changed in the future. - /// assert_eq!(s.steal_batch_with_limit_and_pop(&w2, std::usize::MAX), Steal::Success(3)); - /// assert_eq!(w2.pop(), Some(4)); - /// assert_eq!(w2.pop(), Some(5)); - /// assert_eq!(w2.pop(), None); - /// ``` - pub fn steal_batch_with_limit_and_pop(&self, dest: &Worker<T>, limit: usize) -> Steal<T> { - assert!(limit > 0); - if Arc::ptr_eq(&self.inner, &dest.inner) { - match dest.pop() { - None => return Steal::Empty, - Some(task) => return Steal::Success(task), - } - } - - // Load the front index. - let mut f = self.inner.front.load(Ordering::Acquire); - - // A SeqCst fence is needed here. - // - // If the current thread is already pinned (reentrantly), we must manually issue the - // fence. Otherwise, the following pinning will issue the fence anyway, so we don't - // have to. - if epoch::is_pinned() { - atomic::fence(Ordering::SeqCst); - } - - let guard = &epoch::pin(); - - // Load the back index. - let b = self.inner.back.load(Ordering::Acquire); - - // Is the queue empty? - let len = b.wrapping_sub(f); - if len <= 0 { - return Steal::Empty; - } - - // Reserve capacity for the stolen batch. - let batch_size = cmp::min((len as usize - 1) / 2, limit - 1); - dest.reserve(batch_size); - let mut batch_size = batch_size as isize; - - // Get the destination buffer and back index. - let dest_buffer = dest.buffer.get(); - let mut dest_b = dest.inner.back.load(Ordering::Relaxed); - - // Load the buffer - let buffer = self.inner.buffer.load(Ordering::Acquire, guard); - - // Read the task at the front. - let mut task = unsafe { buffer.deref().read(f) }; - - match self.flavor { - // Steal a batch of tasks from the front at once. - Flavor::Fifo => { - // Copy the batch from the source to the destination buffer. - match dest.flavor { - Flavor::Fifo => { - for i in 0..batch_size { - unsafe { - let task = buffer.deref().read(f.wrapping_add(i + 1)); - dest_buffer.write(dest_b.wrapping_add(i), task); - } - } - } - Flavor::Lifo => { - for i in 0..batch_size { - unsafe { - let task = buffer.deref().read(f.wrapping_add(i + 1)); - dest_buffer.write(dest_b.wrapping_add(batch_size - 1 - i), task); - } - } - } - } - - // Try incrementing the front index to steal the task. - // If the buffer has been swapped or the increment fails, we retry. - if self.inner.buffer.load(Ordering::Acquire, guard) != buffer - || self - .inner - .front - .compare_exchange( - f, - f.wrapping_add(batch_size + 1), - Ordering::SeqCst, - Ordering::Relaxed, - ) - .is_err() - { - // We didn't steal this task, forget it. - return Steal::Retry; - } - - dest_b = dest_b.wrapping_add(batch_size); - } - - // Steal a batch of tasks from the front one by one. - Flavor::Lifo => { - // Try incrementing the front index to steal the task. - if self - .inner - .front - .compare_exchange(f, f.wrapping_add(1), Ordering::SeqCst, Ordering::Relaxed) - .is_err() - { - // We didn't steal this task, forget it. - return Steal::Retry; - } - - // Move the front index one step forward. - f = f.wrapping_add(1); - - // Repeat the same procedure for the batch steals. - // - // This loop may modify the batch_size, which triggers a clippy lint warning. - // Use a new variable to avoid the warning, and to make it clear we aren't - // modifying the loop exit condition during iteration. - let original_batch_size = batch_size; - for i in 0..original_batch_size { - // We've already got the current front index. Now execute the fence to - // synchronize with other threads. - atomic::fence(Ordering::SeqCst); - - // Load the back index. - let b = self.inner.back.load(Ordering::Acquire); - - // Is the queue empty? - if b.wrapping_sub(f) <= 0 { - batch_size = i; - break; - } - - // Read the task at the front. - let tmp = unsafe { buffer.deref().read(f) }; - - // Try incrementing the front index to steal the task. - // If the buffer has been swapped or the increment fails, we retry. - if self.inner.buffer.load(Ordering::Acquire, guard) != buffer - || self - .inner - .front - .compare_exchange( - f, - f.wrapping_add(1), - Ordering::SeqCst, - Ordering::Relaxed, - ) - .is_err() - { - // We didn't steal this task, forget it and break from the loop. - batch_size = i; - break; - } - - // Write the previously stolen task into the destination buffer. - unsafe { - dest_buffer.write(dest_b, mem::replace(&mut task, tmp)); - } - - // Move the source front index and the destination back index one step forward. - f = f.wrapping_add(1); - dest_b = dest_b.wrapping_add(1); - } - - // If stealing into a FIFO queue, stolen tasks need to be reversed. - if dest.flavor == Flavor::Fifo { - for i in 0..batch_size / 2 { - unsafe { - let i1 = dest_b.wrapping_sub(batch_size - i); - let i2 = dest_b.wrapping_sub(i + 1); - let t1 = dest_buffer.read(i1); - let t2 = dest_buffer.read(i2); - dest_buffer.write(i1, t2); - dest_buffer.write(i2, t1); - } - } - } - } - } - - atomic::fence(Ordering::Release); - - // Update the back index in the destination queue. - // - // This ordering could be `Relaxed`, but then thread sanitizer would falsely report data - // races because it doesn't understand fences. - dest.inner.back.store(dest_b, Ordering::Release); - - // Return with success. - Steal::Success(unsafe { task.assume_init() }) - } -} - -impl<T> Clone for Stealer<T> { - fn clone(&self) -> Stealer<T> { - Stealer { - inner: self.inner.clone(), - flavor: self.flavor, - } - } -} - -impl<T> fmt::Debug for Stealer<T> { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.pad("Stealer { .. }") - } -} - -// Bits indicating the state of a slot: -// * If a task has been written into the slot, `WRITE` is set. -// * If a task has been read from the slot, `READ` is set. -// * If the block is being destroyed, `DESTROY` is set. -const WRITE: usize = 1; -const READ: usize = 2; -const DESTROY: usize = 4; - -// Each block covers one "lap" of indices. -const LAP: usize = 64; -// The maximum number of values a block can hold. -const BLOCK_CAP: usize = LAP - 1; -// How many lower bits are reserved for metadata. -const SHIFT: usize = 1; -// Indicates that the block is not the last one. -const HAS_NEXT: usize = 1; - -/// A slot in a block. -struct Slot<T> { - /// The task. - task: UnsafeCell<MaybeUninit<T>>, - - /// The state of the slot. - state: AtomicUsize, -} - -impl<T> Slot<T> { - const UNINIT: Self = Self { - task: UnsafeCell::new(MaybeUninit::uninit()), - state: AtomicUsize::new(0), - }; - - /// Waits until a task is written into the slot. - fn wait_write(&self) { - let backoff = Backoff::new(); - while self.state.load(Ordering::Acquire) & WRITE == 0 { - backoff.snooze(); - } - } -} - -/// A block in a linked list. -/// -/// Each block in the list can hold up to `BLOCK_CAP` values. -struct Block<T> { - /// The next block in the linked list. - next: AtomicPtr<Block<T>>, - - /// Slots for values. - slots: [Slot<T>; BLOCK_CAP], -} - -impl<T> Block<T> { - /// Creates an empty block that starts at `start_index`. - fn new() -> Block<T> { - Self { - next: AtomicPtr::new(ptr::null_mut()), - slots: [Slot::UNINIT; BLOCK_CAP], - } - } - - /// Waits until the next pointer is set. - fn wait_next(&self) -> *mut Block<T> { - let backoff = Backoff::new(); - loop { - let next = self.next.load(Ordering::Acquire); - if !next.is_null() { - return next; - } - backoff.snooze(); - } - } - - /// Sets the `DESTROY` bit in slots starting from `start` and destroys the block. - unsafe fn destroy(this: *mut Block<T>, count: usize) { - // It is not necessary to set the `DESTROY` bit in the last slot because that slot has - // begun destruction of the block. - for i in (0..count).rev() { - let slot = (*this).slots.get_unchecked(i); - - // Mark the `DESTROY` bit if a thread is still using the slot. - if slot.state.load(Ordering::Acquire) & READ == 0 - && slot.state.fetch_or(DESTROY, Ordering::AcqRel) & READ == 0 - { - // If a thread is still using the slot, it will continue destruction of the block. - return; - } - } - - // No thread is using the block, now it is safe to destroy it. - drop(Box::from_raw(this)); - } -} - -/// A position in a queue. -struct Position<T> { - /// The index in the queue. - index: AtomicUsize, - - /// The block in the linked list. - block: AtomicPtr<Block<T>>, -} - -/// An injector queue. -/// -/// This is a FIFO queue that can be shared among multiple threads. Task schedulers typically have -/// a single injector queue, which is the entry point for new tasks. -/// -/// # Examples -/// -/// ``` -/// use crossbeam_deque::{Injector, Steal}; -/// -/// let q = Injector::new(); -/// q.push(1); -/// q.push(2); -/// -/// assert_eq!(q.steal(), Steal::Success(1)); -/// assert_eq!(q.steal(), Steal::Success(2)); -/// assert_eq!(q.steal(), Steal::Empty); -/// ``` -pub struct Injector<T> { - /// The head of the queue. - head: CachePadded<Position<T>>, - - /// The tail of the queue. - tail: CachePadded<Position<T>>, - - /// Indicates that dropping a `Injector<T>` may drop values of type `T`. - _marker: PhantomData<T>, -} - -unsafe impl<T: Send> Send for Injector<T> {} -unsafe impl<T: Send> Sync for Injector<T> {} - -impl<T> Default for Injector<T> { - fn default() -> Self { - let block = Box::into_raw(Box::new(Block::<T>::new())); - Self { - head: CachePadded::new(Position { - block: AtomicPtr::new(block), - index: AtomicUsize::new(0), - }), - tail: CachePadded::new(Position { - block: AtomicPtr::new(block), - index: AtomicUsize::new(0), - }), - _marker: PhantomData, - } - } -} - -impl<T> Injector<T> { - /// Creates a new injector queue. - /// - /// # Examples - /// - /// ``` - /// use crossbeam_deque::Injector; - /// - /// let q = Injector::<i32>::new(); - /// ``` - pub fn new() -> Injector<T> { - Self::default() - } - - /// Pushes a task into the queue. - /// - /// # Examples - /// - /// ``` - /// use crossbeam_deque::Injector; - /// - /// let w = Injector::new(); - /// w.push(1); - /// w.push(2); - /// ``` - pub fn push(&self, task: T) { - let backoff = Backoff::new(); - let mut tail = self.tail.index.load(Ordering::Acquire); - let mut block = self.tail.block.load(Ordering::Acquire); - let mut next_block = None; - - loop { - // Calculate the offset of the index into the block. - let offset = (tail >> SHIFT) % LAP; - - // If we reached the end of the block, wait until the next one is installed. - if offset == BLOCK_CAP { - backoff.snooze(); - tail = self.tail.index.load(Ordering::Acquire); - block = self.tail.block.load(Ordering::Acquire); - continue; - } - - // If we're going to have to install the next block, allocate it in advance in order to - // make the wait for other threads as short as possible. - if offset + 1 == BLOCK_CAP && next_block.is_none() { - next_block = Some(Box::new(Block::<T>::new())); - } - - let new_tail = tail + (1 << SHIFT); - - // Try advancing the tail forward. - match self.tail.index.compare_exchange_weak( - tail, - new_tail, - Ordering::SeqCst, - Ordering::Acquire, - ) { - Ok(_) => unsafe { - // If we've reached the end of the block, install the next one. - if offset + 1 == BLOCK_CAP { - let next_block = Box::into_raw(next_block.unwrap()); - let next_index = new_tail.wrapping_add(1 << SHIFT); - - self.tail.block.store(next_block, Ordering::Release); - self.tail.index.store(next_index, Ordering::Release); - (*block).next.store(next_block, Ordering::Release); - } - - // Write the task into the slot. - let slot = (*block).slots.get_unchecked(offset); - slot.task.get().write(MaybeUninit::new(task)); - slot.state.fetch_or(WRITE, Ordering::Release); - - return; - }, - Err(t) => { - tail = t; - block = self.tail.block.load(Ordering::Acquire); - backoff.spin(); - } - } - } - } - - /// Steals a task from the queue. - /// - /// # Examples - /// - /// ``` - /// use crossbeam_deque::{Injector, Steal}; - /// - /// let q = Injector::new(); - /// q.push(1); - /// q.push(2); - /// - /// assert_eq!(q.steal(), Steal::Success(1)); - /// assert_eq!(q.steal(), Steal::Success(2)); - /// assert_eq!(q.steal(), Steal::Empty); - /// ``` - pub fn steal(&self) -> Steal<T> { - let mut head; - let mut block; - let mut offset; - - let backoff = Backoff::new(); - loop { - head = self.head.index.load(Ordering::Acquire); - block = self.head.block.load(Ordering::Acquire); - - // Calculate the offset of the index into the block. - offset = (head >> SHIFT) % LAP; - - // If we reached the end of the block, wait until the next one is installed. - if offset == BLOCK_CAP { - backoff.snooze(); - } else { - break; - } - } - - let mut new_head = head + (1 << SHIFT); - - if new_head & HAS_NEXT == 0 { - atomic::fence(Ordering::SeqCst); - let tail = self.tail.index.load(Ordering::Relaxed); - - // If the tail equals the head, that means the queue is empty. - if head >> SHIFT == tail >> SHIFT { - return Steal::Empty; - } - - // If head and tail are not in the same block, set `HAS_NEXT` in head. - if (head >> SHIFT) / LAP != (tail >> SHIFT) / LAP { - new_head |= HAS_NEXT; - } - } - - // Try moving the head index forward. - if self - .head - .index - .compare_exchange_weak(head, new_head, Ordering::SeqCst, Ordering::Acquire) - .is_err() - { - return Steal::Retry; - } - - unsafe { - // If we've reached the end of the block, move to the next one. - if offset + 1 == BLOCK_CAP { - let next = (*block).wait_next(); - let mut next_index = (new_head & !HAS_NEXT).wrapping_add(1 << SHIFT); - if !(*next).next.load(Ordering::Relaxed).is_null() { - next_index |= HAS_NEXT; - } - - self.head.block.store(next, Ordering::Release); - self.head.index.store(next_index, Ordering::Release); - } - - // Read the task. - let slot = (*block).slots.get_unchecked(offset); - slot.wait_write(); - let task = slot.task.get().read().assume_init(); - - // Destroy the block if we've reached the end, or if another thread wanted to destroy - // but couldn't because we were busy reading from the slot. - if (offset + 1 == BLOCK_CAP) - || (slot.state.fetch_or(READ, Ordering::AcqRel) & DESTROY != 0) - { - Block::destroy(block, offset); - } - - Steal::Success(task) - } - } - - /// Steals a batch of tasks and pushes them into a worker. - /// - /// How many tasks exactly will be stolen is not specified. That said, this method will try to - /// steal around half of the tasks in the queue, but also not more than some constant limit. - /// - /// # Examples - /// - /// ``` - /// use crossbeam_deque::{Injector, Worker}; - /// - /// let q = Injector::new(); - /// q.push(1); - /// q.push(2); - /// q.push(3); - /// q.push(4); - /// - /// let w = Worker::new_fifo(); - /// let _ = q.steal_batch(&w); - /// assert_eq!(w.pop(), Some(1)); - /// assert_eq!(w.pop(), Some(2)); - /// ``` - pub fn steal_batch(&self, dest: &Worker<T>) -> Steal<()> { - self.steal_batch_with_limit(dest, MAX_BATCH) - } - - /// Steals no more than of tasks and pushes them into a worker. - /// - /// How many tasks exactly will be stolen is not specified. That said, this method will try to - /// steal around half of the tasks in the queue, but also not more than some constant limit. - /// - /// # Examples - /// - /// ``` - /// use crossbeam_deque::{Injector, Worker}; - /// - /// let q = Injector::new(); - /// q.push(1); - /// q.push(2); - /// q.push(3); - /// q.push(4); - /// q.push(5); - /// q.push(6); - /// - /// let w = Worker::new_fifo(); - /// let _ = q.steal_batch_with_limit(&w, 2); - /// assert_eq!(w.pop(), Some(1)); - /// assert_eq!(w.pop(), Some(2)); - /// assert_eq!(w.pop(), None); - /// - /// q.push(7); - /// q.push(8); - /// // Setting a large limit does not guarantee that all elements will be popped. In this case, - /// // half of the elements are currently popped, but the number of popped elements is considered - /// // an implementation detail that may be changed in the future. - /// let _ = q.steal_batch_with_limit(&w, std::usize::MAX); - /// assert_eq!(w.len(), 3); - /// ``` - pub fn steal_batch_with_limit(&self, dest: &Worker<T>, limit: usize) -> Steal<()> { - assert!(limit > 0); - let mut head; - let mut block; - let mut offset; - - let backoff = Backoff::new(); - loop { - head = self.head.index.load(Ordering::Acquire); - block = self.head.block.load(Ordering::Acquire); - - // Calculate the offset of the index into the block. - offset = (head >> SHIFT) % LAP; - - // If we reached the end of the block, wait until the next one is installed. - if offset == BLOCK_CAP { - backoff.snooze(); - } else { - break; - } - } - - let mut new_head = head; - let advance; - - if new_head & HAS_NEXT == 0 { - atomic::fence(Ordering::SeqCst); - let tail = self.tail.index.load(Ordering::Relaxed); - - // If the tail equals the head, that means the queue is empty. - if head >> SHIFT == tail >> SHIFT { - return Steal::Empty; - } - - // If head and tail are not in the same block, set `HAS_NEXT` in head. Also, calculate - // the right batch size to steal. - if (head >> SHIFT) / LAP != (tail >> SHIFT) / LAP { - new_head |= HAS_NEXT; - // We can steal all tasks till the end of the block. - advance = (BLOCK_CAP - offset).min(limit); - } else { - let len = (tail - head) >> SHIFT; - // Steal half of the available tasks. - advance = ((len + 1) / 2).min(limit); - } - } else { - // We can steal all tasks till the end of the block. - advance = (BLOCK_CAP - offset).min(limit); - } - - new_head += advance << SHIFT; - let new_offset = offset + advance; - - // Try moving the head index forward. - if self - .head - .index - .compare_exchange_weak(head, new_head, Ordering::SeqCst, Ordering::Acquire) - .is_err() - { - return Steal::Retry; - } - - // Reserve capacity for the stolen batch. - let batch_size = new_offset - offset; - dest.reserve(batch_size); - - // Get the destination buffer and back index. - let dest_buffer = dest.buffer.get(); - let dest_b = dest.inner.back.load(Ordering::Relaxed); - - unsafe { - // If we've reached the end of the block, move to the next one. - if new_offset == BLOCK_CAP { - let next = (*block).wait_next(); - let mut next_index = (new_head & !HAS_NEXT).wrapping_add(1 << SHIFT); - if !(*next).next.load(Ordering::Relaxed).is_null() { - next_index |= HAS_NEXT; - } - - self.head.block.store(next, Ordering::Release); - self.head.index.store(next_index, Ordering::Release); - } - - // Copy values from the injector into the destination queue. - match dest.flavor { - Flavor::Fifo => { - for i in 0..batch_size { - // Read the task. - let slot = (*block).slots.get_unchecked(offset + i); - slot.wait_write(); - let task = slot.task.get().read(); - - // Write it into the destination queue. - dest_buffer.write(dest_b.wrapping_add(i as isize), task); - } - } - - Flavor::Lifo => { - for i in 0..batch_size { - // Read the task. - let slot = (*block).slots.get_unchecked(offset + i); - slot.wait_write(); - let task = slot.task.get().read(); - - // Write it into the destination queue. - dest_buffer.write(dest_b.wrapping_add((batch_size - 1 - i) as isize), task); - } - } - } - - atomic::fence(Ordering::Release); - - // Update the back index in the destination queue. - // - // This ordering could be `Relaxed`, but then thread sanitizer would falsely report - // data races because it doesn't understand fences. - dest.inner - .back - .store(dest_b.wrapping_add(batch_size as isize), Ordering::Release); - - // Destroy the block if we've reached the end, or if another thread wanted to destroy - // but couldn't because we were busy reading from the slot. - if new_offset == BLOCK_CAP { - Block::destroy(block, offset); - } else { - for i in offset..new_offset { - let slot = (*block).slots.get_unchecked(i); - - if slot.state.fetch_or(READ, Ordering::AcqRel) & DESTROY != 0 { - Block::destroy(block, offset); - break; - } - } - } - - Steal::Success(()) - } - } - - /// Steals a batch of tasks, pushes them into a worker, and pops a task from that worker. - /// - /// How many tasks exactly will be stolen is not specified. That said, this method will try to - /// steal around half of the tasks in the queue, but also not more than some constant limit. - /// - /// # Examples - /// - /// ``` - /// use crossbeam_deque::{Injector, Steal, Worker}; - /// - /// let q = Injector::new(); - /// q.push(1); - /// q.push(2); - /// q.push(3); - /// q.push(4); - /// - /// let w = Worker::new_fifo(); - /// assert_eq!(q.steal_batch_and_pop(&w), Steal::Success(1)); - /// assert_eq!(w.pop(), Some(2)); - /// ``` - pub fn steal_batch_and_pop(&self, dest: &Worker<T>) -> Steal<T> { - // TODO: we use `MAX_BATCH + 1` as the hard limit for Injecter as the performance is slightly - // better, but we may change it in the future to be compatible with the same method in Stealer. - self.steal_batch_with_limit_and_pop(dest, MAX_BATCH + 1) - } - - /// Steals no more than `limit` of tasks, pushes them into a worker, and pops a task from that worker. - /// - /// How many tasks exactly will be stolen is not specified. That said, this method will try to - /// steal around half of the tasks in the queue, but also not more than the given limit. - /// - /// # Examples - /// - /// ``` - /// use crossbeam_deque::{Injector, Steal, Worker}; - /// - /// let q = Injector::new(); - /// q.push(1); - /// q.push(2); - /// q.push(3); - /// q.push(4); - /// q.push(5); - /// q.push(6); - /// - /// let w = Worker::new_fifo(); - /// assert_eq!(q.steal_batch_with_limit_and_pop(&w, 2), Steal::Success(1)); - /// assert_eq!(w.pop(), Some(2)); - /// assert_eq!(w.pop(), None); - /// - /// q.push(7); - /// // Setting a large limit does not guarantee that all elements will be popped. In this case, - /// // half of the elements are currently popped, but the number of popped elements is considered - /// // an implementation detail that may be changed in the future. - /// assert_eq!(q.steal_batch_with_limit_and_pop(&w, std::usize::MAX), Steal::Success(3)); - /// assert_eq!(w.pop(), Some(4)); - /// assert_eq!(w.pop(), Some(5)); - /// assert_eq!(w.pop(), None); - /// ``` - pub fn steal_batch_with_limit_and_pop(&self, dest: &Worker<T>, limit: usize) -> Steal<T> { - assert!(limit > 0); - let mut head; - let mut block; - let mut offset; - - let backoff = Backoff::new(); - loop { - head = self.head.index.load(Ordering::Acquire); - block = self.head.block.load(Ordering::Acquire); - - // Calculate the offset of the index into the block. - offset = (head >> SHIFT) % LAP; - - // If we reached the end of the block, wait until the next one is installed. - if offset == BLOCK_CAP { - backoff.snooze(); - } else { - break; - } - } - - let mut new_head = head; - let advance; - - if new_head & HAS_NEXT == 0 { - atomic::fence(Ordering::SeqCst); - let tail = self.tail.index.load(Ordering::Relaxed); - - // If the tail equals the head, that means the queue is empty. - if head >> SHIFT == tail >> SHIFT { - return Steal::Empty; - } - - // If head and tail are not in the same block, set `HAS_NEXT` in head. - if (head >> SHIFT) / LAP != (tail >> SHIFT) / LAP { - new_head |= HAS_NEXT; - // We can steal all tasks till the end of the block. - advance = (BLOCK_CAP - offset).min(limit); - } else { - let len = (tail - head) >> SHIFT; - // Steal half of the available tasks. - advance = ((len + 1) / 2).min(limit); - } - } else { - // We can steal all tasks till the end of the block. - advance = (BLOCK_CAP - offset).min(limit); - } - - new_head += advance << SHIFT; - let new_offset = offset + advance; - - // Try moving the head index forward. - if self - .head - .index - .compare_exchange_weak(head, new_head, Ordering::SeqCst, Ordering::Acquire) - .is_err() - { - return Steal::Retry; - } - - // Reserve capacity for the stolen batch. - let batch_size = new_offset - offset - 1; - dest.reserve(batch_size); - - // Get the destination buffer and back index. - let dest_buffer = dest.buffer.get(); - let dest_b = dest.inner.back.load(Ordering::Relaxed); - - unsafe { - // If we've reached the end of the block, move to the next one. - if new_offset == BLOCK_CAP { - let next = (*block).wait_next(); - let mut next_index = (new_head & !HAS_NEXT).wrapping_add(1 << SHIFT); - if !(*next).next.load(Ordering::Relaxed).is_null() { - next_index |= HAS_NEXT; - } - - self.head.block.store(next, Ordering::Release); - self.head.index.store(next_index, Ordering::Release); - } - - // Read the task. - let slot = (*block).slots.get_unchecked(offset); - slot.wait_write(); - let task = slot.task.get().read(); - - match dest.flavor { - Flavor::Fifo => { - // Copy values from the injector into the destination queue. - for i in 0..batch_size { - // Read the task. - let slot = (*block).slots.get_unchecked(offset + i + 1); - slot.wait_write(); - let task = slot.task.get().read(); - - // Write it into the destination queue. - dest_buffer.write(dest_b.wrapping_add(i as isize), task); - } - } - - Flavor::Lifo => { - // Copy values from the injector into the destination queue. - for i in 0..batch_size { - // Read the task. - let slot = (*block).slots.get_unchecked(offset + i + 1); - slot.wait_write(); - let task = slot.task.get().read(); - - // Write it into the destination queue. - dest_buffer.write(dest_b.wrapping_add((batch_size - 1 - i) as isize), task); - } - } - } - - atomic::fence(Ordering::Release); - - // Update the back index in the destination queue. - // - // This ordering could be `Relaxed`, but then thread sanitizer would falsely report - // data races because it doesn't understand fences. - dest.inner - .back - .store(dest_b.wrapping_add(batch_size as isize), Ordering::Release); - - // Destroy the block if we've reached the end, or if another thread wanted to destroy - // but couldn't because we were busy reading from the slot. - if new_offset == BLOCK_CAP { - Block::destroy(block, offset); - } else { - for i in offset..new_offset { - let slot = (*block).slots.get_unchecked(i); - - if slot.state.fetch_or(READ, Ordering::AcqRel) & DESTROY != 0 { - Block::destroy(block, offset); - break; - } - } - } - - Steal::Success(task.assume_init()) - } - } - - /// Returns `true` if the queue is empty. - /// - /// # Examples - /// - /// ``` - /// use crossbeam_deque::Injector; - /// - /// let q = Injector::new(); - /// - /// assert!(q.is_empty()); - /// q.push(1); - /// assert!(!q.is_empty()); - /// ``` - pub fn is_empty(&self) -> bool { - let head = self.head.index.load(Ordering::SeqCst); - let tail = self.tail.index.load(Ordering::SeqCst); - head >> SHIFT == tail >> SHIFT - } - - /// Returns the number of tasks in the queue. - /// - /// # Examples - /// - /// ``` - /// use crossbeam_deque::Injector; - /// - /// let q = Injector::new(); - /// - /// assert_eq!(q.len(), 0); - /// q.push(1); - /// assert_eq!(q.len(), 1); - /// q.push(1); - /// assert_eq!(q.len(), 2); - /// ``` - pub fn len(&self) -> usize { - loop { - // Load the tail index, then load the head index. - let mut tail = self.tail.index.load(Ordering::SeqCst); - let mut head = self.head.index.load(Ordering::SeqCst); - - // If the tail index didn't change, we've got consistent indices to work with. - if self.tail.index.load(Ordering::SeqCst) == tail { - // Erase the lower bits. - tail &= !((1 << SHIFT) - 1); - head &= !((1 << SHIFT) - 1); - - // Fix up indices if they fall onto block ends. - if (tail >> SHIFT) & (LAP - 1) == LAP - 1 { - tail = tail.wrapping_add(1 << SHIFT); - } - if (head >> SHIFT) & (LAP - 1) == LAP - 1 { - head = head.wrapping_add(1 << SHIFT); - } - - // Rotate indices so that head falls into the first block. - let lap = (head >> SHIFT) / LAP; - tail = tail.wrapping_sub((lap * LAP) << SHIFT); - head = head.wrapping_sub((lap * LAP) << SHIFT); - - // Remove the lower bits. - tail >>= SHIFT; - head >>= SHIFT; - - // Return the difference minus the number of blocks between tail and head. - return tail - head - tail / LAP; - } - } - } -} - -impl<T> Drop for Injector<T> { - fn drop(&mut self) { - let mut head = *self.head.index.get_mut(); - let mut tail = *self.tail.index.get_mut(); - let mut block = *self.head.block.get_mut(); - - // Erase the lower bits. - head &= !((1 << SHIFT) - 1); - tail &= !((1 << SHIFT) - 1); - - unsafe { - // Drop all values between `head` and `tail` and deallocate the heap-allocated blocks. - while head != tail { - let offset = (head >> SHIFT) % LAP; - - if offset < BLOCK_CAP { - // Drop the task in the slot. - let slot = (*block).slots.get_unchecked(offset); - let p = &mut *slot.task.get(); - p.as_mut_ptr().drop_in_place(); - } else { - // Deallocate the block and move to the next one. - let next = *(*block).next.get_mut(); - drop(Box::from_raw(block)); - block = next; - } - - head = head.wrapping_add(1 << SHIFT); - } - - // Deallocate the last remaining block. - drop(Box::from_raw(block)); - } - } -} - -impl<T> fmt::Debug for Injector<T> { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.pad("Worker { .. }") - } -} - -/// Possible outcomes of a steal operation. -/// -/// # Examples -/// -/// There are lots of ways to chain results of steal operations together: -/// -/// ``` -/// use crossbeam_deque::Steal::{self, Empty, Retry, Success}; -/// -/// let collect = |v: Vec<Steal<i32>>| v.into_iter().collect::<Steal<i32>>(); -/// -/// assert_eq!(collect(vec![Empty, Empty, Empty]), Empty); -/// assert_eq!(collect(vec![Empty, Retry, Empty]), Retry); -/// assert_eq!(collect(vec![Retry, Success(1), Empty]), Success(1)); -/// -/// assert_eq!(collect(vec![Empty, Empty]).or_else(|| Retry), Retry); -/// assert_eq!(collect(vec![Retry, Empty]).or_else(|| Success(1)), Success(1)); -/// ``` -#[must_use] -#[derive(PartialEq, Eq, Copy, Clone)] -pub enum Steal<T> { - /// The queue was empty at the time of stealing. - Empty, - - /// At least one task was successfully stolen. - Success(T), - - /// The steal operation needs to be retried. - Retry, -} - -impl<T> Steal<T> { - /// Returns `true` if the queue was empty at the time of stealing. - /// - /// # Examples - /// - /// ``` - /// use crossbeam_deque::Steal::{Empty, Retry, Success}; - /// - /// assert!(!Success(7).is_empty()); - /// assert!(!Retry::<i32>.is_empty()); - /// - /// assert!(Empty::<i32>.is_empty()); - /// ``` - pub fn is_empty(&self) -> bool { - match self { - Steal::Empty => true, - _ => false, - } - } - - /// Returns `true` if at least one task was stolen. - /// - /// # Examples - /// - /// ``` - /// use crossbeam_deque::Steal::{Empty, Retry, Success}; - /// - /// assert!(!Empty::<i32>.is_success()); - /// assert!(!Retry::<i32>.is_success()); - /// - /// assert!(Success(7).is_success()); - /// ``` - pub fn is_success(&self) -> bool { - match self { - Steal::Success(_) => true, - _ => false, - } - } - - /// Returns `true` if the steal operation needs to be retried. - /// - /// # Examples - /// - /// ``` - /// use crossbeam_deque::Steal::{Empty, Retry, Success}; - /// - /// assert!(!Empty::<i32>.is_retry()); - /// assert!(!Success(7).is_retry()); - /// - /// assert!(Retry::<i32>.is_retry()); - /// ``` - pub fn is_retry(&self) -> bool { - match self { - Steal::Retry => true, - _ => false, - } - } - - /// Returns the result of the operation, if successful. - /// - /// # Examples - /// - /// ``` - /// use crossbeam_deque::Steal::{Empty, Retry, Success}; - /// - /// assert_eq!(Empty::<i32>.success(), None); - /// assert_eq!(Retry::<i32>.success(), None); - /// - /// assert_eq!(Success(7).success(), Some(7)); - /// ``` - pub fn success(self) -> Option<T> { - match self { - Steal::Success(res) => Some(res), - _ => None, - } - } - - /// If no task was stolen, attempts another steal operation. - /// - /// Returns this steal result if it is `Success`. Otherwise, closure `f` is invoked and then: - /// - /// * If the second steal resulted in `Success`, it is returned. - /// * If both steals were unsuccessful but any resulted in `Retry`, then `Retry` is returned. - /// * If both resulted in `None`, then `None` is returned. - /// - /// # Examples - /// - /// ``` - /// use crossbeam_deque::Steal::{Empty, Retry, Success}; - /// - /// assert_eq!(Success(1).or_else(|| Success(2)), Success(1)); - /// assert_eq!(Retry.or_else(|| Success(2)), Success(2)); - /// - /// assert_eq!(Retry.or_else(|| Empty), Retry::<i32>); - /// assert_eq!(Empty.or_else(|| Retry), Retry::<i32>); - /// - /// assert_eq!(Empty.or_else(|| Empty), Empty::<i32>); - /// ``` - pub fn or_else<F>(self, f: F) -> Steal<T> - where - F: FnOnce() -> Steal<T>, - { - match self { - Steal::Empty => f(), - Steal::Success(_) => self, - Steal::Retry => { - if let Steal::Success(res) = f() { - Steal::Success(res) - } else { - Steal::Retry - } - } - } - } -} - -impl<T> fmt::Debug for Steal<T> { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match self { - Steal::Empty => f.pad("Empty"), - Steal::Success(_) => f.pad("Success(..)"), - Steal::Retry => f.pad("Retry"), - } - } -} - -impl<T> FromIterator<Steal<T>> for Steal<T> { - /// Consumes items until a `Success` is found and returns it. - /// - /// If no `Success` was found, but there was at least one `Retry`, then returns `Retry`. - /// Otherwise, `Empty` is returned. - fn from_iter<I>(iter: I) -> Steal<T> - where - I: IntoIterator<Item = Steal<T>>, - { - let mut retry = false; - for s in iter { - match &s { - Steal::Empty => {} - Steal::Success(_) => return s, - Steal::Retry => retry = true, - } - } - - if retry { - Steal::Retry - } else { - Steal::Empty - } - } -} diff --git a/vendor/crossbeam-deque/src/lib.rs b/vendor/crossbeam-deque/src/lib.rs deleted file mode 100644 index 16bc728..0000000 --- a/vendor/crossbeam-deque/src/lib.rs +++ /dev/null @@ -1,110 +0,0 @@ -//! Concurrent work-stealing deques. -//! -//! These data structures are most commonly used in work-stealing schedulers. The typical setup -//! involves a number of threads, each having its own FIFO or LIFO queue (*worker*). There is also -//! one global FIFO queue (*injector*) and a list of references to *worker* queues that are able to -//! steal tasks (*stealers*). -//! -//! We spawn a new task onto the scheduler by pushing it into the *injector* queue. Each worker -//! thread waits in a loop until it finds the next task to run and then runs it. To find a task, it -//! first looks into its local *worker* queue, and then into the *injector* and *stealers*. -//! -//! # Queues -//! -//! [`Injector`] is a FIFO queue, where tasks are pushed and stolen from opposite ends. It is -//! shared among threads and is usually the entry point for new tasks. -//! -//! [`Worker`] has two constructors: -//! -//! * [`new_fifo()`] - Creates a FIFO queue, in which tasks are pushed and popped from opposite -//! ends. -//! * [`new_lifo()`] - Creates a LIFO queue, in which tasks are pushed and popped from the same -//! end. -//! -//! Each [`Worker`] is owned by a single thread and supports only push and pop operations. -//! -//! Method [`stealer()`] creates a [`Stealer`] that may be shared among threads and can only steal -//! tasks from its [`Worker`]. Tasks are stolen from the end opposite to where they get pushed. -//! -//! # Stealing -//! -//! Steal operations come in three flavors: -//! -//! 1. [`steal()`] - Steals one task. -//! 2. [`steal_batch()`] - Steals a batch of tasks and moves them into another worker. -//! 3. [`steal_batch_and_pop()`] - Steals a batch of tasks, moves them into another queue, and pops -//! one task from that worker. -//! -//! In contrast to push and pop operations, stealing can spuriously fail with [`Steal::Retry`], in -//! which case the steal operation needs to be retried. -//! -//! # Examples -//! -//! Suppose a thread in a work-stealing scheduler is idle and looking for the next task to run. To -//! find an available task, it might do the following: -//! -//! 1. Try popping one task from the local worker queue. -//! 2. Try stealing a batch of tasks from the global injector queue. -//! 3. Try stealing one task from another thread using the stealer list. -//! -//! An implementation of this work-stealing strategy: -//! -//! ``` -//! use crossbeam_deque::{Injector, Stealer, Worker}; -//! use std::iter; -//! -//! fn find_task<T>( -//! local: &Worker<T>, -//! global: &Injector<T>, -//! stealers: &[Stealer<T>], -//! ) -> Option<T> { -//! // Pop a task from the local queue, if not empty. -//! local.pop().or_else(|| { -//! // Otherwise, we need to look for a task elsewhere. -//! iter::repeat_with(|| { -//! // Try stealing a batch of tasks from the global queue. -//! global.steal_batch_and_pop(local) -//! // Or try stealing a task from one of the other threads. -//! .or_else(|| stealers.iter().map(|s| s.steal()).collect()) -//! }) -//! // Loop while no task was stolen and any steal operation needs to be retried. -//! .find(|s| !s.is_retry()) -//! // Extract the stolen task, if there is one. -//! .and_then(|s| s.success()) -//! }) -//! } -//! ``` -//! -//! [`new_fifo()`]: Worker::new_fifo -//! [`new_lifo()`]: Worker::new_lifo -//! [`stealer()`]: Worker::stealer -//! [`steal()`]: Stealer::steal -//! [`steal_batch()`]: Stealer::steal_batch -//! [`steal_batch_and_pop()`]: Stealer::steal_batch_and_pop - -#![doc(test( - no_crate_inject, - attr( - deny(warnings, rust_2018_idioms), - allow(dead_code, unused_assignments, unused_variables) - ) -))] -#![warn( - missing_docs, - missing_debug_implementations, - rust_2018_idioms, - unreachable_pub -)] -#![cfg_attr(not(feature = "std"), no_std)] - -use cfg_if::cfg_if; - -cfg_if! { - if #[cfg(feature = "std")] { - use crossbeam_epoch as epoch; - use crossbeam_utils as utils; - - mod deque; - pub use crate::deque::{Injector, Steal, Stealer, Worker}; - } -} diff --git a/vendor/crossbeam-deque/tests/fifo.rs b/vendor/crossbeam-deque/tests/fifo.rs deleted file mode 100644 index f98737b..0000000 --- a/vendor/crossbeam-deque/tests/fifo.rs +++ /dev/null @@ -1,357 +0,0 @@ -use std::sync::atomic::Ordering::SeqCst; -use std::sync::atomic::{AtomicBool, AtomicUsize}; -use std::sync::{Arc, Mutex}; - -use crossbeam_deque::Steal::{Empty, Success}; -use crossbeam_deque::Worker; -use crossbeam_utils::thread::scope; -use rand::Rng; - -#[test] -fn smoke() { - let w = Worker::new_fifo(); - let s = w.stealer(); - assert_eq!(w.pop(), None); - assert_eq!(s.steal(), Empty); - - w.push(1); - assert_eq!(w.pop(), Some(1)); - assert_eq!(w.pop(), None); - assert_eq!(s.steal(), Empty); - - w.push(2); - assert_eq!(s.steal(), Success(2)); - assert_eq!(s.steal(), Empty); - assert_eq!(w.pop(), None); - - w.push(3); - w.push(4); - w.push(5); - assert_eq!(s.steal(), Success(3)); - assert_eq!(s.steal(), Success(4)); - assert_eq!(s.steal(), Success(5)); - assert_eq!(s.steal(), Empty); - - w.push(6); - w.push(7); - w.push(8); - w.push(9); - assert_eq!(w.pop(), Some(6)); - assert_eq!(s.steal(), Success(7)); - assert_eq!(w.pop(), Some(8)); - assert_eq!(w.pop(), Some(9)); - assert_eq!(w.pop(), None); -} - -#[test] -fn is_empty() { - let w = Worker::new_fifo(); - let s = w.stealer(); - - assert!(w.is_empty()); - w.push(1); - assert!(!w.is_empty()); - w.push(2); - assert!(!w.is_empty()); - let _ = w.pop(); - assert!(!w.is_empty()); - let _ = w.pop(); - assert!(w.is_empty()); - - assert!(s.is_empty()); - w.push(1); - assert!(!s.is_empty()); - w.push(2); - assert!(!s.is_empty()); - let _ = s.steal(); - assert!(!s.is_empty()); - let _ = s.steal(); - assert!(s.is_empty()); -} - -#[test] -fn spsc() { - #[cfg(miri)] - const STEPS: usize = 500; - #[cfg(not(miri))] - const STEPS: usize = 50_000; - - let w = Worker::new_fifo(); - let s = w.stealer(); - - scope(|scope| { - scope.spawn(|_| { - for i in 0..STEPS { - loop { - if let Success(v) = s.steal() { - assert_eq!(i, v); - break; - } - } - } - - assert_eq!(s.steal(), Empty); - }); - - for i in 0..STEPS { - w.push(i); - } - }) - .unwrap(); -} - -#[test] -fn stampede() { - const THREADS: usize = 8; - #[cfg(miri)] - const COUNT: usize = 500; - #[cfg(not(miri))] - const COUNT: usize = 50_000; - - let w = Worker::new_fifo(); - - for i in 0..COUNT { - w.push(Box::new(i + 1)); - } - let remaining = Arc::new(AtomicUsize::new(COUNT)); - - scope(|scope| { - for _ in 0..THREADS { - let s = w.stealer(); - let remaining = remaining.clone(); - - scope.spawn(move |_| { - let mut last = 0; - while remaining.load(SeqCst) > 0 { - if let Success(x) = s.steal() { - assert!(last < *x); - last = *x; - remaining.fetch_sub(1, SeqCst); - } - } - }); - } - - let mut last = 0; - while remaining.load(SeqCst) > 0 { - if let Some(x) = w.pop() { - assert!(last < *x); - last = *x; - remaining.fetch_sub(1, SeqCst); - } - } - }) - .unwrap(); -} - -#[test] -fn stress() { - const THREADS: usize = 8; - #[cfg(miri)] - const COUNT: usize = 500; - #[cfg(not(miri))] - const COUNT: usize = 50_000; - - let w = Worker::new_fifo(); - let done = Arc::new(AtomicBool::new(false)); - let hits = Arc::new(AtomicUsize::new(0)); - - scope(|scope| { - for _ in 0..THREADS { - let s = w.stealer(); - let done = done.clone(); - let hits = hits.clone(); - - scope.spawn(move |_| { - let w2 = Worker::new_fifo(); - - while !done.load(SeqCst) { - if let Success(_) = s.steal() { - hits.fetch_add(1, SeqCst); - } - - let _ = s.steal_batch(&w2); - - if let Success(_) = s.steal_batch_and_pop(&w2) { - hits.fetch_add(1, SeqCst); - } - - while w2.pop().is_some() { - hits.fetch_add(1, SeqCst); - } - } - }); - } - - let mut rng = rand::thread_rng(); - let mut expected = 0; - while expected < COUNT { - if rng.gen_range(0..3) == 0 { - while w.pop().is_some() { - hits.fetch_add(1, SeqCst); - } - } else { - w.push(expected); - expected += 1; - } - } - - while hits.load(SeqCst) < COUNT { - while w.pop().is_some() { - hits.fetch_add(1, SeqCst); - } - } - done.store(true, SeqCst); - }) - .unwrap(); -} - -#[cfg_attr(miri, ignore)] // Miri is too slow -#[test] -fn no_starvation() { - const THREADS: usize = 8; - const COUNT: usize = 50_000; - - let w = Worker::new_fifo(); - let done = Arc::new(AtomicBool::new(false)); - let mut all_hits = Vec::new(); - - scope(|scope| { - for _ in 0..THREADS { - let s = w.stealer(); - let done = done.clone(); - let hits = Arc::new(AtomicUsize::new(0)); - all_hits.push(hits.clone()); - - scope.spawn(move |_| { - let w2 = Worker::new_fifo(); - - while !done.load(SeqCst) { - if let Success(_) = s.steal() { - hits.fetch_add(1, SeqCst); - } - - let _ = s.steal_batch(&w2); - - if let Success(_) = s.steal_batch_and_pop(&w2) { - hits.fetch_add(1, SeqCst); - } - - while w2.pop().is_some() { - hits.fetch_add(1, SeqCst); - } - } - }); - } - - let mut rng = rand::thread_rng(); - let mut my_hits = 0; - loop { - for i in 0..rng.gen_range(0..COUNT) { - if rng.gen_range(0..3) == 0 && my_hits == 0 { - while w.pop().is_some() { - my_hits += 1; - } - } else { - w.push(i); - } - } - - if my_hits > 0 && all_hits.iter().all(|h| h.load(SeqCst) > 0) { - break; - } - } - done.store(true, SeqCst); - }) - .unwrap(); -} - -#[test] -fn destructors() { - #[cfg(miri)] - const THREADS: usize = 2; - #[cfg(not(miri))] - const THREADS: usize = 8; - #[cfg(miri)] - const COUNT: usize = 500; - #[cfg(not(miri))] - const COUNT: usize = 50_000; - #[cfg(miri)] - const STEPS: usize = 100; - #[cfg(not(miri))] - const STEPS: usize = 1000; - - struct Elem(usize, Arc<Mutex<Vec<usize>>>); - - impl Drop for Elem { - fn drop(&mut self) { - self.1.lock().unwrap().push(self.0); - } - } - - let w = Worker::new_fifo(); - let dropped = Arc::new(Mutex::new(Vec::new())); - let remaining = Arc::new(AtomicUsize::new(COUNT)); - - for i in 0..COUNT { - w.push(Elem(i, dropped.clone())); - } - - scope(|scope| { - for _ in 0..THREADS { - let remaining = remaining.clone(); - let s = w.stealer(); - - scope.spawn(move |_| { - let w2 = Worker::new_fifo(); - let mut cnt = 0; - - while cnt < STEPS { - if let Success(_) = s.steal() { - cnt += 1; - remaining.fetch_sub(1, SeqCst); - } - - let _ = s.steal_batch(&w2); - - if let Success(_) = s.steal_batch_and_pop(&w2) { - cnt += 1; - remaining.fetch_sub(1, SeqCst); - } - - while w2.pop().is_some() { - cnt += 1; - remaining.fetch_sub(1, SeqCst); - } - } - }); - } - - for _ in 0..STEPS { - if w.pop().is_some() { - remaining.fetch_sub(1, SeqCst); - } - } - }) - .unwrap(); - - let rem = remaining.load(SeqCst); - assert!(rem > 0); - - { - let mut v = dropped.lock().unwrap(); - assert_eq!(v.len(), COUNT - rem); - v.clear(); - } - - drop(w); - - { - let mut v = dropped.lock().unwrap(); - assert_eq!(v.len(), rem); - v.sort_unstable(); - for pair in v.windows(2) { - assert_eq!(pair[0] + 1, pair[1]); - } - } -} diff --git a/vendor/crossbeam-deque/tests/injector.rs b/vendor/crossbeam-deque/tests/injector.rs deleted file mode 100644 index f706a8d..0000000 --- a/vendor/crossbeam-deque/tests/injector.rs +++ /dev/null @@ -1,375 +0,0 @@ -use std::sync::atomic::Ordering::SeqCst; -use std::sync::atomic::{AtomicBool, AtomicUsize}; -use std::sync::{Arc, Mutex}; - -use crossbeam_deque::Steal::{Empty, Success}; -use crossbeam_deque::{Injector, Worker}; -use crossbeam_utils::thread::scope; -use rand::Rng; - -#[test] -fn smoke() { - let q = Injector::new(); - assert_eq!(q.steal(), Empty); - - q.push(1); - q.push(2); - assert_eq!(q.steal(), Success(1)); - assert_eq!(q.steal(), Success(2)); - assert_eq!(q.steal(), Empty); - - q.push(3); - assert_eq!(q.steal(), Success(3)); - assert_eq!(q.steal(), Empty); -} - -#[test] -fn is_empty() { - let q = Injector::new(); - assert!(q.is_empty()); - - q.push(1); - assert!(!q.is_empty()); - q.push(2); - assert!(!q.is_empty()); - - let _ = q.steal(); - assert!(!q.is_empty()); - let _ = q.steal(); - assert!(q.is_empty()); - - q.push(3); - assert!(!q.is_empty()); - let _ = q.steal(); - assert!(q.is_empty()); -} - -#[test] -fn spsc() { - #[cfg(miri)] - const COUNT: usize = 500; - #[cfg(not(miri))] - const COUNT: usize = 100_000; - - let q = Injector::new(); - - scope(|scope| { - scope.spawn(|_| { - for i in 0..COUNT { - loop { - if let Success(v) = q.steal() { - assert_eq!(i, v); - break; - } - #[cfg(miri)] - std::hint::spin_loop(); - } - } - - assert_eq!(q.steal(), Empty); - }); - - for i in 0..COUNT { - q.push(i); - } - }) - .unwrap(); -} - -#[test] -fn mpmc() { - #[cfg(miri)] - const COUNT: usize = 500; - #[cfg(not(miri))] - const COUNT: usize = 25_000; - const THREADS: usize = 4; - - let q = Injector::new(); - let v = (0..COUNT).map(|_| AtomicUsize::new(0)).collect::<Vec<_>>(); - - scope(|scope| { - for _ in 0..THREADS { - scope.spawn(|_| { - for i in 0..COUNT { - q.push(i); - } - }); - } - - for _ in 0..THREADS { - scope.spawn(|_| { - for _ in 0..COUNT { - loop { - if let Success(n) = q.steal() { - v[n].fetch_add(1, SeqCst); - break; - } - #[cfg(miri)] - std::hint::spin_loop(); - } - } - }); - } - }) - .unwrap(); - - for c in v { - assert_eq!(c.load(SeqCst), THREADS); - } -} - -#[test] -fn stampede() { - const THREADS: usize = 8; - #[cfg(miri)] - const COUNT: usize = 500; - #[cfg(not(miri))] - const COUNT: usize = 50_000; - - let q = Injector::new(); - - for i in 0..COUNT { - q.push(Box::new(i + 1)); - } - let remaining = Arc::new(AtomicUsize::new(COUNT)); - - scope(|scope| { - for _ in 0..THREADS { - let remaining = remaining.clone(); - let q = &q; - - scope.spawn(move |_| { - let mut last = 0; - while remaining.load(SeqCst) > 0 { - if let Success(x) = q.steal() { - assert!(last < *x); - last = *x; - remaining.fetch_sub(1, SeqCst); - } - } - }); - } - - let mut last = 0; - while remaining.load(SeqCst) > 0 { - if let Success(x) = q.steal() { - assert!(last < *x); - last = *x; - remaining.fetch_sub(1, SeqCst); - } - } - }) - .unwrap(); -} - -#[test] -fn stress() { - const THREADS: usize = 8; - #[cfg(miri)] - const COUNT: usize = 500; - #[cfg(not(miri))] - const COUNT: usize = 50_000; - - let q = Injector::new(); - let done = Arc::new(AtomicBool::new(false)); - let hits = Arc::new(AtomicUsize::new(0)); - - scope(|scope| { - for _ in 0..THREADS { - let done = done.clone(); - let hits = hits.clone(); - let q = &q; - - scope.spawn(move |_| { - let w2 = Worker::new_fifo(); - - while !done.load(SeqCst) { - if let Success(_) = q.steal() { - hits.fetch_add(1, SeqCst); - } - - let _ = q.steal_batch(&w2); - - if let Success(_) = q.steal_batch_and_pop(&w2) { - hits.fetch_add(1, SeqCst); - } - - while w2.pop().is_some() { - hits.fetch_add(1, SeqCst); - } - } - }); - } - - let mut rng = rand::thread_rng(); - let mut expected = 0; - while expected < COUNT { - if rng.gen_range(0..3) == 0 { - while let Success(_) = q.steal() { - hits.fetch_add(1, SeqCst); - } - } else { - q.push(expected); - expected += 1; - } - } - - while hits.load(SeqCst) < COUNT { - while let Success(_) = q.steal() { - hits.fetch_add(1, SeqCst); - } - } - done.store(true, SeqCst); - }) - .unwrap(); -} - -#[cfg_attr(miri, ignore)] // Miri is too slow -#[test] -fn no_starvation() { - const THREADS: usize = 8; - const COUNT: usize = 50_000; - - let q = Injector::new(); - let done = Arc::new(AtomicBool::new(false)); - let mut all_hits = Vec::new(); - - scope(|scope| { - for _ in 0..THREADS { - let done = done.clone(); - let hits = Arc::new(AtomicUsize::new(0)); - all_hits.push(hits.clone()); - let q = &q; - - scope.spawn(move |_| { - let w2 = Worker::new_fifo(); - - while !done.load(SeqCst) { - if let Success(_) = q.steal() { - hits.fetch_add(1, SeqCst); - } - - let _ = q.steal_batch(&w2); - - if let Success(_) = q.steal_batch_and_pop(&w2) { - hits.fetch_add(1, SeqCst); - } - - while w2.pop().is_some() { - hits.fetch_add(1, SeqCst); - } - } - }); - } - - let mut rng = rand::thread_rng(); - let mut my_hits = 0; - loop { - for i in 0..rng.gen_range(0..COUNT) { - if rng.gen_range(0..3) == 0 && my_hits == 0 { - while let Success(_) = q.steal() { - my_hits += 1; - } - } else { - q.push(i); - } - } - - if my_hits > 0 && all_hits.iter().all(|h| h.load(SeqCst) > 0) { - break; - } - } - done.store(true, SeqCst); - }) - .unwrap(); -} - -#[test] -fn destructors() { - #[cfg(miri)] - const THREADS: usize = 2; - #[cfg(not(miri))] - const THREADS: usize = 8; - #[cfg(miri)] - const COUNT: usize = 500; - #[cfg(not(miri))] - const COUNT: usize = 50_000; - #[cfg(miri)] - const STEPS: usize = 100; - #[cfg(not(miri))] - const STEPS: usize = 1000; - - struct Elem(usize, Arc<Mutex<Vec<usize>>>); - - impl Drop for Elem { - fn drop(&mut self) { - self.1.lock().unwrap().push(self.0); - } - } - - let q = Injector::new(); - let dropped = Arc::new(Mutex::new(Vec::new())); - let remaining = Arc::new(AtomicUsize::new(COUNT)); - - for i in 0..COUNT { - q.push(Elem(i, dropped.clone())); - } - - scope(|scope| { - for _ in 0..THREADS { - let remaining = remaining.clone(); - let q = &q; - - scope.spawn(move |_| { - let w2 = Worker::new_fifo(); - let mut cnt = 0; - - while cnt < STEPS { - if let Success(_) = q.steal() { - cnt += 1; - remaining.fetch_sub(1, SeqCst); - } - - let _ = q.steal_batch(&w2); - - if let Success(_) = q.steal_batch_and_pop(&w2) { - cnt += 1; - remaining.fetch_sub(1, SeqCst); - } - - while w2.pop().is_some() { - cnt += 1; - remaining.fetch_sub(1, SeqCst); - } - } - }); - } - - for _ in 0..STEPS { - if let Success(_) = q.steal() { - remaining.fetch_sub(1, SeqCst); - } - } - }) - .unwrap(); - - let rem = remaining.load(SeqCst); - assert!(rem > 0); - - { - let mut v = dropped.lock().unwrap(); - assert_eq!(v.len(), COUNT - rem); - v.clear(); - } - - drop(q); - - { - let mut v = dropped.lock().unwrap(); - assert_eq!(v.len(), rem); - v.sort_unstable(); - for pair in v.windows(2) { - assert_eq!(pair[0] + 1, pair[1]); - } - } -} diff --git a/vendor/crossbeam-deque/tests/lifo.rs b/vendor/crossbeam-deque/tests/lifo.rs deleted file mode 100644 index c1a65cd..0000000 --- a/vendor/crossbeam-deque/tests/lifo.rs +++ /dev/null @@ -1,359 +0,0 @@ -use std::sync::atomic::Ordering::SeqCst; -use std::sync::atomic::{AtomicBool, AtomicUsize}; -use std::sync::{Arc, Mutex}; - -use crossbeam_deque::Steal::{Empty, Success}; -use crossbeam_deque::Worker; -use crossbeam_utils::thread::scope; -use rand::Rng; - -#[test] -fn smoke() { - let w = Worker::new_lifo(); - let s = w.stealer(); - assert_eq!(w.pop(), None); - assert_eq!(s.steal(), Empty); - - w.push(1); - assert_eq!(w.pop(), Some(1)); - assert_eq!(w.pop(), None); - assert_eq!(s.steal(), Empty); - - w.push(2); - assert_eq!(s.steal(), Success(2)); - assert_eq!(s.steal(), Empty); - assert_eq!(w.pop(), None); - - w.push(3); - w.push(4); - w.push(5); - assert_eq!(s.steal(), Success(3)); - assert_eq!(s.steal(), Success(4)); - assert_eq!(s.steal(), Success(5)); - assert_eq!(s.steal(), Empty); - - w.push(6); - w.push(7); - w.push(8); - w.push(9); - assert_eq!(w.pop(), Some(9)); - assert_eq!(s.steal(), Success(6)); - assert_eq!(w.pop(), Some(8)); - assert_eq!(w.pop(), Some(7)); - assert_eq!(w.pop(), None); -} - -#[test] -fn is_empty() { - let w = Worker::new_lifo(); - let s = w.stealer(); - - assert!(w.is_empty()); - w.push(1); - assert!(!w.is_empty()); - w.push(2); - assert!(!w.is_empty()); - let _ = w.pop(); - assert!(!w.is_empty()); - let _ = w.pop(); - assert!(w.is_empty()); - - assert!(s.is_empty()); - w.push(1); - assert!(!s.is_empty()); - w.push(2); - assert!(!s.is_empty()); - let _ = s.steal(); - assert!(!s.is_empty()); - let _ = s.steal(); - assert!(s.is_empty()); -} - -#[test] -fn spsc() { - #[cfg(miri)] - const STEPS: usize = 500; - #[cfg(not(miri))] - const STEPS: usize = 50_000; - - let w = Worker::new_lifo(); - let s = w.stealer(); - - scope(|scope| { - scope.spawn(|_| { - for i in 0..STEPS { - loop { - if let Success(v) = s.steal() { - assert_eq!(i, v); - break; - } - #[cfg(miri)] - std::hint::spin_loop(); - } - } - - assert_eq!(s.steal(), Empty); - }); - - for i in 0..STEPS { - w.push(i); - } - }) - .unwrap(); -} - -#[test] -fn stampede() { - const THREADS: usize = 8; - #[cfg(miri)] - const COUNT: usize = 500; - #[cfg(not(miri))] - const COUNT: usize = 50_000; - - let w = Worker::new_lifo(); - - for i in 0..COUNT { - w.push(Box::new(i + 1)); - } - let remaining = Arc::new(AtomicUsize::new(COUNT)); - - scope(|scope| { - for _ in 0..THREADS { - let s = w.stealer(); - let remaining = remaining.clone(); - - scope.spawn(move |_| { - let mut last = 0; - while remaining.load(SeqCst) > 0 { - if let Success(x) = s.steal() { - assert!(last < *x); - last = *x; - remaining.fetch_sub(1, SeqCst); - } - } - }); - } - - let mut last = COUNT + 1; - while remaining.load(SeqCst) > 0 { - if let Some(x) = w.pop() { - assert!(last > *x); - last = *x; - remaining.fetch_sub(1, SeqCst); - } - } - }) - .unwrap(); -} - -#[test] -fn stress() { - const THREADS: usize = 8; - #[cfg(miri)] - const COUNT: usize = 500; - #[cfg(not(miri))] - const COUNT: usize = 50_000; - - let w = Worker::new_lifo(); - let done = Arc::new(AtomicBool::new(false)); - let hits = Arc::new(AtomicUsize::new(0)); - - scope(|scope| { - for _ in 0..THREADS { - let s = w.stealer(); - let done = done.clone(); - let hits = hits.clone(); - - scope.spawn(move |_| { - let w2 = Worker::new_lifo(); - - while !done.load(SeqCst) { - if let Success(_) = s.steal() { - hits.fetch_add(1, SeqCst); - } - - let _ = s.steal_batch(&w2); - - if let Success(_) = s.steal_batch_and_pop(&w2) { - hits.fetch_add(1, SeqCst); - } - - while w2.pop().is_some() { - hits.fetch_add(1, SeqCst); - } - } - }); - } - - let mut rng = rand::thread_rng(); - let mut expected = 0; - while expected < COUNT { - if rng.gen_range(0..3) == 0 { - while w.pop().is_some() { - hits.fetch_add(1, SeqCst); - } - } else { - w.push(expected); - expected += 1; - } - } - - while hits.load(SeqCst) < COUNT { - while w.pop().is_some() { - hits.fetch_add(1, SeqCst); - } - } - done.store(true, SeqCst); - }) - .unwrap(); -} - -#[cfg_attr(miri, ignore)] // Miri is too slow -#[test] -fn no_starvation() { - const THREADS: usize = 8; - const COUNT: usize = 50_000; - - let w = Worker::new_lifo(); - let done = Arc::new(AtomicBool::new(false)); - let mut all_hits = Vec::new(); - - scope(|scope| { - for _ in 0..THREADS { - let s = w.stealer(); - let done = done.clone(); - let hits = Arc::new(AtomicUsize::new(0)); - all_hits.push(hits.clone()); - - scope.spawn(move |_| { - let w2 = Worker::new_lifo(); - - while !done.load(SeqCst) { - if let Success(_) = s.steal() { - hits.fetch_add(1, SeqCst); - } - - let _ = s.steal_batch(&w2); - - if let Success(_) = s.steal_batch_and_pop(&w2) { - hits.fetch_add(1, SeqCst); - } - - while w2.pop().is_some() { - hits.fetch_add(1, SeqCst); - } - } - }); - } - - let mut rng = rand::thread_rng(); - let mut my_hits = 0; - loop { - for i in 0..rng.gen_range(0..COUNT) { - if rng.gen_range(0..3) == 0 && my_hits == 0 { - while w.pop().is_some() { - my_hits += 1; - } - } else { - w.push(i); - } - } - - if my_hits > 0 && all_hits.iter().all(|h| h.load(SeqCst) > 0) { - break; - } - } - done.store(true, SeqCst); - }) - .unwrap(); -} - -#[test] -fn destructors() { - #[cfg(miri)] - const THREADS: usize = 2; - #[cfg(not(miri))] - const THREADS: usize = 8; - #[cfg(miri)] - const COUNT: usize = 500; - #[cfg(not(miri))] - const COUNT: usize = 50_000; - #[cfg(miri)] - const STEPS: usize = 100; - #[cfg(not(miri))] - const STEPS: usize = 1000; - - struct Elem(usize, Arc<Mutex<Vec<usize>>>); - - impl Drop for Elem { - fn drop(&mut self) { - self.1.lock().unwrap().push(self.0); - } - } - - let w = Worker::new_lifo(); - let dropped = Arc::new(Mutex::new(Vec::new())); - let remaining = Arc::new(AtomicUsize::new(COUNT)); - - for i in 0..COUNT { - w.push(Elem(i, dropped.clone())); - } - - scope(|scope| { - for _ in 0..THREADS { - let remaining = remaining.clone(); - let s = w.stealer(); - - scope.spawn(move |_| { - let w2 = Worker::new_lifo(); - let mut cnt = 0; - - while cnt < STEPS { - if let Success(_) = s.steal() { - cnt += 1; - remaining.fetch_sub(1, SeqCst); - } - - let _ = s.steal_batch(&w2); - - if let Success(_) = s.steal_batch_and_pop(&w2) { - cnt += 1; - remaining.fetch_sub(1, SeqCst); - } - - while w2.pop().is_some() { - cnt += 1; - remaining.fetch_sub(1, SeqCst); - } - } - }); - } - - for _ in 0..STEPS { - if w.pop().is_some() { - remaining.fetch_sub(1, SeqCst); - } - } - }) - .unwrap(); - - let rem = remaining.load(SeqCst); - assert!(rem > 0); - - { - let mut v = dropped.lock().unwrap(); - assert_eq!(v.len(), COUNT - rem); - v.clear(); - } - - drop(w); - - { - let mut v = dropped.lock().unwrap(); - assert_eq!(v.len(), rem); - v.sort_unstable(); - for pair in v.windows(2) { - assert_eq!(pair[0] + 1, pair[1]); - } - } -} diff --git a/vendor/crossbeam-deque/tests/steal.rs b/vendor/crossbeam-deque/tests/steal.rs deleted file mode 100644 index af24998..0000000 --- a/vendor/crossbeam-deque/tests/steal.rs +++ /dev/null @@ -1,212 +0,0 @@ -use crossbeam_deque::Steal::Success; -use crossbeam_deque::{Injector, Worker}; - -#[test] -fn steal_fifo() { - let w = Worker::new_fifo(); - for i in 1..=3 { - w.push(i); - } - - let s = w.stealer(); - assert_eq!(s.steal(), Success(1)); - assert_eq!(s.steal(), Success(2)); - assert_eq!(s.steal(), Success(3)); -} - -#[test] -fn steal_lifo() { - let w = Worker::new_lifo(); - for i in 1..=3 { - w.push(i); - } - - let s = w.stealer(); - assert_eq!(s.steal(), Success(1)); - assert_eq!(s.steal(), Success(2)); - assert_eq!(s.steal(), Success(3)); -} - -#[test] -fn steal_injector() { - let q = Injector::new(); - for i in 1..=3 { - q.push(i); - } - - assert_eq!(q.steal(), Success(1)); - assert_eq!(q.steal(), Success(2)); - assert_eq!(q.steal(), Success(3)); -} - -#[test] -fn steal_batch_fifo_fifo() { - let w = Worker::new_fifo(); - for i in 1..=4 { - w.push(i); - } - - let s = w.stealer(); - let w2 = Worker::new_fifo(); - - assert_eq!(s.steal_batch(&w2), Success(())); - assert_eq!(w2.pop(), Some(1)); - assert_eq!(w2.pop(), Some(2)); -} - -#[test] -fn steal_batch_lifo_lifo() { - let w = Worker::new_lifo(); - for i in 1..=4 { - w.push(i); - } - - let s = w.stealer(); - let w2 = Worker::new_lifo(); - - assert_eq!(s.steal_batch(&w2), Success(())); - assert_eq!(w2.pop(), Some(2)); - assert_eq!(w2.pop(), Some(1)); -} - -#[test] -fn steal_batch_fifo_lifo() { - let w = Worker::new_fifo(); - for i in 1..=4 { - w.push(i); - } - - let s = w.stealer(); - let w2 = Worker::new_lifo(); - - assert_eq!(s.steal_batch(&w2), Success(())); - assert_eq!(w2.pop(), Some(1)); - assert_eq!(w2.pop(), Some(2)); -} - -#[test] -fn steal_batch_lifo_fifo() { - let w = Worker::new_lifo(); - for i in 1..=4 { - w.push(i); - } - - let s = w.stealer(); - let w2 = Worker::new_fifo(); - - assert_eq!(s.steal_batch(&w2), Success(())); - assert_eq!(w2.pop(), Some(2)); - assert_eq!(w2.pop(), Some(1)); -} - -#[test] -fn steal_batch_injector_fifo() { - let q = Injector::new(); - for i in 1..=4 { - q.push(i); - } - - let w2 = Worker::new_fifo(); - assert_eq!(q.steal_batch(&w2), Success(())); - assert_eq!(w2.pop(), Some(1)); - assert_eq!(w2.pop(), Some(2)); -} - -#[test] -fn steal_batch_injector_lifo() { - let q = Injector::new(); - for i in 1..=4 { - q.push(i); - } - - let w2 = Worker::new_lifo(); - assert_eq!(q.steal_batch(&w2), Success(())); - assert_eq!(w2.pop(), Some(1)); - assert_eq!(w2.pop(), Some(2)); -} - -#[test] -fn steal_batch_and_pop_fifo_fifo() { - let w = Worker::new_fifo(); - for i in 1..=6 { - w.push(i); - } - - let s = w.stealer(); - let w2 = Worker::new_fifo(); - - assert_eq!(s.steal_batch_and_pop(&w2), Success(1)); - assert_eq!(w2.pop(), Some(2)); - assert_eq!(w2.pop(), Some(3)); -} - -#[test] -fn steal_batch_and_pop_lifo_lifo() { - let w = Worker::new_lifo(); - for i in 1..=6 { - w.push(i); - } - - let s = w.stealer(); - let w2 = Worker::new_lifo(); - - assert_eq!(s.steal_batch_and_pop(&w2), Success(3)); - assert_eq!(w2.pop(), Some(2)); - assert_eq!(w2.pop(), Some(1)); -} - -#[test] -fn steal_batch_and_pop_fifo_lifo() { - let w = Worker::new_fifo(); - for i in 1..=6 { - w.push(i); - } - - let s = w.stealer(); - let w2 = Worker::new_lifo(); - - assert_eq!(s.steal_batch_and_pop(&w2), Success(1)); - assert_eq!(w2.pop(), Some(2)); - assert_eq!(w2.pop(), Some(3)); -} - -#[test] -fn steal_batch_and_pop_lifo_fifo() { - let w = Worker::new_lifo(); - for i in 1..=6 { - w.push(i); - } - - let s = w.stealer(); - let w2 = Worker::new_fifo(); - - assert_eq!(s.steal_batch_and_pop(&w2), Success(3)); - assert_eq!(w2.pop(), Some(2)); - assert_eq!(w2.pop(), Some(1)); -} - -#[test] -fn steal_batch_and_pop_injector_fifo() { - let q = Injector::new(); - for i in 1..=6 { - q.push(i); - } - - let w2 = Worker::new_fifo(); - assert_eq!(q.steal_batch_and_pop(&w2), Success(1)); - assert_eq!(w2.pop(), Some(2)); - assert_eq!(w2.pop(), Some(3)); -} - -#[test] -fn steal_batch_and_pop_injector_lifo() { - let q = Injector::new(); - for i in 1..=6 { - q.push(i); - } - - let w2 = Worker::new_lifo(); - assert_eq!(q.steal_batch_and_pop(&w2), Success(1)); - assert_eq!(w2.pop(), Some(2)); - assert_eq!(w2.pop(), Some(3)); -} |