diff options
Diffstat (limited to 'vendor/jpeg-decoder/src/worker/multithreaded.rs')
-rw-r--r-- | vendor/jpeg-decoder/src/worker/multithreaded.rs | 123 |
1 files changed, 123 insertions, 0 deletions
diff --git a/vendor/jpeg-decoder/src/worker/multithreaded.rs b/vendor/jpeg-decoder/src/worker/multithreaded.rs new file mode 100644 index 0000000..c820702 --- /dev/null +++ b/vendor/jpeg-decoder/src/worker/multithreaded.rs @@ -0,0 +1,123 @@ +//! This module implements per-component parallelism. +//! It should be possible to implement per-row parallelism as well, +//! which should also boost performance of grayscale images +//! and allow scaling to more cores. +//! However, that would be more complex, so we use this as a starting point. + +use super::immediate::ImmediateWorker; +use super::{RowData, Worker}; +use crate::decoder::MAX_COMPONENTS; +use crate::error::Result; +use std::{ + mem, + sync::mpsc::{self, Receiver, Sender}, +}; + +enum WorkerMsg { + Start(RowData), + AppendRow(Vec<i16>), + GetResult(Sender<Vec<u8>>), +} + +#[derive(Default)] +pub struct MpscWorker { + senders: [Option<Sender<WorkerMsg>>; MAX_COMPONENTS], +} + +impl MpscWorker { + fn start_with( + &mut self, + row_data: RowData, + spawn_worker: impl FnOnce(usize) -> Result<Sender<WorkerMsg>>, + ) -> Result<()> { + // if there is no worker thread for this component yet, start one + let component = row_data.index; + if let None = self.senders[component] { + let sender = spawn_worker(component)?; + self.senders[component] = Some(sender); + } + + // we do the "take out value and put it back in once we're done" dance here + // and in all other message-passing methods because there's not that many rows + // and this should be cheaper than spawning MAX_COMPONENTS many threads up front + let sender = self.senders[component].as_mut().unwrap(); + sender + .send(WorkerMsg::Start(row_data)) + .expect("jpeg-decoder worker thread error"); + Ok(()) + } + + fn append_row(&mut self, row: (usize, Vec<i16>)) -> Result<()> { + let component = row.0; + let sender = self.senders[component].as_mut().unwrap(); + sender + .send(WorkerMsg::AppendRow(row.1)) + .expect("jpeg-decoder worker thread error"); + Ok(()) + } + + fn get_result_with( + &mut self, + index: usize, + collect: impl FnOnce(Receiver<Vec<u8>>) -> Vec<u8>, + ) -> Result<Vec<u8>> { + let (tx, rx) = mpsc::channel(); + let sender = mem::take(&mut self.senders[index]).unwrap(); + sender + .send(WorkerMsg::GetResult(tx)) + .expect("jpeg-decoder worker thread error"); + Ok(collect(rx)) + } +} + +impl Worker for MpscWorker { + fn start(&mut self, row_data: RowData) -> Result<()> { + self.start_with(row_data, spawn_worker_thread) + } + fn append_row(&mut self, row: (usize, Vec<i16>)) -> Result<()> { + MpscWorker::append_row(self, row) + } + fn get_result(&mut self, index: usize) -> Result<Vec<u8>> { + self.get_result_with(index, collect_worker_thread) + } +} + +fn create_worker() -> (Sender<WorkerMsg>, impl FnOnce() + 'static) { + let (tx, rx) = mpsc::channel(); + let closure = move || { + let mut worker = ImmediateWorker::default(); + + while let Ok(message) = rx.recv() { + match message { + WorkerMsg::Start(mut data) => { + // we always set component index to 0 for worker threads + // because they only ever handle one per thread and we don't want them + // to attempt to access nonexistent components + data.index = 0; + worker.start_immediate(data); + } + WorkerMsg::AppendRow(row) => { + worker.append_row_immediate((0, row)); + } + WorkerMsg::GetResult(chan) => { + let _ = chan.send(worker.get_result_immediate(0)); + break; + } + } + } + }; + + (tx, closure) +} + +fn spawn_worker_thread(component: usize) -> Result<Sender<WorkerMsg>> { + let (tx, worker) = create_worker(); + let thread_builder = + std::thread::Builder::new().name(format!("worker thread for component {}", component)); + thread_builder.spawn(worker)?; + Ok(tx) +} + +fn collect_worker_thread(rx: Receiver<Vec<u8>>) -> Vec<u8> { + rx.recv().expect("jpeg-decoder worker thread error") +} |