diff options
Diffstat (limited to 'vendor/exr/src/block/reader.rs')
-rw-r--r-- | vendor/exr/src/block/reader.rs | 527 |
1 files changed, 0 insertions, 527 deletions
diff --git a/vendor/exr/src/block/reader.rs b/vendor/exr/src/block/reader.rs deleted file mode 100644 index bb9888e..0000000 --- a/vendor/exr/src/block/reader.rs +++ /dev/null @@ -1,527 +0,0 @@ -//! Composable structures to handle reading an image. - - -use std::convert::TryFrom; -use std::fmt::Debug; -use std::io::{Read, Seek}; -use rayon_core::{ThreadPool, ThreadPoolBuildError}; - -use smallvec::alloc::sync::Arc; - -use crate::block::{BlockIndex, UncompressedBlock}; -use crate::block::chunk::{Chunk, TileCoordinates}; -use crate::compression::Compression; -use crate::error::{Error, Result, u64_to_usize, UnitResult}; -use crate::io::{PeekRead, Tracking}; -use crate::meta::{MetaData, OffsetTables}; -use crate::meta::header::Header; - -/// Decode the meta data from a byte source, keeping the source ready for further reading. -/// Continue decoding the remaining bytes by calling `filtered_chunks` or `all_chunks`. -#[derive(Debug)] -pub struct Reader<R> { - meta_data: MetaData, - remaining_reader: PeekRead<Tracking<R>>, // TODO does R need to be Seek or is Tracking enough? -} - -impl<R: Read + Seek> Reader<R> { - - /// Start the reading process. - /// Immediately decodes the meta data into an internal field. - /// Access it via`meta_data()`. - pub fn read_from_buffered(read: R, pedantic: bool) -> Result<Self> { - let mut remaining_reader = PeekRead::new(Tracking::new(read)); - let meta_data = MetaData::read_validated_from_buffered_peekable(&mut remaining_reader, pedantic)?; - Ok(Self { meta_data, remaining_reader }) - } - - // must not be mutable, as reading the file later on relies on the meta data - /// The decoded exr meta data from the file. - pub fn meta_data(&self) -> &MetaData { &self.meta_data } - - /// The decoded exr meta data from the file. - pub fn headers(&self) -> &[Header] { &self.meta_data.headers } - - /// Obtain the meta data ownership. - pub fn into_meta_data(self) -> MetaData { self.meta_data } - - /// Prepare to read all the chunks from the file. - /// Does not decode the chunks now, but returns a decoder. - /// Reading all chunks reduces seeking the file, but some chunks might be read without being used. - pub fn all_chunks(mut self, pedantic: bool) -> Result<AllChunksReader<R>> { - let total_chunk_count = { - if pedantic { - let offset_tables = MetaData::read_offset_tables(&mut self.remaining_reader, &self.meta_data.headers)?; - validate_offset_tables(self.meta_data.headers.as_slice(), &offset_tables, self.remaining_reader.byte_position())?; - offset_tables.iter().map(|table| table.len()).sum() - } - else { - usize::try_from(MetaData::skip_offset_tables(&mut self.remaining_reader, &self.meta_data.headers)?) - .expect("too large chunk count for this machine") - } - }; - - Ok(AllChunksReader { - meta_data: self.meta_data, - remaining_chunks: 0 .. total_chunk_count, - remaining_bytes: self.remaining_reader, - pedantic - }) - } - - /// Prepare to read some the chunks from the file. - /// Does not decode the chunks now, but returns a decoder. - /// Reading only some chunks may seeking the file, potentially skipping many bytes. - // TODO tile indices add no new information to block index?? - pub fn filter_chunks(mut self, pedantic: bool, mut filter: impl FnMut(&MetaData, TileCoordinates, BlockIndex) -> bool) -> Result<FilteredChunksReader<R>> { - let offset_tables = MetaData::read_offset_tables(&mut self.remaining_reader, &self.meta_data.headers)?; - - // TODO regardless of pedantic, if invalid, read all chunks instead, and filter after reading each chunk? - if pedantic { - validate_offset_tables( - self.meta_data.headers.as_slice(), &offset_tables, - self.remaining_reader.byte_position() - )?; - } - - let mut filtered_offsets = Vec::with_capacity( - (self.meta_data.headers.len() * 32).min(2*2048) - ); - - // TODO detect whether the filter actually would skip chunks, and aviod sorting etc when not filtering is applied - - for (header_index, header) in self.meta_data.headers.iter().enumerate() { // offset tables are stored same order as headers - for (block_index, tile) in header.blocks_increasing_y_order().enumerate() { // in increasing_y order - let data_indices = header.get_absolute_block_pixel_coordinates(tile.location)?; - - let block = BlockIndex { - layer: header_index, - level: tile.location.level_index, - pixel_position: data_indices.position.to_usize("data indices start")?, - pixel_size: data_indices.size, - }; - - if filter(&self.meta_data, tile.location, block) { - filtered_offsets.push(offset_tables[header_index][block_index]) // safe indexing from `enumerate()` - } - }; - } - - filtered_offsets.sort_unstable(); // enables reading continuously if possible (already sorted where line order increasing) - - if pedantic { - // table is sorted. if any two neighbours are equal, we have duplicates. this is invalid. - if filtered_offsets.windows(2).any(|pair| pair[0] == pair[1]) { - return Err(Error::invalid("chunk offset table")) - } - } - - Ok(FilteredChunksReader { - meta_data: self.meta_data, - expected_filtered_chunk_count: filtered_offsets.len(), - remaining_filtered_chunk_indices: filtered_offsets.into_iter(), - remaining_bytes: self.remaining_reader - }) - } -} - - -fn validate_offset_tables(headers: &[Header], offset_tables: &OffsetTables, chunks_start_byte: usize) -> UnitResult { - let max_pixel_bytes: usize = headers.iter() // when compressed, chunks are smaller, but never larger than max - .map(|header| header.max_pixel_file_bytes()) - .sum(); - - // check that each offset is within the bounds - let end_byte = chunks_start_byte + max_pixel_bytes; - let is_invalid = offset_tables.iter().flatten().map(|&u64| u64_to_usize(u64)) - .any(|chunk_start| chunk_start < chunks_start_byte || chunk_start > end_byte); - - if is_invalid { Err(Error::invalid("offset table")) } - else { Ok(()) } -} - - - - -/// Decode the desired chunks and skip the unimportant chunks in the file. -/// The decoded chunks can be decompressed by calling -/// `decompress_parallel`, `decompress_sequential`, or `sequential_decompressor` or `parallel_decompressor`. -/// Call `on_progress` to have a callback with each block. -/// Also contains the image meta data. -#[derive(Debug)] -pub struct FilteredChunksReader<R> { - meta_data: MetaData, - expected_filtered_chunk_count: usize, - remaining_filtered_chunk_indices: std::vec::IntoIter<u64>, - remaining_bytes: PeekRead<Tracking<R>>, -} - -/// Decode all chunks in the file without seeking. -/// The decoded chunks can be decompressed by calling -/// `decompress_parallel`, `decompress_sequential`, or `sequential_decompressor` or `parallel_decompressor`. -/// Call `on_progress` to have a callback with each block. -/// Also contains the image meta data. -#[derive(Debug)] -pub struct AllChunksReader<R> { - meta_data: MetaData, - remaining_chunks: std::ops::Range<usize>, - remaining_bytes: PeekRead<Tracking<R>>, - pedantic: bool, -} - -/// Decode chunks in the file without seeking. -/// Calls the supplied closure for each chunk. -/// The decoded chunks can be decompressed by calling -/// `decompress_parallel`, `decompress_sequential`, or `sequential_decompressor`. -/// Also contains the image meta data. -#[derive(Debug)] -pub struct OnProgressChunksReader<R, F> { - chunks_reader: R, - decoded_chunks: usize, - callback: F, -} - -/// Decode chunks in the file. -/// The decoded chunks can be decompressed by calling -/// `decompress_parallel`, `decompress_sequential`, or `sequential_decompressor`. -/// Call `on_progress` to have a callback with each block. -/// Also contains the image meta data. -pub trait ChunksReader: Sized + Iterator<Item=Result<Chunk>> + ExactSizeIterator { - - /// The decoded exr meta data from the file. - fn meta_data(&self) -> &MetaData; - - /// The decoded exr headers from the file. - fn headers(&self) -> &[Header] { &self.meta_data().headers } - - /// The number of chunks that this reader will return in total. - /// Can be less than the total number of chunks in the file, if some chunks are skipped. - fn expected_chunk_count(&self) -> usize; - - /// Read the next compressed chunk from the file. - /// Equivalent to `.next()`, as this also is an iterator. - /// Returns `None` if all chunks have been read. - fn read_next_chunk(&mut self) -> Option<Result<Chunk>> { self.next() } - - /// Create a new reader that calls the provided progress - /// callback for each chunk that is read from the file. - /// If the file can be successfully decoded, - /// the progress will always at least once include 0.0 at the start and 1.0 at the end. - fn on_progress<F>(self, on_progress: F) -> OnProgressChunksReader<Self, F> where F: FnMut(f64) { - OnProgressChunksReader { chunks_reader: self, callback: on_progress, decoded_chunks: 0 } - } - - /// Decompress all blocks in the file, using multiple cpu cores, and call the supplied closure for each block. - /// The order of the blocks is not deterministic. - /// You can also use `parallel_decompressor` to obtain an iterator instead. - /// Will fallback to sequential processing where threads are not available, or where it would not speed up the process. - // FIXME try async + futures instead of rayon! Maybe even allows for external async decoding? (-> impl Stream<UncompressedBlock>) - fn decompress_parallel( - self, pedantic: bool, - mut insert_block: impl FnMut(&MetaData, UncompressedBlock) -> UnitResult - ) -> UnitResult - { - let mut decompressor = match self.parallel_decompressor(pedantic) { - Err(old_self) => return old_self.decompress_sequential(pedantic, insert_block), - Ok(decompressor) => decompressor, - }; - - while let Some(block) = decompressor.next() { - insert_block(decompressor.meta_data(), block?)?; - } - - debug_assert_eq!(decompressor.len(), 0, "compressed blocks left after decompressing all blocks"); - Ok(()) - } - - /// Return an iterator that decompresses the chunks with multiple threads. - /// The order of the blocks is not deterministic. - /// Use `ParallelBlockDecompressor::new` if you want to use your own thread pool. - /// By default, this uses as many threads as there are CPUs. - /// Returns the `self` if there is no need for parallel decompression. - fn parallel_decompressor(self, pedantic: bool) -> std::result::Result<ParallelBlockDecompressor<Self>, Self> { - ParallelBlockDecompressor::new(self, pedantic) - } - - /// Return an iterator that decompresses the chunks in this thread. - /// You can alternatively use `sequential_decompressor` if you prefer an external iterator. - fn decompress_sequential( - self, pedantic: bool, - mut insert_block: impl FnMut(&MetaData, UncompressedBlock) -> UnitResult - ) -> UnitResult - { - let mut decompressor = self.sequential_decompressor(pedantic); - while let Some(block) = decompressor.next() { - insert_block(decompressor.meta_data(), block?)?; - } - - debug_assert_eq!(decompressor.len(), 0, "compressed blocks left after decompressing all blocks"); - Ok(()) - } - - /// Prepare reading the chunks sequentially, only a single thread, but with less memory overhead. - fn sequential_decompressor(self, pedantic: bool) -> SequentialBlockDecompressor<Self> { - SequentialBlockDecompressor { remaining_chunks_reader: self, pedantic } - } -} - -impl<R, F> ChunksReader for OnProgressChunksReader<R, F> where R: ChunksReader, F: FnMut(f64) { - fn meta_data(&self) -> &MetaData { self.chunks_reader.meta_data() } - fn expected_chunk_count(&self) -> usize { self.chunks_reader.expected_chunk_count() } -} - -impl<R, F> ExactSizeIterator for OnProgressChunksReader<R, F> where R: ChunksReader, F: FnMut(f64) {} -impl<R, F> Iterator for OnProgressChunksReader<R, F> where R: ChunksReader, F: FnMut(f64) { - type Item = Result<Chunk>; - - fn next(&mut self) -> Option<Self::Item> { - self.chunks_reader.next().map(|item|{ - { - let total_chunks = self.expected_chunk_count() as f64; - let callback = &mut self.callback; - callback(self.decoded_chunks as f64 / total_chunks); - } - - self.decoded_chunks += 1; - item - }) - .or_else(||{ - debug_assert_eq!( - self.decoded_chunks, self.expected_chunk_count(), - "chunks reader finished but not all chunks are decompressed" - ); - - let callback = &mut self.callback; - callback(1.0); - None - }) - } - - fn size_hint(&self) -> (usize, Option<usize>) { - self.chunks_reader.size_hint() - } -} - -impl<R: Read + Seek> ChunksReader for AllChunksReader<R> { - fn meta_data(&self) -> &MetaData { &self.meta_data } - fn expected_chunk_count(&self) -> usize { self.remaining_chunks.end } -} - -impl<R: Read + Seek> ExactSizeIterator for AllChunksReader<R> {} -impl<R: Read + Seek> Iterator for AllChunksReader<R> { - type Item = Result<Chunk>; - - fn next(&mut self) -> Option<Self::Item> { - // read as many chunks as the file should contain (inferred from meta data) - let next_chunk = self.remaining_chunks.next() - .map(|_| Chunk::read(&mut self.remaining_bytes, &self.meta_data)); - - // if no chunks are left, but some bytes remain, return error - if self.pedantic && next_chunk.is_none() && self.remaining_bytes.peek_u8().is_ok() { - return Some(Err(Error::invalid("end of file expected"))); - } - - next_chunk - } - - fn size_hint(&self) -> (usize, Option<usize>) { - (self.remaining_chunks.len(), Some(self.remaining_chunks.len())) - } -} - -impl<R: Read + Seek> ChunksReader for FilteredChunksReader<R> { - fn meta_data(&self) -> &MetaData { &self.meta_data } - fn expected_chunk_count(&self) -> usize { self.expected_filtered_chunk_count } -} - -impl<R: Read + Seek> ExactSizeIterator for FilteredChunksReader<R> {} -impl<R: Read + Seek> Iterator for FilteredChunksReader<R> { - type Item = Result<Chunk>; - - fn next(&mut self) -> Option<Self::Item> { - // read as many chunks as we have desired chunk offsets - self.remaining_filtered_chunk_indices.next().map(|next_chunk_location|{ - self.remaining_bytes.skip_to( // no-op for seek at current position, uses skip_bytes for small amounts - usize::try_from(next_chunk_location) - .expect("too large chunk position for this machine") - )?; - - let meta_data = &self.meta_data; - Chunk::read(&mut self.remaining_bytes, meta_data) - }) - - // TODO remember last chunk index and then seek to index+size and check whether bytes are left? - } - - fn size_hint(&self) -> (usize, Option<usize>) { - (self.remaining_filtered_chunk_indices.len(), Some(self.remaining_filtered_chunk_indices.len())) - } -} - -/// Read all chunks from the file, decompressing each chunk immediately. -/// Implements iterator. -#[derive(Debug)] -pub struct SequentialBlockDecompressor<R: ChunksReader> { - remaining_chunks_reader: R, - pedantic: bool, -} - -impl<R: ChunksReader> SequentialBlockDecompressor<R> { - - /// The extracted meta data from the image file. - pub fn meta_data(&self) -> &MetaData { self.remaining_chunks_reader.meta_data() } - - /// Read and then decompress a single block of pixels from the byte source. - pub fn decompress_next_block(&mut self) -> Option<Result<UncompressedBlock>> { - self.remaining_chunks_reader.read_next_chunk().map(|compressed_chunk|{ - UncompressedBlock::decompress_chunk(compressed_chunk?, &self.remaining_chunks_reader.meta_data(), self.pedantic) - }) - } -} - -/// Decompress the chunks in a file in parallel. -/// The first call to `next` will fill the thread pool with jobs, -/// starting to decompress the next few blocks. -/// These jobs will finish, even if you stop reading more blocks. -/// Implements iterator. -#[derive(Debug)] -pub struct ParallelBlockDecompressor<R: ChunksReader> { - remaining_chunks: R, - sender: flume::Sender<Result<UncompressedBlock>>, - receiver: flume::Receiver<Result<UncompressedBlock>>, - currently_decompressing_count: usize, - max_threads: usize, - - shared_meta_data_ref: Arc<MetaData>, - pedantic: bool, - - pool: ThreadPool, -} - -impl<R: ChunksReader> ParallelBlockDecompressor<R> { - - /// Create a new decompressor. Does not immediately spawn any tasks. - /// Decompression starts after the first call to `next`. - /// Returns the chunks if parallel decompression should not be used. - /// Use `new_with_thread_pool` to customize the threadpool. - pub fn new(chunks: R, pedantic: bool) -> std::result::Result<Self, R> { - Self::new_with_thread_pool(chunks, pedantic, ||{ - rayon_core::ThreadPoolBuilder::new() - .thread_name(|index| format!("OpenEXR Block Decompressor Thread #{}", index)) - .build() - }) - } - - /// Create a new decompressor. Does not immediately spawn any tasks. - /// Decompression starts after the first call to `next`. - /// Returns the chunks if parallel decompression should not be used. - pub fn new_with_thread_pool<CreatePool>(chunks: R, pedantic: bool, try_create_thread_pool: CreatePool) - -> std::result::Result<Self, R> - where CreatePool: FnOnce() -> std::result::Result<ThreadPool, ThreadPoolBuildError> - { - // if no compression is used in the file, don't use a threadpool - if chunks.meta_data().headers.iter() - .all(|head|head.compression == Compression::Uncompressed) - { - return Err(chunks); - } - - // in case thread pool creation fails (for example on WASM currently), - // we revert to sequential decompression - let pool = match try_create_thread_pool() { - Ok(pool) => pool, - - // TODO print warning? - Err(_) => return Err(chunks), - }; - - let max_threads = pool.current_num_threads().max(1).min(chunks.len()) + 2; // ca one block for each thread at all times - - let (send, recv) = flume::unbounded(); // TODO bounded channel simplifies logic? - - Ok(Self { - shared_meta_data_ref: Arc::new(chunks.meta_data().clone()), - currently_decompressing_count: 0, - remaining_chunks: chunks, - sender: send, - receiver: recv, - pedantic, - max_threads, - - pool, - }) - } - - /// Fill the pool with decompression jobs. Returns the first job that finishes. - pub fn decompress_next_block(&mut self) -> Option<Result<UncompressedBlock>> { - - while self.currently_decompressing_count < self.max_threads { - let block = self.remaining_chunks.next(); - if let Some(block) = block { - let block = match block { - Ok(block) => block, - Err(error) => return Some(Err(error)) - }; - - let sender = self.sender.clone(); - let meta = self.shared_meta_data_ref.clone(); - let pedantic = self.pedantic; - - self.currently_decompressing_count += 1; - - self.pool.spawn(move || { - let decompressed_or_err = UncompressedBlock::decompress_chunk( - block, &meta, pedantic - ); - - // by now, decompressing could have failed in another thread. - // the error is then already handled, so we simply - // don't send the decompressed block and do nothing - let _ = sender.send(decompressed_or_err); - }); - } - else { - // there are no chunks left to decompress - break; - } - } - - if self.currently_decompressing_count > 0 { - let next = self.receiver.recv() - .expect("all decompressing senders hung up but more messages were expected"); - - self.currently_decompressing_count -= 1; - Some(next) - } - else { - debug_assert!(self.receiver.try_recv().is_err(), "uncompressed chunks left in channel after decompressing all chunks"); // TODO not reliable - debug_assert_eq!(self.len(), 0, "compressed chunks left after decompressing all chunks"); - None - } - } - - /// The extracted meta data of the image file. - pub fn meta_data(&self) -> &MetaData { self.remaining_chunks.meta_data() } -} - -impl<R: ChunksReader> ExactSizeIterator for SequentialBlockDecompressor<R> {} -impl<R: ChunksReader> Iterator for SequentialBlockDecompressor<R> { - type Item = Result<UncompressedBlock>; - fn next(&mut self) -> Option<Self::Item> { self.decompress_next_block() } - fn size_hint(&self) -> (usize, Option<usize>) { self.remaining_chunks_reader.size_hint() } -} - -impl<R: ChunksReader> ExactSizeIterator for ParallelBlockDecompressor<R> {} -impl<R: ChunksReader> Iterator for ParallelBlockDecompressor<R> { - type Item = Result<UncompressedBlock>; - fn next(&mut self) -> Option<Self::Item> { self.decompress_next_block() } - fn size_hint(&self) -> (usize, Option<usize>) { - let remaining = self.remaining_chunks.len() + self.currently_decompressing_count; - (remaining, Some(remaining)) - } -} - - - - - |