diff options
Diffstat (limited to 'vendor/jpeg-decoder/src/worker/multithreaded.rs')
-rw-r--r-- | vendor/jpeg-decoder/src/worker/multithreaded.rs | 123 |
1 files changed, 0 insertions, 123 deletions
diff --git a/vendor/jpeg-decoder/src/worker/multithreaded.rs b/vendor/jpeg-decoder/src/worker/multithreaded.rs deleted file mode 100644 index c820702..0000000 --- a/vendor/jpeg-decoder/src/worker/multithreaded.rs +++ /dev/null @@ -1,123 +0,0 @@ -//! This module implements per-component parallelism. -//! It should be possible to implement per-row parallelism as well, -//! which should also boost performance of grayscale images -//! and allow scaling to more cores. -//! However, that would be more complex, so we use this as a starting point. - -use super::immediate::ImmediateWorker; -use super::{RowData, Worker}; -use crate::decoder::MAX_COMPONENTS; -use crate::error::Result; -use std::{ - mem, - sync::mpsc::{self, Receiver, Sender}, -}; - -enum WorkerMsg { - Start(RowData), - AppendRow(Vec<i16>), - GetResult(Sender<Vec<u8>>), -} - -#[derive(Default)] -pub struct MpscWorker { - senders: [Option<Sender<WorkerMsg>>; MAX_COMPONENTS], -} - -impl MpscWorker { - fn start_with( - &mut self, - row_data: RowData, - spawn_worker: impl FnOnce(usize) -> Result<Sender<WorkerMsg>>, - ) -> Result<()> { - // if there is no worker thread for this component yet, start one - let component = row_data.index; - if let None = self.senders[component] { - let sender = spawn_worker(component)?; - self.senders[component] = Some(sender); - } - - // we do the "take out value and put it back in once we're done" dance here - // and in all other message-passing methods because there's not that many rows - // and this should be cheaper than spawning MAX_COMPONENTS many threads up front - let sender = self.senders[component].as_mut().unwrap(); - sender - .send(WorkerMsg::Start(row_data)) - .expect("jpeg-decoder worker thread error"); - Ok(()) - } - - fn append_row(&mut self, row: (usize, Vec<i16>)) -> Result<()> { - let component = row.0; - let sender = self.senders[component].as_mut().unwrap(); - sender - .send(WorkerMsg::AppendRow(row.1)) - .expect("jpeg-decoder worker thread error"); - Ok(()) - } - - fn get_result_with( - &mut self, - index: usize, - collect: impl FnOnce(Receiver<Vec<u8>>) -> Vec<u8>, - ) -> Result<Vec<u8>> { - let (tx, rx) = mpsc::channel(); - let sender = mem::take(&mut self.senders[index]).unwrap(); - sender - .send(WorkerMsg::GetResult(tx)) - .expect("jpeg-decoder worker thread error"); - Ok(collect(rx)) - } -} - -impl Worker for MpscWorker { - fn start(&mut self, row_data: RowData) -> Result<()> { - self.start_with(row_data, spawn_worker_thread) - } - fn append_row(&mut self, row: (usize, Vec<i16>)) -> Result<()> { - MpscWorker::append_row(self, row) - } - fn get_result(&mut self, index: usize) -> Result<Vec<u8>> { - self.get_result_with(index, collect_worker_thread) - } -} - -fn create_worker() -> (Sender<WorkerMsg>, impl FnOnce() + 'static) { - let (tx, rx) = mpsc::channel(); - let closure = move || { - let mut worker = ImmediateWorker::default(); - - while let Ok(message) = rx.recv() { - match message { - WorkerMsg::Start(mut data) => { - // we always set component index to 0 for worker threads - // because they only ever handle one per thread and we don't want them - // to attempt to access nonexistent components - data.index = 0; - worker.start_immediate(data); - } - WorkerMsg::AppendRow(row) => { - worker.append_row_immediate((0, row)); - } - WorkerMsg::GetResult(chan) => { - let _ = chan.send(worker.get_result_immediate(0)); - break; - } - } - } - }; - - (tx, closure) -} - -fn spawn_worker_thread(component: usize) -> Result<Sender<WorkerMsg>> { - let (tx, worker) = create_worker(); - let thread_builder = - std::thread::Builder::new().name(format!("worker thread for component {}", component)); - thread_builder.spawn(worker)?; - Ok(tx) -} - -fn collect_worker_thread(rx: Receiver<Vec<u8>>) -> Vec<u8> { - rx.recv().expect("jpeg-decoder worker thread error") -} |