aboutsummaryrefslogtreecommitdiff
path: root/vendor/jpeg-decoder/src/worker/multithreaded.rs
diff options
context:
space:
mode:
authorValentin Popov <valentin@popov.link>2024-01-08 00:21:28 +0300
committerValentin Popov <valentin@popov.link>2024-01-08 00:21:28 +0300
commit1b6a04ca5504955c571d1c97504fb45ea0befee4 (patch)
tree7579f518b23313e8a9748a88ab6173d5e030b227 /vendor/jpeg-decoder/src/worker/multithreaded.rs
parent5ecd8cf2cba827454317368b68571df0d13d7842 (diff)
downloadfparkan-1b6a04ca5504955c571d1c97504fb45ea0befee4.tar.xz
fparkan-1b6a04ca5504955c571d1c97504fb45ea0befee4.zip
Initial vendor packages
Signed-off-by: Valentin Popov <valentin@popov.link>
Diffstat (limited to 'vendor/jpeg-decoder/src/worker/multithreaded.rs')
-rw-r--r--vendor/jpeg-decoder/src/worker/multithreaded.rs123
1 files changed, 123 insertions, 0 deletions
diff --git a/vendor/jpeg-decoder/src/worker/multithreaded.rs b/vendor/jpeg-decoder/src/worker/multithreaded.rs
new file mode 100644
index 0000000..c820702
--- /dev/null
+++ b/vendor/jpeg-decoder/src/worker/multithreaded.rs
@@ -0,0 +1,123 @@
+//! This module implements per-component parallelism.
+//! It should be possible to implement per-row parallelism as well,
+//! which should also boost performance of grayscale images
+//! and allow scaling to more cores.
+//! However, that would be more complex, so we use this as a starting point.
+
+use super::immediate::ImmediateWorker;
+use super::{RowData, Worker};
+use crate::decoder::MAX_COMPONENTS;
+use crate::error::Result;
+use std::{
+ mem,
+ sync::mpsc::{self, Receiver, Sender},
+};
+
+enum WorkerMsg {
+ Start(RowData),
+ AppendRow(Vec<i16>),
+ GetResult(Sender<Vec<u8>>),
+}
+
+#[derive(Default)]
+pub struct MpscWorker {
+ senders: [Option<Sender<WorkerMsg>>; MAX_COMPONENTS],
+}
+
+impl MpscWorker {
+ fn start_with(
+ &mut self,
+ row_data: RowData,
+ spawn_worker: impl FnOnce(usize) -> Result<Sender<WorkerMsg>>,
+ ) -> Result<()> {
+ // if there is no worker thread for this component yet, start one
+ let component = row_data.index;
+ if let None = self.senders[component] {
+ let sender = spawn_worker(component)?;
+ self.senders[component] = Some(sender);
+ }
+
+ // we do the "take out value and put it back in once we're done" dance here
+ // and in all other message-passing methods because there's not that many rows
+ // and this should be cheaper than spawning MAX_COMPONENTS many threads up front
+ let sender = self.senders[component].as_mut().unwrap();
+ sender
+ .send(WorkerMsg::Start(row_data))
+ .expect("jpeg-decoder worker thread error");
+ Ok(())
+ }
+
+ fn append_row(&mut self, row: (usize, Vec<i16>)) -> Result<()> {
+ let component = row.0;
+ let sender = self.senders[component].as_mut().unwrap();
+ sender
+ .send(WorkerMsg::AppendRow(row.1))
+ .expect("jpeg-decoder worker thread error");
+ Ok(())
+ }
+
+ fn get_result_with(
+ &mut self,
+ index: usize,
+ collect: impl FnOnce(Receiver<Vec<u8>>) -> Vec<u8>,
+ ) -> Result<Vec<u8>> {
+ let (tx, rx) = mpsc::channel();
+ let sender = mem::take(&mut self.senders[index]).unwrap();
+ sender
+ .send(WorkerMsg::GetResult(tx))
+ .expect("jpeg-decoder worker thread error");
+ Ok(collect(rx))
+ }
+}
+
+impl Worker for MpscWorker {
+ fn start(&mut self, row_data: RowData) -> Result<()> {
+ self.start_with(row_data, spawn_worker_thread)
+ }
+ fn append_row(&mut self, row: (usize, Vec<i16>)) -> Result<()> {
+ MpscWorker::append_row(self, row)
+ }
+ fn get_result(&mut self, index: usize) -> Result<Vec<u8>> {
+ self.get_result_with(index, collect_worker_thread)
+ }
+}
+
+fn create_worker() -> (Sender<WorkerMsg>, impl FnOnce() + 'static) {
+ let (tx, rx) = mpsc::channel();
+ let closure = move || {
+ let mut worker = ImmediateWorker::default();
+
+ while let Ok(message) = rx.recv() {
+ match message {
+ WorkerMsg::Start(mut data) => {
+ // we always set component index to 0 for worker threads
+ // because they only ever handle one per thread and we don't want them
+ // to attempt to access nonexistent components
+ data.index = 0;
+ worker.start_immediate(data);
+ }
+ WorkerMsg::AppendRow(row) => {
+ worker.append_row_immediate((0, row));
+ }
+ WorkerMsg::GetResult(chan) => {
+ let _ = chan.send(worker.get_result_immediate(0));
+ break;
+ }
+ }
+ }
+ };
+
+ (tx, closure)
+}
+
+fn spawn_worker_thread(component: usize) -> Result<Sender<WorkerMsg>> {
+ let (tx, worker) = create_worker();
+ let thread_builder =
+ std::thread::Builder::new().name(format!("worker thread for component {}", component));
+ thread_builder.spawn(worker)?;
+ Ok(tx)
+}
+
+fn collect_worker_thread(rx: Receiver<Vec<u8>>) -> Vec<u8> {
+ rx.recv().expect("jpeg-decoder worker thread error")
+}