summaryrefslogtreecommitdiff
path: root/vendor/jpeg-decoder/src/worker
diff options
context:
space:
mode:
authorValentin Popov <valentin@popov.link>2024-01-08 00:21:28 +0300
committerValentin Popov <valentin@popov.link>2024-01-08 00:21:28 +0300
commit1b6a04ca5504955c571d1c97504fb45ea0befee4 (patch)
tree7579f518b23313e8a9748a88ab6173d5e030b227 /vendor/jpeg-decoder/src/worker
parent5ecd8cf2cba827454317368b68571df0d13d7842 (diff)
downloadfparkan-1b6a04ca5504955c571d1c97504fb45ea0befee4.tar.xz
fparkan-1b6a04ca5504955c571d1c97504fb45ea0befee4.zip
Initial vendor packages
Signed-off-by: Valentin Popov <valentin@popov.link>
Diffstat (limited to 'vendor/jpeg-decoder/src/worker')
-rw-r--r--vendor/jpeg-decoder/src/worker/immediate.rs80
-rw-r--r--vendor/jpeg-decoder/src/worker/mod.rs128
-rw-r--r--vendor/jpeg-decoder/src/worker/multithreaded.rs123
-rw-r--r--vendor/jpeg-decoder/src/worker/rayon.rs221
4 files changed, 552 insertions, 0 deletions
diff --git a/vendor/jpeg-decoder/src/worker/immediate.rs b/vendor/jpeg-decoder/src/worker/immediate.rs
new file mode 100644
index 0000000..8c6e7db
--- /dev/null
+++ b/vendor/jpeg-decoder/src/worker/immediate.rs
@@ -0,0 +1,80 @@
+use alloc::vec;
+use alloc::vec::Vec;
+use core::mem;
+use core::convert::TryInto;
+use crate::decoder::MAX_COMPONENTS;
+use crate::error::Result;
+use crate::idct::dequantize_and_idct_block;
+use crate::alloc::sync::Arc;
+use crate::parser::Component;
+use super::{RowData, Worker};
+
+/// Worker that performs dequantization + IDCT synchronously on the calling
+/// thread. Keeps one slot of state per component (up to MAX_COMPONENTS).
+pub struct ImmediateWorker {
+ // Write offset (in samples) into `results[i]` for the next appended MCU row.
+ offsets: [usize; MAX_COMPONENTS],
+ // Decoded sample planes, one Vec per component; emptied when collected.
+ results: Vec<Vec<u8>>,
+ // Component info recorded by `start_immediate`, None until started.
+ components: Vec<Option<Component>>,
+ // Shared quantization table for each started component.
+ quantization_tables: Vec<Option<Arc<[u16; 64]>>>,
+}
+
+// All component slots start empty; they are populated lazily by
+// `start_immediate` as each component begins decoding.
+impl Default for ImmediateWorker {
+ fn default() -> Self {
+ ImmediateWorker {
+ offsets: [0; MAX_COMPONENTS],
+ results: vec![Vec::new(); MAX_COMPONENTS],
+ components: vec![None; MAX_COMPONENTS],
+ quantization_tables: vec![None; MAX_COMPONENTS],
+ }
+ }
+}
+
+impl ImmediateWorker {
+ /// Allocate and zero the output plane for component `data.index`, and
+ /// record its component info and quantization table for later rows.
+ /// Asserts that any previous result for this slot was already collected.
+ pub fn start_immediate(&mut self, data: RowData) {
+ assert!(self.results[data.index].is_empty());
+
+ self.offsets[data.index] = 0;
+ // Plane size: width x height blocks, each dct_scale x dct_scale samples.
+ self.results[data.index].resize(data.component.block_size.width as usize * data.component.block_size.height as usize * data.component.dct_scale * data.component.dct_scale, 0u8);
+ self.components[data.index] = Some(data.component);
+ self.quantization_tables[data.index] = Some(data.quantization_table);
+ }
+
+ /// Dequantize + IDCT one MCU row of coefficients for component `index`
+ /// and write the produced samples into the component's output plane.
+ pub fn append_row_immediate(&mut self, (index, data): (usize, Vec<i16>)) {
+ // Convert coefficients from a MCU row to samples.
+
+ let component = self.components[index].as_ref().unwrap();
+ let quantization_table = self.quantization_tables[index].as_ref().unwrap();
+ let block_count = component.block_size.width as usize * component.vertical_sampling_factor as usize;
+ let line_stride = component.block_size.width as usize * component.dct_scale;
+
+ // Each 8x8 block carries exactly 64 coefficients.
+ assert_eq!(data.len(), block_count * 64);
+
+ for i in 0..block_count {
+ // (x, y) = this block's top-left sample within the current MCU row.
+ let x = (i % component.block_size.width as usize) * component.dct_scale;
+ let y = (i / component.block_size.width as usize) * component.dct_scale;
+
+ let coefficients = data[i * 64..(i + 1) * 64].try_into().unwrap();
+ let output = &mut self.results[index][self.offsets[index] + y * line_stride + x..];
+
+ dequantize_and_idct_block(component.dct_scale, coefficients, quantization_table, line_stride, output);
+ }
+
+ // Advance the write offset by the samples produced for this row.
+ self.offsets[index] += block_count * component.dct_scale * component.dct_scale;
+ }
+
+ /// Hand the finished plane to the caller, leaving an empty Vec behind so
+ /// the slot can be reused by a later `start_immediate`.
+ pub fn get_result_immediate(&mut self, index: usize) -> Vec<u8> {
+ mem::take(&mut self.results[index])
+ }
+}
+
+// Trait adapter: the immediate worker itself never fails, so each method
+// forwards to the infallible `*_immediate` version and returns Ok.
+impl Worker for ImmediateWorker {
+ fn start(&mut self, data: RowData) -> Result<()> {
+ self.start_immediate(data);
+ Ok(())
+ }
+ fn append_row(&mut self, row: (usize, Vec<i16>)) -> Result<()> {
+ self.append_row_immediate(row);
+ Ok(())
+ }
+ fn get_result(&mut self, index: usize) -> Result<Vec<u8>> {
+ Ok(self.get_result_immediate(index))
+ }
+}
diff --git a/vendor/jpeg-decoder/src/worker/mod.rs b/vendor/jpeg-decoder/src/worker/mod.rs
new file mode 100644
index 0000000..d6c2b10
--- /dev/null
+++ b/vendor/jpeg-decoder/src/worker/mod.rs
@@ -0,0 +1,128 @@
+mod immediate;
+mod multithreaded;
+#[cfg(all(
+ not(any(target_arch = "asmjs", target_arch = "wasm32")),
+ feature = "rayon"
+))]
+mod rayon;
+
+use crate::decoder::{choose_color_convert_func, ColorTransform};
+use crate::error::Result;
+use crate::parser::{Component, Dimensions};
+use crate::upsampler::Upsampler;
+
+use alloc::sync::Arc;
+use alloc::vec::Vec;
+use core::cell::RefCell;
+
+/// Everything a worker needs to begin decoding one component.
+pub struct RowData {
+ // Index of the component this data belongs to (0..MAX_COMPONENTS).
+ pub index: usize,
+ pub component: Component,
+ // Shared so worker threads/tasks can hold it without copying the table.
+ pub quantization_table: Arc<[u16; 64]>,
+}
+
+/// Common interface over the immediate, thread-per-component and rayon
+/// decoding backends. `start` is called once per component, `append_row`
+/// once per MCU row, and `get_result` once to collect the sample plane.
+pub trait Worker {
+ fn start(&mut self, row_data: RowData) -> Result<()>;
+ fn append_row(&mut self, row: (usize, Vec<i16>)) -> Result<()>;
+ fn get_result(&mut self, index: usize) -> Result<Vec<u8>>;
+ /// Default implementation for spawning multiple tasks.
+ fn append_rows(&mut self, row: &mut dyn Iterator<Item = (usize, Vec<i16>)>) -> Result<()> {
+ // Sequential fallback; backends (e.g. rayon) may override to parallelize.
+ for item in row {
+ self.append_row(item)?;
+ }
+ Ok(())
+ }
+}
+
+/// Caller's preference for which worker backend to construct; the actual
+/// choice also depends on target arch and the `rayon` feature (see
+/// `WorkerScope::get_or_init_worker`).
+#[allow(dead_code)]
+pub enum PreferWorkerKind {
+ Immediate,
+ Multithreaded,
+}
+
+/// Owns at most one lazily-created worker for the duration of a decode.
+/// RefCell gives single-threaded interior mutability for the lazy init.
+#[derive(Default)]
+pub struct WorkerScope {
+ inner: core::cell::RefCell<Option<WorkerScopeInner>>,
+}
+
+// The concrete backend held by a WorkerScope; variants are compiled in or
+// out depending on target arch and the `rayon` feature.
+enum WorkerScopeInner {
+ #[cfg(all(
+ not(any(target_arch = "asmjs", target_arch = "wasm32")),
+ feature = "rayon"
+ ))]
+ Rayon(rayon::Scoped),
+ #[cfg(not(any(target_arch = "asmjs", target_arch = "wasm32")))]
+ Multithreaded(multithreaded::MpscWorker),
+ Immediate(immediate::ImmediateWorker),
+}
+
+impl WorkerScope {
+ /// Run `with` against a fresh, empty scope; the worker (and any threads
+ /// it owns) is dropped when the closure returns.
+ pub fn with<T>(with: impl FnOnce(&Self) -> T) -> T {
+ with(&WorkerScope {
+ inner: RefCell::default(),
+ })
+ }
+
+ /// Lazily construct the preferred worker backend on first call, then
+ /// hand a `&mut dyn Worker` view of it to `f`. Later calls reuse the
+ /// worker chosen first, regardless of `prefer`.
+ pub fn get_or_init_worker<T>(
+ &self,
+ prefer: PreferWorkerKind,
+ f: impl FnOnce(&mut dyn Worker) -> T,
+ ) -> T {
+ let mut inner = self.inner.borrow_mut();
+ let inner = inner.get_or_insert_with(move || match prefer {
+ // Preference order for Multithreaded: rayon (if compiled in),
+ // then thread-per-component, then the immediate fallback.
+ #[cfg(all(
+ not(any(target_arch = "asmjs", target_arch = "wasm32")),
+ feature = "rayon"
+ ))]
+ PreferWorkerKind::Multithreaded => WorkerScopeInner::Rayon(Default::default()),
+ #[allow(unreachable_patterns)]
+ #[cfg(not(any(target_arch = "asmjs", target_arch = "wasm32")))]
+ PreferWorkerKind::Multithreaded => WorkerScopeInner::Multithreaded(Default::default()),
+ _ => WorkerScopeInner::Immediate(Default::default()),
+ });
+
+ // Erase the concrete variant into a trait object for the callback.
+ f(match &mut *inner {
+ #[cfg(all(
+ not(any(target_arch = "asmjs", target_arch = "wasm32")),
+ feature = "rayon"
+ ))]
+ WorkerScopeInner::Rayon(worker) => worker,
+ #[cfg(not(any(target_arch = "asmjs", target_arch = "wasm32")))]
+ WorkerScopeInner::Multithreaded(worker) => worker,
+ WorkerScopeInner::Immediate(worker) => worker,
+ })
+ }
+}
+
+/// Upsample and color-convert the decoded component planes into the final
+/// interleaved image. Delegates to the rayon implementation when it is
+/// compiled in; otherwise runs the row loop sequentially.
+pub fn compute_image_parallel(
+ components: &[Component],
+ data: Vec<Vec<u8>>,
+ output_size: Dimensions,
+ color_transform: ColorTransform,
+) -> Result<Vec<u8>> {
+ // When rayon is available this early-return makes the block below dead code.
+ #[cfg(all(
+ not(any(target_arch = "asmjs", target_arch = "wasm32")),
+ feature = "rayon"
+ ))]
+ return rayon::compute_image_parallel(components, data, output_size, color_transform);
+
+ #[allow(unreachable_code)]
+ {
+ let color_convert_func = choose_color_convert_func(components.len(), color_transform)?;
+ let upsampler = Upsampler::new(components, output_size.width, output_size.height)?;
+ // One interleaved output line holds `components.len()` samples per pixel.
+ let line_size = output_size.width as usize * components.len();
+ let mut image = vec![0u8; line_size * output_size.height as usize];
+
+ for (row, line) in image.chunks_mut(line_size).enumerate() {
+ upsampler.upsample_and_interleave_row(
+ &data,
+ row,
+ output_size.width as usize,
+ line,
+ color_convert_func,
+ );
+ }
+
+ Ok(image)
+ }
+}
diff --git a/vendor/jpeg-decoder/src/worker/multithreaded.rs b/vendor/jpeg-decoder/src/worker/multithreaded.rs
new file mode 100644
index 0000000..c820702
--- /dev/null
+++ b/vendor/jpeg-decoder/src/worker/multithreaded.rs
@@ -0,0 +1,123 @@
+//! This module implements per-component parallelism.
+//! It should be possible to implement per-row parallelism as well,
+//! which should also boost performance of grayscale images
+//! and allow scaling to more cores.
+//! However, that would be more complex, so we use this as a starting point.
+
+use super::immediate::ImmediateWorker;
+use super::{RowData, Worker};
+use crate::decoder::MAX_COMPONENTS;
+use crate::error::Result;
+use std::{
+ mem,
+ sync::mpsc::{self, Receiver, Sender},
+};
+
+// Messages sent to a per-component worker thread. `GetResult` carries a
+// reply channel and is the last message a thread handles (see create_worker).
+enum WorkerMsg {
+ Start(RowData),
+ AppendRow(Vec<i16>),
+ GetResult(Sender<Vec<u8>>),
+}
+
+/// Thread-per-component worker: one lazily-spawned thread and one channel
+/// sender per component slot; `None` until that component is started.
+#[derive(Default)]
+pub struct MpscWorker {
+ senders: [Option<Sender<WorkerMsg>>; MAX_COMPONENTS],
+}
+
+impl MpscWorker {
+ /// Send the Start message for `row_data`'s component, spawning that
+ /// component's dedicated worker (via `spawn_worker`) on first use.
+ fn start_with(
+ &mut self,
+ row_data: RowData,
+ spawn_worker: impl FnOnce(usize) -> Result<Sender<WorkerMsg>>,
+ ) -> Result<()> {
+ // if there is no worker thread for this component yet, start one
+ let component = row_data.index;
+ // NOTE(review): `if self.senders[component].is_none()` is the
+ // clippy-preferred spelling of this check (redundant_pattern_matching).
+ if let None = self.senders[component] {
+ let sender = spawn_worker(component)?;
+ self.senders[component] = Some(sender);
+ }
+
+ // we do the "take out value and put it back in once we're done" dance here
+ // and in all other message-passing methods because there's not that many rows
+ // and this should be cheaper than spawning MAX_COMPONENTS many threads up front
+ let sender = self.senders[component].as_mut().unwrap();
+ sender
+ .send(WorkerMsg::Start(row_data))
+ .expect("jpeg-decoder worker thread error")
+ Ok(())
+ }
+
+ /// Forward one MCU row of coefficients to the component's worker thread.
+ /// Panics if `start_with` was not called for this component first.
+ fn append_row(&mut self, row: (usize, Vec<i16>)) -> Result<()> {
+ let component = row.0;
+ let sender = self.senders[component].as_mut().unwrap();
+ sender
+ .send(WorkerMsg::AppendRow(row.1))
+ .expect("jpeg-decoder worker thread error");
+ Ok(())
+ }
+
+ /// Ask the worker for `index` for its finished plane and block in
+ /// `collect` until the reply arrives. Taking the sender out of its slot
+ /// closes our end once done, letting the worker's receive loop end.
+ fn get_result_with(
+ &mut self,
+ index: usize,
+ collect: impl FnOnce(Receiver<Vec<u8>>) -> Vec<u8>,
+ ) -> Result<Vec<u8>> {
+ let (tx, rx) = mpsc::channel();
+ let sender = mem::take(&mut self.senders[index]).unwrap();
+ sender
+ .send(WorkerMsg::GetResult(tx))
+ .expect("jpeg-decoder worker thread error");
+ Ok(collect(rx))
+ }
+}
+
+// Trait adapter: binds the generic message-passing methods to the real
+// thread-spawning and result-collecting helpers below.
+impl Worker for MpscWorker {
+ fn start(&mut self, row_data: RowData) -> Result<()> {
+ self.start_with(row_data, spawn_worker_thread)
+ }
+ fn append_row(&mut self, row: (usize, Vec<i16>)) -> Result<()> {
+ // Fully-qualified call to avoid recursing into this trait method.
+ MpscWorker::append_row(self, row)
+ }
+ fn get_result(&mut self, index: usize) -> Result<Vec<u8>> {
+ self.get_result_with(index, collect_worker_thread)
+ }
+}
+
+/// Build one component's message loop (to be run on a worker thread) and
+/// the sender used to feed it. The loop ends when the channel closes or
+/// after answering a `GetResult` message.
+fn create_worker() -> (Sender<WorkerMsg>, impl FnOnce() + 'static) {
+ let (tx, rx) = mpsc::channel();
+ let closure = move || {
+ // Each thread wraps its own single-component ImmediateWorker.
+ let mut worker = ImmediateWorker::default();
+
+ while let Ok(message) = rx.recv() {
+ match message {
+ WorkerMsg::Start(mut data) => {
+ // we always set component index to 0 for worker threads
+ // because they only ever handle one per thread and we don't want them
+ // to attempt to access nonexistent components
+ data.index = 0;
+ worker.start_immediate(data);
+ }
+ WorkerMsg::AppendRow(row) => {
+ worker.append_row_immediate((0, row));
+ }
+ WorkerMsg::GetResult(chan) => {
+ // Ignore send failure: the requester may already be gone.
+ let _ = chan.send(worker.get_result_immediate(0));
+ break;
+ }
+ }
+ }
+ };
+
+ (tx, closure)
+}
+
+/// Spawn a named OS thread running `create_worker`'s loop for `component`;
+/// returns the sender for feeding it messages, or the spawn error.
+fn spawn_worker_thread(component: usize) -> Result<Sender<WorkerMsg>> {
+ let (tx, worker) = create_worker();
+ let thread_builder =
+ std::thread::Builder::new().name(format!("worker thread for component {}", component));
+ thread_builder.spawn(worker)?;
+ Ok(tx)
+}
+
+/// Block until the worker thread replies with its finished sample plane.
+fn collect_worker_thread(rx: Receiver<Vec<u8>>) -> Vec<u8> {
+ rx.recv().expect("jpeg-decoder worker thread error")
+}
diff --git a/vendor/jpeg-decoder/src/worker/rayon.rs b/vendor/jpeg-decoder/src/worker/rayon.rs
new file mode 100644
index 0000000..ec7df25
--- /dev/null
+++ b/vendor/jpeg-decoder/src/worker/rayon.rs
@@ -0,0 +1,221 @@
+use core::convert::TryInto;
+
+use rayon::iter::{IndexedParallelIterator, ParallelIterator};
+use rayon::slice::ParallelSliceMut;
+
+use crate::decoder::{choose_color_convert_func, ColorTransform};
+use crate::error::Result;
+use crate::idct::dequantize_and_idct_block;
+use crate::parser::Component;
+use crate::upsampler::Upsampler;
+use crate::{decoder::MAX_COMPONENTS, parser::Dimensions};
+
+use std::sync::Arc;
+
+use super::{RowData, Worker};
+
+/// Technically similar to `immediate::ImmediateWorker` but we copy it since we may prefer
+/// different style of managing the memory allocation, something that multiple actors can access in
+/// parallel.
+#[derive(Default)]
+struct ImmediateWorker {
+ // Write offset (in samples) into `results[i]` for the next MCU row.
+ offsets: [usize; MAX_COMPONENTS],
+ // Fixed arrays (not Vecs) so the four planes can be split-borrowed
+ // simultaneously in `append_rows`.
+ results: [Vec<u8>; MAX_COMPONENTS],
+ components: [Option<Component>; MAX_COMPONENTS],
+ quantization_tables: [Option<Arc<[u16; 64]>>; MAX_COMPONENTS],
+}
+
+/// Per-component geometry captured up front so each row task can run
+/// without borrowing the worker itself (it is Copy and moved into tasks).
+#[derive(Clone, Copy)]
+struct ComponentMetadata {
+ block_width: usize,
+ block_count: usize,
+ line_stride: usize,
+ dct_scale: usize,
+}
+
+/// Rayon-backed worker; wraps the copy-local ImmediateWorker above and
+/// overrides `append_rows` to fan row work out onto a rayon scope.
+#[derive(Default)]
+pub struct Scoped {
+ inner: ImmediateWorker,
+}
+
+impl ImmediateWorker {
+ /// Allocate and zero the output plane for `data.index` and record its
+ /// component info and quantization table.
+ pub fn start_immediate(&mut self, data: RowData) {
+ // Plane size: width x height blocks, each dct_scale x dct_scale samples.
+ let elements = data.component.block_size.width as usize
+ * data.component.block_size.height as usize
+ * data.component.dct_scale
+ * data.component.dct_scale;
+ self.offsets[data.index] = 0;
+ self.results[data.index].resize(elements, 0u8);
+ self.components[data.index] = Some(data.component);
+ self.quantization_tables[data.index] = Some(data.quantization_table);
+ }
+
+ /// Hand the finished plane to the caller, leaving an empty Vec behind.
+ pub fn get_result_immediate(&mut self, index: usize) -> Vec<u8> {
+ core::mem::take(&mut self.results[index])
+ }
+
+ /// Snapshot the geometry of component `index`, or None if that
+ /// component was never started.
+ pub fn component_metadata(&self, index: usize) -> Option<ComponentMetadata> {
+ let component = self.components[index].as_ref()?;
+ let block_size = component.block_size;
+ let block_width = block_size.width as usize;
+ let block_count = block_size.width as usize * component.vertical_sampling_factor as usize;
+ let line_stride = block_size.width as usize * component.dct_scale;
+ let dct_scale = component.dct_scale;
+
+ Some(ComponentMetadata {
+ block_width,
+ block_count,
+ line_stride,
+ dct_scale,
+ })
+ }
+
+ /// Dequantize + IDCT one MCU row into `result_block`. Associated fn
+ /// (no &self) so it can run on a rayon task that owns only its inputs
+ /// and its disjoint slice of the output plane.
+ pub fn append_row_locked(
+ quantization_table: Arc<[u16; 64]>,
+ metadata: ComponentMetadata,
+ data: Vec<i16>,
+ result_block: &mut [u8],
+ ) {
+ // Convert coefficients from a MCU row to samples.
+ let ComponentMetadata {
+ block_count,
+ line_stride,
+ block_width,
+ dct_scale,
+ } = metadata;
+
+ // Each 8x8 block carries exactly 64 coefficients.
+ assert_eq!(data.len(), block_count * 64);
+
+ let mut output_buffer = [0; 64];
+ for i in 0..block_count {
+ let x = (i % block_width) * dct_scale;
+ let y = (i / block_width) * dct_scale;
+
+ let coefficients: &[i16; 64] = &data[i * 64..(i + 1) * 64].try_into().unwrap();
+
+ // Write to a temporary intermediate buffer, a 8x8 'image'.
+ dequantize_and_idct_block(
+ dct_scale,
+ coefficients,
+ &*quantization_table,
+ 8,
+ &mut output_buffer,
+ );
+
+ let write_back = &mut result_block[y * line_stride + x..];
+
+ // Copy the top-left dct_scale x dct_scale samples of the 8x8
+ // buffer into the destination plane, line by line.
+ let buffered_lines = output_buffer.chunks_mut(8);
+ let back_lines = write_back.chunks_mut(line_stride);
+
+ for (buf, back) in buffered_lines.zip(back_lines).take(dct_scale) {
+ back[..dct_scale].copy_from_slice(&buf[..dct_scale]);
+ }
+ }
+ }
+}
+
+impl Worker for Scoped {
+ fn start(&mut self, row_data: RowData) -> Result<()> {
+ self.inner.start_immediate(row_data);
+ Ok(())
+ }
+
+ // Sequential single-row path; mirrors what one spawned task does below.
+ fn append_row(&mut self, row: (usize, Vec<i16>)) -> Result<()> {
+ let inner = &mut self.inner;
+ let (index, data) = row;
+
+ let quantization_table = inner.quantization_tables[index].as_ref().unwrap().clone();
+ let metadata = inner.component_metadata(index).unwrap();
+ // Reserve this row's region of the plane, then advance the offset.
+ let result_block = &mut inner.results[index][inner.offsets[index]..];
+ inner.offsets[index] += metadata.bytes_used();
+
+ ImmediateWorker::append_row_locked(quantization_table, metadata, data, result_block);
+ Ok(())
+ }
+
+ fn get_result(&mut self, index: usize) -> Result<Vec<u8>> {
+ let result = self.inner.get_result_immediate(index);
+ Ok(result)
+ }
+
+ // Magic sauce, these _may_ run in parallel.
+ fn append_rows(&mut self, iter: &mut dyn Iterator<Item = (usize, Vec<i16>)>) -> Result<()> {
+ let inner = &mut self.inner;
+ // in_place_scope blocks until every spawned task has finished, so no
+ // task can outlive the borrows of `inner` taken below.
+ rayon::in_place_scope(|scope| {
+ let metadatas = [
+ inner.component_metadata(0),
+ inner.component_metadata(1),
+ inner.component_metadata(2),
+ inner.component_metadata(3),
+ ];
+
+ // Destructure so each plane is a separate &mut borrow.
+ let [res0, res1, res2, res3] = &mut inner.results;
+
+ // Lazily get the blocks. Note: if we've already collected results from a component
+ // then the result vector has already been deallocated/taken. But no more tasks should
+ // be created for it.
+ let mut result_blocks = [
+ res0.get_mut(inner.offsets[0]..).unwrap_or(&mut []),
+ res1.get_mut(inner.offsets[1]..).unwrap_or(&mut []),
+ res2.get_mut(inner.offsets[2]..).unwrap_or(&mut []),
+ res3.get_mut(inner.offsets[3]..).unwrap_or(&mut []),
+ ];
+
+ // First we schedule everything, making sure their index is right etc.
+ for (index, data) in iter {
+ let metadata = metadatas[index].unwrap();
+ let quantization_table = inner.quantization_tables[index].as_ref().unwrap().clone();
+
+ inner.offsets[index] += metadata.bytes_used();
+ // Split off this row's disjoint slice; the tail stays available
+ // for the next row of the same component.
+ let (result_block, tail) =
+ core::mem::take(&mut result_blocks[index]).split_at_mut(metadata.bytes_used());
+ result_blocks[index] = tail;
+
+ // Each task owns its coefficients, table clone and output slice.
+ scope.spawn(move |_| {
+ ImmediateWorker::append_row_locked(
+ quantization_table,
+ metadata,
+ data,
+ result_block,
+ )
+ });
+ }
+ });
+
+ Ok(())
+ }
+}
+
+impl ComponentMetadata {
+ /// Number of output samples one MCU row produces for this component.
+ fn bytes_used(&self) -> usize {
+ self.block_count * self.dct_scale * self.dct_scale
+ }
+}
+
+/// Rayon version of the final upsample + color-convert pass: output lines
+/// are processed as a parallel iterator over row-sized chunks.
+pub fn compute_image_parallel(
+ components: &[Component],
+ data: Vec<Vec<u8>>,
+ output_size: Dimensions,
+ color_transform: ColorTransform,
+) -> Result<Vec<u8>> {
+ let color_convert_func = choose_color_convert_func(components.len(), color_transform)?;
+ let upsampler = Upsampler::new(components, output_size.width, output_size.height)?;
+ // One interleaved output line holds `components.len()` samples per pixel.
+ let line_size = output_size.width as usize * components.len();
+ let mut image = vec![0u8; line_size * output_size.height as usize];
+
+ image
+ .par_chunks_mut(line_size)
+ // with_max_len(1): one chunk (row) per rayon job, maximizing spread.
+ .with_max_len(1)
+ .enumerate()
+ .for_each(|(row, line)| {
+ upsampler.upsample_and_interleave_row(
+ &data,
+ row,
+ output_size.width as usize,
+ line,
+ color_convert_func,
+ );
+ });
+
+ Ok(image)
+}