Diffstat (limited to 'vendor/exr/src/block/reader.rs')
-rw-r--r-- | vendor/exr/src/block/reader.rs | 527
1 file changed, 527 insertions, 0 deletions
diff --git a/vendor/exr/src/block/reader.rs b/vendor/exr/src/block/reader.rs
new file mode 100644
index 0000000..bb9888e
--- /dev/null
+++ b/vendor/exr/src/block/reader.rs
@@ -0,0 +1,527 @@
+//! Composable structures to handle reading an image.
+
+
+use std::convert::TryFrom;
+use std::fmt::Debug;
+use std::io::{Read, Seek};
+use rayon_core::{ThreadPool, ThreadPoolBuildError};
+
+use smallvec::alloc::sync::Arc;
+
+use crate::block::{BlockIndex, UncompressedBlock};
+use crate::block::chunk::{Chunk, TileCoordinates};
+use crate::compression::Compression;
+use crate::error::{Error, Result, u64_to_usize, UnitResult};
+use crate::io::{PeekRead, Tracking};
+use crate::meta::{MetaData, OffsetTables};
+use crate::meta::header::Header;
+
+/// Decode the meta data from a byte source, keeping the source ready for further reading.
+/// Continue decoding the remaining bytes by calling `filter_chunks` or `all_chunks`.
+#[derive(Debug)]
+pub struct Reader<R> {
+    meta_data: MetaData,
+    remaining_reader: PeekRead<Tracking<R>>, // TODO does R need to be Seek or is Tracking enough?
+}
+
+impl<R: Read + Seek> Reader<R> {
+
+    /// Start the reading process.
+    /// Immediately decodes the meta data into an internal field.
+    /// Access it via `meta_data()`.
+    pub fn read_from_buffered(read: R, pedantic: bool) -> Result<Self> {
+        let mut remaining_reader = PeekRead::new(Tracking::new(read));
+        let meta_data = MetaData::read_validated_from_buffered_peekable(&mut remaining_reader, pedantic)?;
+        Ok(Self { meta_data, remaining_reader })
+    }
+
+    // must not be mutable, as reading the file later on relies on the meta data
+    /// The decoded exr meta data from the file.
+    pub fn meta_data(&self) -> &MetaData { &self.meta_data }
+
+    /// The decoded exr headers from the file.
+    pub fn headers(&self) -> &[Header] { &self.meta_data.headers }
+
+    /// Obtain ownership of the meta data.
+    pub fn into_meta_data(self) -> MetaData { self.meta_data }
+
+    /// Prepare to read all the chunks from the file.
+    /// Does not decode the chunks now, but returns a decoder.
+    /// Reading all chunks reduces seeking in the file, but some chunks might be read without being used.
+    pub fn all_chunks(mut self, pedantic: bool) -> Result<AllChunksReader<R>> {
+        let total_chunk_count = {
+            if pedantic {
+                let offset_tables = MetaData::read_offset_tables(&mut self.remaining_reader, &self.meta_data.headers)?;
+                validate_offset_tables(self.meta_data.headers.as_slice(), &offset_tables, self.remaining_reader.byte_position())?;
+                offset_tables.iter().map(|table| table.len()).sum()
+            }
+            else {
+                usize::try_from(MetaData::skip_offset_tables(&mut self.remaining_reader, &self.meta_data.headers)?)
+                    .expect("too large chunk count for this machine")
+            }
+        };
+
+        Ok(AllChunksReader {
+            meta_data: self.meta_data,
+            remaining_chunks: 0 .. total_chunk_count,
+            remaining_bytes: self.remaining_reader,
+            pedantic
+        })
+    }
+
+    /// Prepare to read some of the chunks from the file.
+    /// Does not decode the chunks now, but returns a decoder.
+    /// Reading only some chunks may require seeking within the file, potentially skipping many bytes.
+    // TODO tile indices add no new information to block index??
+    pub fn filter_chunks(mut self, pedantic: bool, mut filter: impl FnMut(&MetaData, TileCoordinates, BlockIndex) -> bool) -> Result<FilteredChunksReader<R>> {
+        let offset_tables = MetaData::read_offset_tables(&mut self.remaining_reader, &self.meta_data.headers)?;
+
+        // TODO regardless of pedantic, if invalid, read all chunks instead, and filter after reading each chunk?
+        if pedantic {
+            validate_offset_tables(
+                self.meta_data.headers.as_slice(), &offset_tables,
+                self.remaining_reader.byte_position()
+            )?;
+        }
+
+        let mut filtered_offsets = Vec::with_capacity(
+            (self.meta_data.headers.len() * 32).min(2*2048)
+        );
+
+        // TODO detect whether the filter actually would skip chunks, and avoid sorting etc. when no filtering is applied
+
+        for (header_index, header) in self.meta_data.headers.iter().enumerate() { // offset tables are stored in the same order as the headers
+            for (block_index, tile) in header.blocks_increasing_y_order().enumerate() { // in increasing-y order
+                let data_indices = header.get_absolute_block_pixel_coordinates(tile.location)?;
+
+                let block = BlockIndex {
+                    layer: header_index,
+                    level: tile.location.level_index,
+                    pixel_position: data_indices.position.to_usize("data indices start")?,
+                    pixel_size: data_indices.size,
+                };
+
+                if filter(&self.meta_data, tile.location, block) {
+                    filtered_offsets.push(offset_tables[header_index][block_index]) // safe indexing from `enumerate()`
+                }
+            };
+        }
+
+        filtered_offsets.sort_unstable(); // enables reading continuously if possible (already sorted where the line order is increasing)
+
+        if pedantic {
+            // table is sorted. if any two neighbours are equal, we have duplicates. this is invalid.
+            if filtered_offsets.windows(2).any(|pair| pair[0] == pair[1]) {
+                return Err(Error::invalid("chunk offset table"))
+            }
+        }
+
+        Ok(FilteredChunksReader {
+            meta_data: self.meta_data,
+            expected_filtered_chunk_count: filtered_offsets.len(),
+            remaining_filtered_chunk_indices: filtered_offsets.into_iter(),
+            remaining_bytes: self.remaining_reader
+        })
+    }
+}
+
+
+fn validate_offset_tables(headers: &[Header], offset_tables: &OffsetTables, chunks_start_byte: usize) -> UnitResult {
+    let max_pixel_bytes: usize = headers.iter() // when compressed, chunks are smaller, but never larger than max
+        .map(|header| header.max_pixel_file_bytes())
+        .sum();
+
+    // check that each offset is within the bounds
+    let end_byte = chunks_start_byte + max_pixel_bytes;
+    let is_invalid = offset_tables.iter().flatten().map(|&u64| u64_to_usize(u64))
+        .any(|chunk_start| chunk_start < chunks_start_byte || chunk_start > end_byte);
+
+    if is_invalid { Err(Error::invalid("offset table")) }
+    else { Ok(()) }
+}
+
+
+
+
+/// Decode the desired chunks and skip the unimportant chunks in the file.
+/// The decoded chunks can be decompressed by calling
+/// `decompress_parallel`, `decompress_sequential`, `sequential_decompressor`, or `parallel_decompressor`.
+/// Call `on_progress` to receive a callback for each block.
+/// Also contains the image meta data.
+#[derive(Debug)]
+pub struct FilteredChunksReader<R> {
+    meta_data: MetaData,
+    expected_filtered_chunk_count: usize,
+    remaining_filtered_chunk_indices: std::vec::IntoIter<u64>,
+    remaining_bytes: PeekRead<Tracking<R>>,
+}
+
+/// Decode all chunks in the file without seeking.
+/// The decoded chunks can be decompressed by calling
+/// `decompress_parallel`, `decompress_sequential`, `sequential_decompressor`, or `parallel_decompressor`.
+/// Call `on_progress` to receive a callback for each block.
+/// Also contains the image meta data.
+#[derive(Debug)]
+pub struct AllChunksReader<R> {
+    meta_data: MetaData,
+    remaining_chunks: std::ops::Range<usize>,
+    remaining_bytes: PeekRead<Tracking<R>>,
+    pedantic: bool,
+}
+
+/// Decode chunks in the file,
+/// calling the supplied progress callback as each chunk is read.
+/// The decoded chunks can be decompressed by calling
+/// `decompress_parallel`, `decompress_sequential`, or `sequential_decompressor`.
+/// Also contains the image meta data.
+#[derive(Debug)]
+pub struct OnProgressChunksReader<R, F> {
+    chunks_reader: R,
+    decoded_chunks: usize,
+    callback: F,
+}
+
+/// Decode chunks in the file.
+/// The decoded chunks can be decompressed by calling
+/// `decompress_parallel`, `decompress_sequential`, or `sequential_decompressor`.
+/// Call `on_progress` to receive a callback for each block.
+/// Also contains the image meta data.
+pub trait ChunksReader: Sized + Iterator<Item=Result<Chunk>> + ExactSizeIterator {
+
+    /// The decoded exr meta data from the file.
+    fn meta_data(&self) -> &MetaData;
+
+    /// The decoded exr headers from the file.
+    fn headers(&self) -> &[Header] { &self.meta_data().headers }
+
+    /// The number of chunks that this reader will return in total.
+    /// Can be less than the total number of chunks in the file, if some chunks are skipped.
+    fn expected_chunk_count(&self) -> usize;
+
+    /// Read the next compressed chunk from the file.
+    /// Equivalent to `.next()`, as this also is an iterator.
+    /// Returns `None` if all chunks have been read.
+    fn read_next_chunk(&mut self) -> Option<Result<Chunk>> { self.next() }
+
+    /// Create a new reader that calls the provided progress
+    /// callback for each chunk that is read from the file.
+    /// If the file can be successfully decoded, the callback is always
+    /// invoked with 0.0 at the start and with 1.0 at the end, each at least once.
+    fn on_progress<F>(self, on_progress: F) -> OnProgressChunksReader<Self, F> where F: FnMut(f64) {
+        OnProgressChunksReader { chunks_reader: self, callback: on_progress, decoded_chunks: 0 }
+    }
+
+    /// Decompress all blocks in the file, using multiple CPU cores, and call the supplied closure for each block.
+    /// The order of the blocks is not deterministic.
+    /// You can also use `parallel_decompressor` to obtain an iterator instead.
+    /// Will fall back to sequential processing where threads are not available, or where it would not speed up the process.
+    // FIXME try async + futures instead of rayon! Maybe even allows for external async decoding? (-> impl Stream<UncompressedBlock>)
+    fn decompress_parallel(
+        self, pedantic: bool,
+        mut insert_block: impl FnMut(&MetaData, UncompressedBlock) -> UnitResult
+    ) -> UnitResult
+    {
+        let mut decompressor = match self.parallel_decompressor(pedantic) {
+            Err(old_self) => return old_self.decompress_sequential(pedantic, insert_block),
+            Ok(decompressor) => decompressor,
+        };
+
+        while let Some(block) = decompressor.next() {
+            insert_block(decompressor.meta_data(), block?)?;
+        }
+
+        debug_assert_eq!(decompressor.len(), 0, "compressed blocks left after decompressing all blocks");
+        Ok(())
+    }
+
+    /// Return an iterator that decompresses the chunks with multiple threads.
+    /// The order of the blocks is not deterministic.
+    /// Use `ParallelBlockDecompressor::new` if you want to use your own thread pool.
+    /// By default, this uses as many threads as there are CPUs.
+    /// Returns `self` if there is no need for parallel decompression.
+    fn parallel_decompressor(self, pedantic: bool) -> std::result::Result<ParallelBlockDecompressor<Self>, Self> {
+        ParallelBlockDecompressor::new(self, pedantic)
+    }
+
+    /// Decompress all blocks in the file sequentially in the current thread,
+    /// and call the supplied closure for each block.
+    /// You can alternatively use `sequential_decompressor` if you prefer an external iterator.
+    fn decompress_sequential(
+        self, pedantic: bool,
+        mut insert_block: impl FnMut(&MetaData, UncompressedBlock) -> UnitResult
+    ) -> UnitResult
+    {
+        let mut decompressor = self.sequential_decompressor(pedantic);
+        while let Some(block) = decompressor.next() {
+            insert_block(decompressor.meta_data(), block?)?;
+        }
+
+        debug_assert_eq!(decompressor.len(), 0, "compressed blocks left after decompressing all blocks");
+        Ok(())
+    }
+
+    /// Prepare to read the chunks sequentially, using only a single thread, but with less memory overhead.
+    fn sequential_decompressor(self, pedantic: bool) -> SequentialBlockDecompressor<Self> {
+        SequentialBlockDecompressor { remaining_chunks_reader: self, pedantic }
+    }
+}
+
+impl<R, F> ChunksReader for OnProgressChunksReader<R, F> where R: ChunksReader, F: FnMut(f64) {
+    fn meta_data(&self) -> &MetaData { self.chunks_reader.meta_data() }
+    fn expected_chunk_count(&self) -> usize { self.chunks_reader.expected_chunk_count() }
+}
+
+impl<R, F> ExactSizeIterator for OnProgressChunksReader<R, F> where R: ChunksReader, F: FnMut(f64) {}
+impl<R, F> Iterator for OnProgressChunksReader<R, F> where R: ChunksReader, F: FnMut(f64) {
+    type Item = Result<Chunk>;
+
+    fn next(&mut self) -> Option<Self::Item> {
+        self.chunks_reader.next().map(|item|{
+            {
+                let total_chunks = self.expected_chunk_count() as f64;
+                let callback = &mut self.callback;
+                callback(self.decoded_chunks as f64 / total_chunks);
+            }
+
+            self.decoded_chunks += 1;
+            item
+        })
+        .or_else(||{
+            debug_assert_eq!(
+                self.decoded_chunks, self.expected_chunk_count(),
+                "chunks reader finished but not all chunks are decompressed"
+            );
+
+            let callback = &mut self.callback;
+            callback(1.0);
+            None
+        })
+    }
+
+    fn size_hint(&self) -> (usize, Option<usize>) {
+        self.chunks_reader.size_hint()
+    }
+}
+
+impl<R: Read + Seek> ChunksReader for AllChunksReader<R> {
+    fn meta_data(&self) -> &MetaData { &self.meta_data }
+    fn expected_chunk_count(&self) -> usize { self.remaining_chunks.end }
+}
+
+impl<R: Read + Seek> ExactSizeIterator for AllChunksReader<R> {}
+impl<R: Read + Seek> Iterator for AllChunksReader<R> {
+    type Item = Result<Chunk>;
+
+    fn next(&mut self) -> Option<Self::Item> {
+        // read as many chunks as the file should contain (inferred from meta data)
+        let next_chunk = self.remaining_chunks.next()
+            .map(|_| Chunk::read(&mut self.remaining_bytes, &self.meta_data));
+
+        // if no chunks are left, but some bytes remain, return an error
+        if self.pedantic && next_chunk.is_none() && self.remaining_bytes.peek_u8().is_ok() {
+            return Some(Err(Error::invalid("end of file expected")));
+        }
+
+        next_chunk
+    }
+
+    fn size_hint(&self) -> (usize, Option<usize>) {
+        (self.remaining_chunks.len(), Some(self.remaining_chunks.len()))
+    }
+}
+
+impl<R: Read + Seek> ChunksReader for FilteredChunksReader<R> {
+    fn meta_data(&self) -> &MetaData { &self.meta_data }
+    fn expected_chunk_count(&self) -> usize { self.expected_filtered_chunk_count }
+}
+
+impl<R: Read + Seek> ExactSizeIterator for FilteredChunksReader<R> {}
+impl<R: Read + Seek> Iterator for FilteredChunksReader<R> {
+    type Item = Result<Chunk>;
+
+    fn next(&mut self) -> Option<Self::Item> {
+        // read as many chunks as we have desired chunk offsets
+        self.remaining_filtered_chunk_indices.next().map(|next_chunk_location|{
+            self.remaining_bytes.skip_to( // no-op for seek at current position, uses skip_bytes for small amounts
+                usize::try_from(next_chunk_location)
+                    .expect("too large chunk position for this machine")
+            )?;
+
+            let meta_data = &self.meta_data;
+            Chunk::read(&mut self.remaining_bytes, meta_data)
+        })
+
+        // TODO remember last chunk index and then seek to index+size and check whether bytes are left?
+    }
+
+    fn size_hint(&self) -> (usize, Option<usize>) {
+        (self.remaining_filtered_chunk_indices.len(), Some(self.remaining_filtered_chunk_indices.len()))
+    }
+}
+
+/// Read all chunks from the file, decompressing each chunk immediately.
+/// Implements iterator.
+#[derive(Debug)]
+pub struct SequentialBlockDecompressor<R: ChunksReader> {
+    remaining_chunks_reader: R,
+    pedantic: bool,
+}
+
+impl<R: ChunksReader> SequentialBlockDecompressor<R> {
+
+    /// The extracted meta data from the image file.
+    pub fn meta_data(&self) -> &MetaData { self.remaining_chunks_reader.meta_data() }
+
+    /// Read and then decompress a single block of pixels from the byte source.
+    pub fn decompress_next_block(&mut self) -> Option<Result<UncompressedBlock>> {
+        self.remaining_chunks_reader.read_next_chunk().map(|compressed_chunk|{
+            UncompressedBlock::decompress_chunk(compressed_chunk?, &self.remaining_chunks_reader.meta_data(), self.pedantic)
+        })
+    }
+}
+
+/// Decompress the chunks in a file in parallel.
+/// The first call to `next` will fill the thread pool with jobs,
+/// starting to decompress the next few blocks.
+/// These jobs will finish, even if you stop reading more blocks.
+/// Implements iterator.
+#[derive(Debug)]
+pub struct ParallelBlockDecompressor<R: ChunksReader> {
+    remaining_chunks: R,
+    sender: flume::Sender<Result<UncompressedBlock>>,
+    receiver: flume::Receiver<Result<UncompressedBlock>>,
+    currently_decompressing_count: usize,
+    max_threads: usize,
+
+    shared_meta_data_ref: Arc<MetaData>,
+    pedantic: bool,
+
+    pool: ThreadPool,
+}
+
+impl<R: ChunksReader> ParallelBlockDecompressor<R> {
+
+    /// Create a new decompressor. Does not immediately spawn any tasks.
+    /// Decompression starts after the first call to `next`.
+    /// Returns the chunks if parallel decompression should not be used.
+    /// Use `new_with_thread_pool` to customize the threadpool.
+    pub fn new(chunks: R, pedantic: bool) -> std::result::Result<Self, R> {
+        Self::new_with_thread_pool(chunks, pedantic, ||{
+            rayon_core::ThreadPoolBuilder::new()
+                .thread_name(|index| format!("OpenEXR Block Decompressor Thread #{}", index))
+                .build()
+        })
+    }
+
+    /// Create a new decompressor. Does not immediately spawn any tasks.
+    /// Decompression starts after the first call to `next`.
+    /// Returns the chunks if parallel decompression should not be used.
+    pub fn new_with_thread_pool<CreatePool>(chunks: R, pedantic: bool, try_create_thread_pool: CreatePool)
+        -> std::result::Result<Self, R>
+        where CreatePool: FnOnce() -> std::result::Result<ThreadPool, ThreadPoolBuildError>
+    {
+        // if no compression is used in the file, don't use a threadpool
+        if chunks.meta_data().headers.iter()
+            .all(|head| head.compression == Compression::Uncompressed)
+        {
+            return Err(chunks);
+        }
+
+        // in case thread pool creation fails (for example on WASM currently),
+        // we revert to sequential decompression
+        let pool = match try_create_thread_pool() {
+            Ok(pool) => pool,
+
+            // TODO print warning?
+            Err(_) => return Err(chunks),
+        };
+
+        let max_threads = pool.current_num_threads().max(1).min(chunks.len()) + 2; // roughly one block in flight per thread at all times
+
+        let (send, recv) = flume::unbounded(); // TODO bounded channel simplifies logic?
+
+        Ok(Self {
+            shared_meta_data_ref: Arc::new(chunks.meta_data().clone()),
+            currently_decompressing_count: 0,
+            remaining_chunks: chunks,
+            sender: send,
+            receiver: recv,
+            pedantic,
+            max_threads,
+
+            pool,
+        })
+    }
+
+    /// Fill the pool with decompression jobs. Returns the first job that finishes.
+    pub fn decompress_next_block(&mut self) -> Option<Result<UncompressedBlock>> {
+
+        while self.currently_decompressing_count < self.max_threads {
+            let block = self.remaining_chunks.next();
+            if let Some(block) = block {
+                let block = match block {
+                    Ok(block) => block,
+                    Err(error) => return Some(Err(error))
+                };
+
+                let sender = self.sender.clone();
+                let meta = self.shared_meta_data_ref.clone();
+                let pedantic = self.pedantic;
+
+                self.currently_decompressing_count += 1;
+
+                self.pool.spawn(move || {
+                    let decompressed_or_err = UncompressedBlock::decompress_chunk(
+                        block, &meta, pedantic
+                    );
+
+                    // by now, decompressing could have failed in another thread.
+                    // the error is then already handled, so we simply
+                    // don't send the decompressed block and do nothing
+                    let _ = sender.send(decompressed_or_err);
+                });
+            }
+            else {
+                // there are no chunks left to decompress
+                break;
+            }
+        }
+
+        if self.currently_decompressing_count > 0 {
+            let next = self.receiver.recv()
+                .expect("all decompressing senders hung up but more messages were expected");
+
+            self.currently_decompressing_count -= 1;
+            Some(next)
+        }
+        else {
+            debug_assert!(self.receiver.try_recv().is_err(), "uncompressed chunks left in channel after decompressing all chunks"); // TODO not reliable
+            debug_assert_eq!(self.len(), 0, "compressed chunks left after decompressing all chunks");
+            None
+        }
+    }
+
+    /// The extracted meta data of the image file.
+    pub fn meta_data(&self) -> &MetaData { self.remaining_chunks.meta_data() }
+}
+
+impl<R: ChunksReader> ExactSizeIterator for SequentialBlockDecompressor<R> {}
+impl<R: ChunksReader> Iterator for SequentialBlockDecompressor<R> {
+    type Item = Result<UncompressedBlock>;
+    fn next(&mut self) -> Option<Self::Item> { self.decompress_next_block() }
+    fn size_hint(&self) -> (usize, Option<usize>) { self.remaining_chunks_reader.size_hint() }
+}
+
+impl<R: ChunksReader> ExactSizeIterator for ParallelBlockDecompressor<R> {}
+impl<R: ChunksReader> Iterator for ParallelBlockDecompressor<R> {
+    type Item = Result<UncompressedBlock>;
+    fn next(&mut self) -> Option<Self::Item> { self.decompress_next_block() }
+    fn size_hint(&self) -> (usize, Option<usize>) {
+        let remaining = self.remaining_chunks.len() + self.currently_decompressing_count;
+        (remaining, Some(remaining))
+    }
+}
+
+
+
+
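
For orientation after the patch itself: a minimal sketch of how these readers compose, reading all chunks with a progress callback and decompressing them on the default thread pool. It assumes the module is exposed as `exr::block::reader` and uses a hypothetical `example.exr` input file; both names are illustrative, not taken from the patch.

use std::fs::File;
use std::io::BufReader;
use exr::block::reader::{ChunksReader, Reader};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // the byte source must be `Read + Seek`; a buffered file reader qualifies
    let file = BufReader::new(File::open("example.exr")?); // hypothetical input

    // decode the meta data up front; the source stays positioned for chunk reading
    let reader = Reader::read_from_buffered(file, /* pedantic */ true)?;
    println!("headers in file: {}", reader.headers().len());

    // read every chunk, report progress, and decompress on multiple CPU cores
    let mut block_count = 0;
    reader
        .all_chunks(true)?
        .on_progress(|progress| println!("progress: {:.0}%", progress * 100.0))
        .decompress_parallel(true, |_meta_data, _block| {
            block_count += 1; // block arrival order is not deterministic
            Ok(())
        })?;

    println!("decompressed {} blocks", block_count);
    Ok(())
}

Note that `decompress_parallel` silently takes the sequential path when the file is entirely uncompressed or when no thread pool can be built (for example on WASM), so the same call should work across targets.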
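A second sketch along the same lines, this time through `filter_chunks` and the single-threaded decompressor. The filter closure keeps only full-resolution blocks (level index zero); the `exr::math::Vec2` import is an assumption about where the crate exposes its vector type, and `example.exr` is again a placeholder.

use std::fs::File;
use std::io::BufReader;
use exr::block::reader::{ChunksReader, Reader};
use exr::math::Vec2;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let file = BufReader::new(File::open("example.exr")?); // hypothetical input
    let reader = Reader::read_from_buffered(file, true)?;

    // seek only to the offsets of blocks in the highest-resolution mip level
    let mut decompressor = reader
        .filter_chunks(true, |_meta, tile, _block| tile.level_index == Vec2(0, 0))?
        .sequential_decompressor(true);

    // decompress the selected blocks one by one, in this thread
    let mut block_count = 0;
    while let Some(block) = decompressor.decompress_next_block() {
        let _block = block?;
        block_count += 1;
    }

    println!("decompressed {} full-resolution blocks", block_count);
    Ok(())
}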