//! This module implements per-component parallelism. //! It should be possible to implement per-row parallelism as well, //! which should also boost performance of grayscale images //! and allow scaling to more cores. //! However, that would be more complex, so we use this as a starting point. use super::immediate::ImmediateWorker; use super::{RowData, Worker}; use crate::decoder::MAX_COMPONENTS; use crate::error::Result; use std::{ mem, sync::mpsc::{self, Receiver, Sender}, }; enum WorkerMsg { Start(RowData), AppendRow(Vec), GetResult(Sender>), } #[derive(Default)] pub struct MpscWorker { senders: [Option>; MAX_COMPONENTS], } impl MpscWorker { fn start_with( &mut self, row_data: RowData, spawn_worker: impl FnOnce(usize) -> Result>, ) -> Result<()> { // if there is no worker thread for this component yet, start one let component = row_data.index; if let None = self.senders[component] { let sender = spawn_worker(component)?; self.senders[component] = Some(sender); } // we do the "take out value and put it back in once we're done" dance here // and in all other message-passing methods because there's not that many rows // and this should be cheaper than spawning MAX_COMPONENTS many threads up front let sender = self.senders[component].as_mut().unwrap(); sender .send(WorkerMsg::Start(row_data)) .expect("jpeg-decoder worker thread error"); Ok(()) } fn append_row(&mut self, row: (usize, Vec)) -> Result<()> { let component = row.0; let sender = self.senders[component].as_mut().unwrap(); sender .send(WorkerMsg::AppendRow(row.1)) .expect("jpeg-decoder worker thread error"); Ok(()) } fn get_result_with( &mut self, index: usize, collect: impl FnOnce(Receiver>) -> Vec, ) -> Result> { let (tx, rx) = mpsc::channel(); let sender = mem::take(&mut self.senders[index]).unwrap(); sender .send(WorkerMsg::GetResult(tx)) .expect("jpeg-decoder worker thread error"); Ok(collect(rx)) } } impl Worker for MpscWorker { fn start(&mut self, row_data: RowData) -> Result<()> { self.start_with(row_data, spawn_worker_thread) } fn append_row(&mut self, row: (usize, Vec)) -> Result<()> { MpscWorker::append_row(self, row) } fn get_result(&mut self, index: usize) -> Result> { self.get_result_with(index, collect_worker_thread) } } fn create_worker() -> (Sender, impl FnOnce() + 'static) { let (tx, rx) = mpsc::channel(); let closure = move || { let mut worker = ImmediateWorker::default(); while let Ok(message) = rx.recv() { match message { WorkerMsg::Start(mut data) => { // we always set component index to 0 for worker threads // because they only ever handle one per thread and we don't want them // to attempt to access nonexistent components data.index = 0; worker.start_immediate(data); } WorkerMsg::AppendRow(row) => { worker.append_row_immediate((0, row)); } WorkerMsg::GetResult(chan) => { let _ = chan.send(worker.get_result_immediate(0)); break; } } } }; (tx, closure) } fn spawn_worker_thread(component: usize) -> Result> { let (tx, worker) = create_worker(); let thread_builder = std::thread::Builder::new().name(format!("worker thread for component {}", component)); thread_builder.spawn(worker)?; Ok(tx) } fn collect_worker_thread(rx: Receiver>) -> Vec { rx.recv().expect("jpeg-decoder worker thread error") }