diff options
Diffstat (limited to 'vendor/rayon-core/src/job.rs')
-rw-r--r-- | vendor/rayon-core/src/job.rs | 270 |
1 files changed, 270 insertions, 0 deletions
diff --git a/vendor/rayon-core/src/job.rs b/vendor/rayon-core/src/job.rs new file mode 100644 index 0000000..5664bb3 --- /dev/null +++ b/vendor/rayon-core/src/job.rs @@ -0,0 +1,270 @@ +use crate::latch::Latch; +use crate::unwind; +use crossbeam_deque::{Injector, Steal}; +use std::any::Any; +use std::cell::UnsafeCell; +use std::mem; +use std::sync::Arc; + +pub(super) enum JobResult<T> { + None, + Ok(T), + Panic(Box<dyn Any + Send>), +} + +/// A `Job` is used to advertise work for other threads that they may +/// want to steal. In accordance with time honored tradition, jobs are +/// arranged in a deque, so that thieves can take from the top of the +/// deque while the main worker manages the bottom of the deque. This +/// deque is managed by the `thread_pool` module. +pub(super) trait Job { + /// Unsafe: this may be called from a different thread than the one + /// which scheduled the job, so the implementer must ensure the + /// appropriate traits are met, whether `Send`, `Sync`, or both. + unsafe fn execute(this: *const ()); +} + +/// Effectively a Job trait object. Each JobRef **must** be executed +/// exactly once, or else data may leak. +/// +/// Internally, we store the job's data in a `*const ()` pointer. The +/// true type is something like `*const StackJob<...>`, but we hide +/// it. We also carry the "execute fn" from the `Job` trait. +pub(super) struct JobRef { + pointer: *const (), + execute_fn: unsafe fn(*const ()), +} + +unsafe impl Send for JobRef {} +unsafe impl Sync for JobRef {} + +impl JobRef { + /// Unsafe: caller asserts that `data` will remain valid until the + /// job is executed. + pub(super) unsafe fn new<T>(data: *const T) -> JobRef + where + T: Job, + { + // erase types: + JobRef { + pointer: data as *const (), + execute_fn: <T as Job>::execute, + } + } + + /// Returns an opaque handle that can be saved and compared, + /// without making `JobRef` itself `Copy + Eq`. + #[inline] + pub(super) fn id(&self) -> impl Eq { + (self.pointer, self.execute_fn) + } + + #[inline] + pub(super) unsafe fn execute(self) { + (self.execute_fn)(self.pointer) + } +} + +/// A job that will be owned by a stack slot. This means that when it +/// executes it need not free any heap data, the cleanup occurs when +/// the stack frame is later popped. The function parameter indicates +/// `true` if the job was stolen -- executed on a different thread. +pub(super) struct StackJob<L, F, R> +where + L: Latch + Sync, + F: FnOnce(bool) -> R + Send, + R: Send, +{ + pub(super) latch: L, + func: UnsafeCell<Option<F>>, + result: UnsafeCell<JobResult<R>>, +} + +impl<L, F, R> StackJob<L, F, R> +where + L: Latch + Sync, + F: FnOnce(bool) -> R + Send, + R: Send, +{ + pub(super) fn new(func: F, latch: L) -> StackJob<L, F, R> { + StackJob { + latch, + func: UnsafeCell::new(Some(func)), + result: UnsafeCell::new(JobResult::None), + } + } + + pub(super) unsafe fn as_job_ref(&self) -> JobRef { + JobRef::new(self) + } + + pub(super) unsafe fn run_inline(self, stolen: bool) -> R { + self.func.into_inner().unwrap()(stolen) + } + + pub(super) unsafe fn into_result(self) -> R { + self.result.into_inner().into_return_value() + } +} + +impl<L, F, R> Job for StackJob<L, F, R> +where + L: Latch + Sync, + F: FnOnce(bool) -> R + Send, + R: Send, +{ + unsafe fn execute(this: *const ()) { + let this = &*(this as *const Self); + let abort = unwind::AbortIfPanic; + let func = (*this.func.get()).take().unwrap(); + (*this.result.get()) = JobResult::call(func); + Latch::set(&this.latch); + mem::forget(abort); + } +} + +/// Represents a job stored in the heap. Used to implement +/// `scope`. Unlike `StackJob`, when executed, `HeapJob` simply +/// invokes a closure, which then triggers the appropriate logic to +/// signal that the job executed. +/// +/// (Probably `StackJob` should be refactored in a similar fashion.) +pub(super) struct HeapJob<BODY> +where + BODY: FnOnce() + Send, +{ + job: BODY, +} + +impl<BODY> HeapJob<BODY> +where + BODY: FnOnce() + Send, +{ + pub(super) fn new(job: BODY) -> Box<Self> { + Box::new(HeapJob { job }) + } + + /// Creates a `JobRef` from this job -- note that this hides all + /// lifetimes, so it is up to you to ensure that this JobRef + /// doesn't outlive any data that it closes over. + pub(super) unsafe fn into_job_ref(self: Box<Self>) -> JobRef { + JobRef::new(Box::into_raw(self)) + } + + /// Creates a static `JobRef` from this job. + pub(super) fn into_static_job_ref(self: Box<Self>) -> JobRef + where + BODY: 'static, + { + unsafe { self.into_job_ref() } + } +} + +impl<BODY> Job for HeapJob<BODY> +where + BODY: FnOnce() + Send, +{ + unsafe fn execute(this: *const ()) { + let this = Box::from_raw(this as *mut Self); + (this.job)(); + } +} + +/// Represents a job stored in an `Arc` -- like `HeapJob`, but may +/// be turned into multiple `JobRef`s and called multiple times. +pub(super) struct ArcJob<BODY> +where + BODY: Fn() + Send + Sync, +{ + job: BODY, +} + +impl<BODY> ArcJob<BODY> +where + BODY: Fn() + Send + Sync, +{ + pub(super) fn new(job: BODY) -> Arc<Self> { + Arc::new(ArcJob { job }) + } + + /// Creates a `JobRef` from this job -- note that this hides all + /// lifetimes, so it is up to you to ensure that this JobRef + /// doesn't outlive any data that it closes over. + pub(super) unsafe fn as_job_ref(this: &Arc<Self>) -> JobRef { + JobRef::new(Arc::into_raw(Arc::clone(this))) + } + + /// Creates a static `JobRef` from this job. + pub(super) fn as_static_job_ref(this: &Arc<Self>) -> JobRef + where + BODY: 'static, + { + unsafe { Self::as_job_ref(this) } + } +} + +impl<BODY> Job for ArcJob<BODY> +where + BODY: Fn() + Send + Sync, +{ + unsafe fn execute(this: *const ()) { + let this = Arc::from_raw(this as *mut Self); + (this.job)(); + } +} + +impl<T> JobResult<T> { + fn call(func: impl FnOnce(bool) -> T) -> Self { + match unwind::halt_unwinding(|| func(true)) { + Ok(x) => JobResult::Ok(x), + Err(x) => JobResult::Panic(x), + } + } + + /// Convert the `JobResult` for a job that has finished (and hence + /// its JobResult is populated) into its return value. + /// + /// NB. This will panic if the job panicked. + pub(super) fn into_return_value(self) -> T { + match self { + JobResult::None => unreachable!(), + JobResult::Ok(x) => x, + JobResult::Panic(x) => unwind::resume_unwinding(x), + } + } +} + +/// Indirect queue to provide FIFO job priority. +pub(super) struct JobFifo { + inner: Injector<JobRef>, +} + +impl JobFifo { + pub(super) fn new() -> Self { + JobFifo { + inner: Injector::new(), + } + } + + pub(super) unsafe fn push(&self, job_ref: JobRef) -> JobRef { + // A little indirection ensures that spawns are always prioritized in FIFO order. The + // jobs in a thread's deque may be popped from the back (LIFO) or stolen from the front + // (FIFO), but either way they will end up popping from the front of this queue. + self.inner.push(job_ref); + JobRef::new(self) + } +} + +impl Job for JobFifo { + unsafe fn execute(this: *const ()) { + // We "execute" a queue by executing its first job, FIFO. + let this = &*(this as *const Self); + loop { + match this.inner.steal() { + Steal::Success(job_ref) => break job_ref.execute(), + Steal::Empty => panic!("FIFO is empty"), + Steal::Retry => {} + } + } + } +} |