diff options
Diffstat (limited to 'vendor/jpeg-decoder/src/worker')
-rw-r--r-- | vendor/jpeg-decoder/src/worker/immediate.rs | 80 | ||||
-rw-r--r-- | vendor/jpeg-decoder/src/worker/mod.rs | 128 | ||||
-rw-r--r-- | vendor/jpeg-decoder/src/worker/multithreaded.rs | 123 | ||||
-rw-r--r-- | vendor/jpeg-decoder/src/worker/rayon.rs | 221 |
4 files changed, 552 insertions, 0 deletions
diff --git a/vendor/jpeg-decoder/src/worker/immediate.rs b/vendor/jpeg-decoder/src/worker/immediate.rs new file mode 100644 index 0000000..8c6e7db --- /dev/null +++ b/vendor/jpeg-decoder/src/worker/immediate.rs @@ -0,0 +1,80 @@ +use alloc::vec; +use alloc::vec::Vec; +use core::mem; +use core::convert::TryInto; +use crate::decoder::MAX_COMPONENTS; +use crate::error::Result; +use crate::idct::dequantize_and_idct_block; +use crate::alloc::sync::Arc; +use crate::parser::Component; +use super::{RowData, Worker}; + +pub struct ImmediateWorker { + offsets: [usize; MAX_COMPONENTS], + results: Vec<Vec<u8>>, + components: Vec<Option<Component>>, + quantization_tables: Vec<Option<Arc<[u16; 64]>>>, +} + +impl Default for ImmediateWorker { + fn default() -> Self { + ImmediateWorker { + offsets: [0; MAX_COMPONENTS], + results: vec![Vec::new(); MAX_COMPONENTS], + components: vec![None; MAX_COMPONENTS], + quantization_tables: vec![None; MAX_COMPONENTS], + } + } +} + +impl ImmediateWorker { + pub fn start_immediate(&mut self, data: RowData) { + assert!(self.results[data.index].is_empty()); + + self.offsets[data.index] = 0; + self.results[data.index].resize(data.component.block_size.width as usize * data.component.block_size.height as usize * data.component.dct_scale * data.component.dct_scale, 0u8); + self.components[data.index] = Some(data.component); + self.quantization_tables[data.index] = Some(data.quantization_table); + } + + pub fn append_row_immediate(&mut self, (index, data): (usize, Vec<i16>)) { + // Convert coefficients from a MCU row to samples. + + let component = self.components[index].as_ref().unwrap(); + let quantization_table = self.quantization_tables[index].as_ref().unwrap(); + let block_count = component.block_size.width as usize * component.vertical_sampling_factor as usize; + let line_stride = component.block_size.width as usize * component.dct_scale; + + assert_eq!(data.len(), block_count * 64); + + for i in 0..block_count { + let x = (i % component.block_size.width as usize) * component.dct_scale; + let y = (i / component.block_size.width as usize) * component.dct_scale; + + let coefficients = data[i * 64..(i + 1) * 64].try_into().unwrap(); + let output = &mut self.results[index][self.offsets[index] + y * line_stride + x..]; + + dequantize_and_idct_block(component.dct_scale, coefficients, quantization_table, line_stride, output); + } + + self.offsets[index] += block_count * component.dct_scale * component.dct_scale; + } + + pub fn get_result_immediate(&mut self, index: usize) -> Vec<u8> { + mem::take(&mut self.results[index]) + } +} + +impl Worker for ImmediateWorker { + fn start(&mut self, data: RowData) -> Result<()> { + self.start_immediate(data); + Ok(()) + } + fn append_row(&mut self, row: (usize, Vec<i16>)) -> Result<()> { + self.append_row_immediate(row); + Ok(()) + } + fn get_result(&mut self, index: usize) -> Result<Vec<u8>> { + Ok(self.get_result_immediate(index)) + } +} diff --git a/vendor/jpeg-decoder/src/worker/mod.rs b/vendor/jpeg-decoder/src/worker/mod.rs new file mode 100644 index 0000000..d6c2b10 --- /dev/null +++ b/vendor/jpeg-decoder/src/worker/mod.rs @@ -0,0 +1,128 @@ +mod immediate; +mod multithreaded; +#[cfg(all( + not(any(target_arch = "asmjs", target_arch = "wasm32")), + feature = "rayon" +))] +mod rayon; + +use crate::decoder::{choose_color_convert_func, ColorTransform}; +use crate::error::Result; +use crate::parser::{Component, Dimensions}; +use crate::upsampler::Upsampler; + +use alloc::sync::Arc; +use alloc::vec::Vec; +use core::cell::RefCell; + +pub struct RowData { + pub index: usize, + pub component: Component, + pub quantization_table: Arc<[u16; 64]>, +} + +pub trait Worker { + fn start(&mut self, row_data: RowData) -> Result<()>; + fn append_row(&mut self, row: (usize, Vec<i16>)) -> Result<()>; + fn get_result(&mut self, index: usize) -> Result<Vec<u8>>; + /// Default implementation for spawning multiple tasks. + fn append_rows(&mut self, row: &mut dyn Iterator<Item = (usize, Vec<i16>)>) -> Result<()> { + for item in row { + self.append_row(item)?; + } + Ok(()) + } +} + +#[allow(dead_code)] +pub enum PreferWorkerKind { + Immediate, + Multithreaded, +} + +#[derive(Default)] +pub struct WorkerScope { + inner: core::cell::RefCell<Option<WorkerScopeInner>>, +} + +enum WorkerScopeInner { + #[cfg(all( + not(any(target_arch = "asmjs", target_arch = "wasm32")), + feature = "rayon" + ))] + Rayon(rayon::Scoped), + #[cfg(not(any(target_arch = "asmjs", target_arch = "wasm32")))] + Multithreaded(multithreaded::MpscWorker), + Immediate(immediate::ImmediateWorker), +} + +impl WorkerScope { + pub fn with<T>(with: impl FnOnce(&Self) -> T) -> T { + with(&WorkerScope { + inner: RefCell::default(), + }) + } + + pub fn get_or_init_worker<T>( + &self, + prefer: PreferWorkerKind, + f: impl FnOnce(&mut dyn Worker) -> T, + ) -> T { + let mut inner = self.inner.borrow_mut(); + let inner = inner.get_or_insert_with(move || match prefer { + #[cfg(all( + not(any(target_arch = "asmjs", target_arch = "wasm32")), + feature = "rayon" + ))] + PreferWorkerKind::Multithreaded => WorkerScopeInner::Rayon(Default::default()), + #[allow(unreachable_patterns)] + #[cfg(not(any(target_arch = "asmjs", target_arch = "wasm32")))] + PreferWorkerKind::Multithreaded => WorkerScopeInner::Multithreaded(Default::default()), + _ => WorkerScopeInner::Immediate(Default::default()), + }); + + f(match &mut *inner { + #[cfg(all( + not(any(target_arch = "asmjs", target_arch = "wasm32")), + feature = "rayon" + ))] + WorkerScopeInner::Rayon(worker) => worker, + #[cfg(not(any(target_arch = "asmjs", target_arch = "wasm32")))] + WorkerScopeInner::Multithreaded(worker) => worker, + WorkerScopeInner::Immediate(worker) => worker, + }) + } +} + +pub fn compute_image_parallel( + components: &[Component], + data: Vec<Vec<u8>>, + output_size: Dimensions, + color_transform: ColorTransform, +) -> Result<Vec<u8>> { + #[cfg(all( + not(any(target_arch = "asmjs", target_arch = "wasm32")), + feature = "rayon" + ))] + return rayon::compute_image_parallel(components, data, output_size, color_transform); + + #[allow(unreachable_code)] + { + let color_convert_func = choose_color_convert_func(components.len(), color_transform)?; + let upsampler = Upsampler::new(components, output_size.width, output_size.height)?; + let line_size = output_size.width as usize * components.len(); + let mut image = vec![0u8; line_size * output_size.height as usize]; + + for (row, line) in image.chunks_mut(line_size).enumerate() { + upsampler.upsample_and_interleave_row( + &data, + row, + output_size.width as usize, + line, + color_convert_func, + ); + } + + Ok(image) + } +} diff --git a/vendor/jpeg-decoder/src/worker/multithreaded.rs b/vendor/jpeg-decoder/src/worker/multithreaded.rs new file mode 100644 index 0000000..c820702 --- /dev/null +++ b/vendor/jpeg-decoder/src/worker/multithreaded.rs @@ -0,0 +1,123 @@ +//! This module implements per-component parallelism. +//! It should be possible to implement per-row parallelism as well, +//! which should also boost performance of grayscale images +//! and allow scaling to more cores. +//! However, that would be more complex, so we use this as a starting point. + +use super::immediate::ImmediateWorker; +use super::{RowData, Worker}; +use crate::decoder::MAX_COMPONENTS; +use crate::error::Result; +use std::{ + mem, + sync::mpsc::{self, Receiver, Sender}, +}; + +enum WorkerMsg { + Start(RowData), + AppendRow(Vec<i16>), + GetResult(Sender<Vec<u8>>), +} + +#[derive(Default)] +pub struct MpscWorker { + senders: [Option<Sender<WorkerMsg>>; MAX_COMPONENTS], +} + +impl MpscWorker { + fn start_with( + &mut self, + row_data: RowData, + spawn_worker: impl FnOnce(usize) -> Result<Sender<WorkerMsg>>, + ) -> Result<()> { + // if there is no worker thread for this component yet, start one + let component = row_data.index; + if let None = self.senders[component] { + let sender = spawn_worker(component)?; + self.senders[component] = Some(sender); + } + + // we do the "take out value and put it back in once we're done" dance here + // and in all other message-passing methods because there's not that many rows + // and this should be cheaper than spawning MAX_COMPONENTS many threads up front + let sender = self.senders[component].as_mut().unwrap(); + sender + .send(WorkerMsg::Start(row_data)) + .expect("jpeg-decoder worker thread error"); + Ok(()) + } + + fn append_row(&mut self, row: (usize, Vec<i16>)) -> Result<()> { + let component = row.0; + let sender = self.senders[component].as_mut().unwrap(); + sender + .send(WorkerMsg::AppendRow(row.1)) + .expect("jpeg-decoder worker thread error"); + Ok(()) + } + + fn get_result_with( + &mut self, + index: usize, + collect: impl FnOnce(Receiver<Vec<u8>>) -> Vec<u8>, + ) -> Result<Vec<u8>> { + let (tx, rx) = mpsc::channel(); + let sender = mem::take(&mut self.senders[index]).unwrap(); + sender + .send(WorkerMsg::GetResult(tx)) + .expect("jpeg-decoder worker thread error"); + Ok(collect(rx)) + } +} + +impl Worker for MpscWorker { + fn start(&mut self, row_data: RowData) -> Result<()> { + self.start_with(row_data, spawn_worker_thread) + } + fn append_row(&mut self, row: (usize, Vec<i16>)) -> Result<()> { + MpscWorker::append_row(self, row) + } + fn get_result(&mut self, index: usize) -> Result<Vec<u8>> { + self.get_result_with(index, collect_worker_thread) + } +} + +fn create_worker() -> (Sender<WorkerMsg>, impl FnOnce() + 'static) { + let (tx, rx) = mpsc::channel(); + let closure = move || { + let mut worker = ImmediateWorker::default(); + + while let Ok(message) = rx.recv() { + match message { + WorkerMsg::Start(mut data) => { + // we always set component index to 0 for worker threads + // because they only ever handle one per thread and we don't want them + // to attempt to access nonexistent components + data.index = 0; + worker.start_immediate(data); + } + WorkerMsg::AppendRow(row) => { + worker.append_row_immediate((0, row)); + } + WorkerMsg::GetResult(chan) => { + let _ = chan.send(worker.get_result_immediate(0)); + break; + } + } + } + }; + + (tx, closure) +} + +fn spawn_worker_thread(component: usize) -> Result<Sender<WorkerMsg>> { + let (tx, worker) = create_worker(); + let thread_builder = + std::thread::Builder::new().name(format!("worker thread for component {}", component)); + thread_builder.spawn(worker)?; + Ok(tx) +} + +fn collect_worker_thread(rx: Receiver<Vec<u8>>) -> Vec<u8> { + rx.recv().expect("jpeg-decoder worker thread error") +} diff --git a/vendor/jpeg-decoder/src/worker/rayon.rs b/vendor/jpeg-decoder/src/worker/rayon.rs new file mode 100644 index 0000000..ec7df25 --- /dev/null +++ b/vendor/jpeg-decoder/src/worker/rayon.rs @@ -0,0 +1,221 @@ +use core::convert::TryInto; + +use rayon::iter::{IndexedParallelIterator, ParallelIterator}; +use rayon::slice::ParallelSliceMut; + +use crate::decoder::{choose_color_convert_func, ColorTransform}; +use crate::error::Result; +use crate::idct::dequantize_and_idct_block; +use crate::parser::Component; +use crate::upsampler::Upsampler; +use crate::{decoder::MAX_COMPONENTS, parser::Dimensions}; + +use std::sync::Arc; + +use super::{RowData, Worker}; + +/// Technically similar to `immediate::ImmediateWorker` but we copy it since we may prefer +/// different style of managing the memory allocation, something that multiple actors can access in +/// parallel. +#[derive(Default)] +struct ImmediateWorker { + offsets: [usize; MAX_COMPONENTS], + results: [Vec<u8>; MAX_COMPONENTS], + components: [Option<Component>; MAX_COMPONENTS], + quantization_tables: [Option<Arc<[u16; 64]>>; MAX_COMPONENTS], +} + +#[derive(Clone, Copy)] +struct ComponentMetadata { + block_width: usize, + block_count: usize, + line_stride: usize, + dct_scale: usize, +} + +#[derive(Default)] +pub struct Scoped { + inner: ImmediateWorker, +} + +impl ImmediateWorker { + pub fn start_immediate(&mut self, data: RowData) { + let elements = data.component.block_size.width as usize + * data.component.block_size.height as usize + * data.component.dct_scale + * data.component.dct_scale; + self.offsets[data.index] = 0; + self.results[data.index].resize(elements, 0u8); + self.components[data.index] = Some(data.component); + self.quantization_tables[data.index] = Some(data.quantization_table); + } + + pub fn get_result_immediate(&mut self, index: usize) -> Vec<u8> { + core::mem::take(&mut self.results[index]) + } + + pub fn component_metadata(&self, index: usize) -> Option<ComponentMetadata> { + let component = self.components[index].as_ref()?; + let block_size = component.block_size; + let block_width = block_size.width as usize; + let block_count = block_size.width as usize * component.vertical_sampling_factor as usize; + let line_stride = block_size.width as usize * component.dct_scale; + let dct_scale = component.dct_scale; + + Some(ComponentMetadata { + block_width, + block_count, + line_stride, + dct_scale, + }) + } + + pub fn append_row_locked( + quantization_table: Arc<[u16; 64]>, + metadata: ComponentMetadata, + data: Vec<i16>, + result_block: &mut [u8], + ) { + // Convert coefficients from a MCU row to samples. + let ComponentMetadata { + block_count, + line_stride, + block_width, + dct_scale, + } = metadata; + + assert_eq!(data.len(), block_count * 64); + + let mut output_buffer = [0; 64]; + for i in 0..block_count { + let x = (i % block_width) * dct_scale; + let y = (i / block_width) * dct_scale; + + let coefficients: &[i16; 64] = &data[i * 64..(i + 1) * 64].try_into().unwrap(); + + // Write to a temporary intermediate buffer, a 8x8 'image'. + dequantize_and_idct_block( + dct_scale, + coefficients, + &*quantization_table, + 8, + &mut output_buffer, + ); + + let write_back = &mut result_block[y * line_stride + x..]; + + let buffered_lines = output_buffer.chunks_mut(8); + let back_lines = write_back.chunks_mut(line_stride); + + for (buf, back) in buffered_lines.zip(back_lines).take(dct_scale) { + back[..dct_scale].copy_from_slice(&buf[..dct_scale]); + } + } + } +} + +impl Worker for Scoped { + fn start(&mut self, row_data: RowData) -> Result<()> { + self.inner.start_immediate(row_data); + Ok(()) + } + + fn append_row(&mut self, row: (usize, Vec<i16>)) -> Result<()> { + let inner = &mut self.inner; + let (index, data) = row; + + let quantization_table = inner.quantization_tables[index].as_ref().unwrap().clone(); + let metadata = inner.component_metadata(index).unwrap(); + let result_block = &mut inner.results[index][inner.offsets[index]..]; + inner.offsets[index] += metadata.bytes_used(); + + ImmediateWorker::append_row_locked(quantization_table, metadata, data, result_block); + Ok(()) + } + + fn get_result(&mut self, index: usize) -> Result<Vec<u8>> { + let result = self.inner.get_result_immediate(index); + Ok(result) + } + + // Magic sauce, these _may_ run in parallel. + fn append_rows(&mut self, iter: &mut dyn Iterator<Item = (usize, Vec<i16>)>) -> Result<()> { + let inner = &mut self.inner; + rayon::in_place_scope(|scope| { + let metadatas = [ + inner.component_metadata(0), + inner.component_metadata(1), + inner.component_metadata(2), + inner.component_metadata(3), + ]; + + let [res0, res1, res2, res3] = &mut inner.results; + + // Lazily get the blocks. Note: if we've already collected results from a component + // then the result vector has already been deallocated/taken. But no more tasks should + // be created for it. + let mut result_blocks = [ + res0.get_mut(inner.offsets[0]..).unwrap_or(&mut []), + res1.get_mut(inner.offsets[1]..).unwrap_or(&mut []), + res2.get_mut(inner.offsets[2]..).unwrap_or(&mut []), + res3.get_mut(inner.offsets[3]..).unwrap_or(&mut []), + ]; + + // First we schedule everything, making sure their index is right etc. + for (index, data) in iter { + let metadata = metadatas[index].unwrap(); + let quantization_table = inner.quantization_tables[index].as_ref().unwrap().clone(); + + inner.offsets[index] += metadata.bytes_used(); + let (result_block, tail) = + core::mem::take(&mut result_blocks[index]).split_at_mut(metadata.bytes_used()); + result_blocks[index] = tail; + + scope.spawn(move |_| { + ImmediateWorker::append_row_locked( + quantization_table, + metadata, + data, + result_block, + ) + }); + } + }); + + Ok(()) + } +} + +impl ComponentMetadata { + fn bytes_used(&self) -> usize { + self.block_count * self.dct_scale * self.dct_scale + } +} + +pub fn compute_image_parallel( + components: &[Component], + data: Vec<Vec<u8>>, + output_size: Dimensions, + color_transform: ColorTransform, +) -> Result<Vec<u8>> { + let color_convert_func = choose_color_convert_func(components.len(), color_transform)?; + let upsampler = Upsampler::new(components, output_size.width, output_size.height)?; + let line_size = output_size.width as usize * components.len(); + let mut image = vec![0u8; line_size * output_size.height as usize]; + + image + .par_chunks_mut(line_size) + .with_max_len(1) + .enumerate() + .for_each(|(row, line)| { + upsampler.upsample_and_interleave_row( + &data, + row, + output_size.width as usize, + line, + color_convert_func, + ); + }); + + Ok(image) +} |