//! Composable structures to handle reading an image.

use std::convert::TryFrom;
use std::fmt::Debug;
use std::io::{Read, Seek};
use rayon_core::{ThreadPool, ThreadPoolBuildError};
use smallvec::alloc::sync::Arc;

use crate::block::{BlockIndex, UncompressedBlock};
use crate::block::chunk::{Chunk, TileCoordinates};
use crate::compression::Compression;
use crate::error::{Error, Result, u64_to_usize, UnitResult};
use crate::io::{PeekRead, Tracking};
use crate::meta::{MetaData, OffsetTables};
use crate::meta::header::Header;

/// Decode the meta data from a byte source, keeping the source ready for further reading.
/// Continue decoding the remaining bytes by calling `filter_chunks` or `all_chunks`.
#[derive(Debug)]
pub struct Reader<R> {
    meta_data: MetaData,
    remaining_reader: PeekRead<Tracking<R>>, // TODO does R need to be Seek or is Tracking enough?
}

impl<R: Read + Seek> Reader<R> {

    /// Start the reading process.
    /// Immediately decodes the meta data into an internal field.
    /// Access it via `meta_data()`.
    pub fn read_from_buffered(read: R, pedantic: bool) -> Result<Self> {
        let mut remaining_reader = PeekRead::new(Tracking::new(read));
        let meta_data = MetaData::read_validated_from_buffered_peekable(&mut remaining_reader, pedantic)?;
        Ok(Self { meta_data, remaining_reader })
    }

    // must not be mutable, as reading the file later on relies on the meta data
    /// The decoded exr meta data from the file.
    pub fn meta_data(&self) -> &MetaData { &self.meta_data }

    /// The decoded exr headers from the file.
    pub fn headers(&self) -> &[Header] { &self.meta_data.headers }

    /// Obtain ownership of the meta data.
    pub fn into_meta_data(self) -> MetaData { self.meta_data }

    /// Prepare to read all the chunks from the file.
    /// Does not decode the chunks now, but returns a decoder.
    /// Reading all chunks reduces seeking the file, but some chunks might be read without being used.
    pub fn all_chunks(mut self, pedantic: bool) -> Result<AllChunksReader<R>> {
        let total_chunk_count = {
            if pedantic {
                let offset_tables = MetaData::read_offset_tables(&mut self.remaining_reader, &self.meta_data.headers)?;
                validate_offset_tables(self.meta_data.headers.as_slice(), &offset_tables, self.remaining_reader.byte_position())?;
                offset_tables.iter().map(|table| table.len()).sum()
            }
            else {
                usize::try_from(MetaData::skip_offset_tables(&mut self.remaining_reader, &self.meta_data.headers)?)
                    .expect("too large chunk count for this machine")
            }
        };

        Ok(AllChunksReader {
            meta_data: self.meta_data,
            remaining_chunks: 0 .. total_chunk_count,
            remaining_bytes: self.remaining_reader,
            pedantic
        })
    }

    /// Prepare to read only some of the chunks from the file.
    /// Does not decode the chunks now, but returns a decoder.
    /// Reading only some chunks may require seeking within the file, potentially skipping many bytes.
    // TODO tile indices add no new information to block index??
    pub fn filter_chunks(mut self, pedantic: bool, mut filter: impl FnMut(&MetaData, TileCoordinates, BlockIndex) -> bool) -> Result<FilteredChunksReader<R>> {
        let offset_tables = MetaData::read_offset_tables(&mut self.remaining_reader, &self.meta_data.headers)?;

        // TODO regardless of pedantic, if invalid, read all chunks instead, and filter after reading each chunk?
        if pedantic {
            validate_offset_tables(
                self.meta_data.headers.as_slice(), &offset_tables,
                self.remaining_reader.byte_position()
            )?;
        }

        let mut filtered_offsets = Vec::with_capacity(
            (self.meta_data.headers.len() * 32).min(2*2048)
        );

        // TODO detect whether the filter actually would skip chunks, and avoid sorting etc when no filtering is applied
        for (header_index, header) in self.meta_data.headers.iter().enumerate() { // offset tables are stored in the same order as headers
            for (block_index, tile) in header.blocks_increasing_y_order().enumerate() { // in increasing_y order
                let data_indices = header.get_absolute_block_pixel_coordinates(tile.location)?;

                let block = BlockIndex {
                    layer: header_index,
                    level: tile.location.level_index,
                    pixel_position: data_indices.position.to_usize("data indices start")?,
                    pixel_size: data_indices.size,
                };

                if filter(&self.meta_data, tile.location, block) {
                    filtered_offsets.push(offset_tables[header_index][block_index]) // safe indexing from `enumerate()`
                }
            };
        }

        filtered_offsets.sort_unstable(); // enables reading continuously if possible (already sorted where line order is increasing)

        if pedantic {
            // table is sorted. if any two neighbours are equal, we have duplicates. this is invalid.
            if filtered_offsets.windows(2).any(|pair| pair[0] == pair[1]) {
                return Err(Error::invalid("chunk offset table"))
            }
        }

        Ok(FilteredChunksReader {
            meta_data: self.meta_data,
            expected_filtered_chunk_count: filtered_offsets.len(),
            remaining_filtered_chunk_indices: filtered_offsets.into_iter(),
            remaining_bytes: self.remaining_reader
        })
    }
}

fn validate_offset_tables(headers: &[Header], offset_tables: &OffsetTables, chunks_start_byte: usize) -> UnitResult {
    let max_pixel_bytes: usize = headers.iter() // when compressed, chunks are smaller, but never larger than max
        .map(|header| header.max_pixel_file_bytes())
        .sum();

    // check that each offset is within the bounds
    let end_byte = chunks_start_byte + max_pixel_bytes;
    let is_invalid = offset_tables.iter().flatten().map(|&u64| u64_to_usize(u64))
        .any(|chunk_start| chunk_start < chunks_start_byte || chunk_start > end_byte);

    if is_invalid { Err(Error::invalid("offset table")) }
    else { Ok(()) }
}

/// Decode the desired chunks and skip the unimportant chunks in the file.
/// The decoded chunks can be decompressed by calling
/// `decompress_parallel`, `decompress_sequential`, `sequential_decompressor`, or `parallel_decompressor`.
/// Call `on_progress` to have a callback with each block.
/// Also contains the image meta data.
#[derive(Debug)]
pub struct FilteredChunksReader<R> {
    meta_data: MetaData,
    expected_filtered_chunk_count: usize,
    remaining_filtered_chunk_indices: std::vec::IntoIter<u64>,
    remaining_bytes: PeekRead<Tracking<R>>,
}

/// Decode all chunks in the file without seeking.
/// The decoded chunks can be decompressed by calling
/// `decompress_parallel`, `decompress_sequential`, `sequential_decompressor`, or `parallel_decompressor`.
/// Call `on_progress` to have a callback with each block.
/// Also contains the image meta data.
#[derive(Debug)]
pub struct AllChunksReader<R> {
    meta_data: MetaData,
    remaining_chunks: std::ops::Range<usize>,
    remaining_bytes: PeekRead<Tracking<R>>,
    pedantic: bool,
}
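// A minimal usage sketch for `Reader::filter_chunks`, assuming a hypothetical example file at
// "images/example.exr": read the meta data, then read only the raw chunks of the first layer,
// without decompressing them yet.
#[cfg(test)]
mod filter_chunks_sketch {
    use super::*;
    use std::fs::File;
    use std::io::BufReader;

    #[allow(dead_code)]
    fn read_first_layer_chunks() -> UnitResult {
        // hypothetical file path, for illustration only
        let file = BufReader::new(File::open("images/example.exr").expect("hypothetical example file"));
        let reader = Reader::read_from_buffered(file, /* pedantic */ false)?;

        // keep only the blocks of the first layer, at any resolution level
        let chunks = reader.filter_chunks(false, |_meta, _tile, block| block.layer == 0)?;
        println!("selected {} chunks", chunks.expected_chunk_count());

        for chunk in chunks {
            let _compressed_chunk: Chunk = chunk?; // still compressed; see `ChunksReader` for decompression
        }

        Ok(())
    }
}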
/// Decode chunks in the file without seeking.
/// Calls the supplied closure for each chunk.
/// The decoded chunks can be decompressed by calling
/// `decompress_parallel`, `decompress_sequential`, or `sequential_decompressor`.
/// Also contains the image meta data.
#[derive(Debug)]
pub struct OnProgressChunksReader<R, F> {
    chunks_reader: R,
    decoded_chunks: usize,
    callback: F,
}

/// Decode chunks in the file.
/// The decoded chunks can be decompressed by calling
/// `decompress_parallel`, `decompress_sequential`, or `sequential_decompressor`.
/// Call `on_progress` to have a callback with each block.
/// Also contains the image meta data.
pub trait ChunksReader: Sized + Iterator<Item = Result<Chunk>> + ExactSizeIterator {

    /// The decoded exr meta data from the file.
    fn meta_data(&self) -> &MetaData;

    /// The decoded exr headers from the file.
    fn headers(&self) -> &[Header] { &self.meta_data().headers }

    /// The number of chunks that this reader will return in total.
    /// Can be less than the total number of chunks in the file, if some chunks are skipped.
    fn expected_chunk_count(&self) -> usize;

    /// Read the next compressed chunk from the file.
    /// Equivalent to `.next()`, as this also is an iterator.
    /// Returns `None` if all chunks have been read.
    fn read_next_chunk(&mut self) -> Option<Result<Chunk>> { self.next() }

    /// Create a new reader that calls the provided progress
    /// callback for each chunk that is read from the file.
    /// If the file can be successfully decoded,
    /// the progress will always include 0.0 at the start and 1.0 at the end, at least once each.
    fn on_progress<F>(self, on_progress: F) -> OnProgressChunksReader<Self, F> where F: FnMut(f64) {
        OnProgressChunksReader { chunks_reader: self, callback: on_progress, decoded_chunks: 0 }
    }

    /// Decompress all blocks in the file, using multiple cpu cores, and call the supplied closure for each block.
    /// The order of the blocks is not deterministic.
    /// You can also use `parallel_decompressor` to obtain an iterator instead.
    /// Will fall back to sequential processing where threads are not available, or where it would not speed up the process.
    // FIXME try async + futures instead of rayon! Maybe even allows for external async decoding? (-> impl Stream)
    fn decompress_parallel(
        self, pedantic: bool,
        mut insert_block: impl FnMut(&MetaData, UncompressedBlock) -> UnitResult
    ) -> UnitResult
    {
        let mut decompressor = match self.parallel_decompressor(pedantic) {
            Err(old_self) => return old_self.decompress_sequential(pedantic, insert_block),
            Ok(decompressor) => decompressor,
        };

        while let Some(block) = decompressor.next() {
            insert_block(decompressor.meta_data(), block?)?;
        }

        debug_assert_eq!(decompressor.len(), 0, "compressed blocks left after decompressing all blocks");
        Ok(())
    }

    /// Return an iterator that decompresses the chunks with multiple threads.
    /// The order of the blocks is not deterministic.
    /// Use `ParallelBlockDecompressor::new` if you want to use your own thread pool.
    /// By default, this uses as many threads as there are CPUs.
    /// Returns `self` if there is no need for parallel decompression.
    fn parallel_decompressor(self, pedantic: bool) -> std::result::Result<ParallelBlockDecompressor<Self>, Self> {
        ParallelBlockDecompressor::new(self, pedantic)
    }

    /// Decompress all blocks in this thread and call the supplied closure for each block.
    /// You can alternatively use `sequential_decompressor` if you prefer an external iterator.
    fn decompress_sequential(
        self, pedantic: bool,
        mut insert_block: impl FnMut(&MetaData, UncompressedBlock) -> UnitResult
    ) -> UnitResult
    {
        let mut decompressor = self.sequential_decompressor(pedantic);

        while let Some(block) = decompressor.next() {
            insert_block(decompressor.meta_data(), block?)?;
        }

        debug_assert_eq!(decompressor.len(), 0, "compressed blocks left after decompressing all blocks");
        Ok(())
    }

    /// Prepare reading the chunks sequentially, using only a single thread, but with less memory overhead.
    fn sequential_decompressor(self, pedantic: bool) -> SequentialBlockDecompressor<Self> {
        SequentialBlockDecompressor { remaining_chunks_reader: self, pedantic }
    }
}

impl<R, F> ChunksReader for OnProgressChunksReader<R, F> where R: ChunksReader, F: FnMut(f64) {
    fn meta_data(&self) -> &MetaData { self.chunks_reader.meta_data() }
    fn expected_chunk_count(&self) -> usize { self.chunks_reader.expected_chunk_count() }
}

impl<R, F> ExactSizeIterator for OnProgressChunksReader<R, F> where R: ChunksReader, F: FnMut(f64) {}
impl<R, F> Iterator for OnProgressChunksReader<R, F> where R: ChunksReader, F: FnMut(f64) {
    type Item = Result<Chunk>;

    fn next(&mut self) -> Option<Self::Item> {
        self.chunks_reader.next().map(|item|{
            {
                let total_chunks = self.expected_chunk_count() as f64;
                let callback = &mut self.callback;
                callback(self.decoded_chunks as f64 / total_chunks);
            }

            self.decoded_chunks += 1;
            item
        })
        .or_else(||{
            debug_assert_eq!(
                self.decoded_chunks, self.expected_chunk_count(),
                "chunks reader finished but not all chunks are decompressed"
            );

            let callback = &mut self.callback;
            callback(1.0);
            None
        })
    }

    fn size_hint(&self) -> (usize, Option<usize>) {
        self.chunks_reader.size_hint()
    }
}

impl<R: Read + Seek> ChunksReader for AllChunksReader<R> {
    fn meta_data(&self) -> &MetaData { &self.meta_data }
    fn expected_chunk_count(&self) -> usize { self.remaining_chunks.end }
}

impl<R: Read + Seek> ExactSizeIterator for AllChunksReader<R> {}
impl<R: Read + Seek> Iterator for AllChunksReader<R> {
    type Item = Result<Chunk>;

    fn next(&mut self) -> Option<Self::Item> {
        // read as many chunks as the file should contain (inferred from meta data)
        let next_chunk = self.remaining_chunks.next()
            .map(|_| Chunk::read(&mut self.remaining_bytes, &self.meta_data));

        // if no chunks are left, but some bytes remain, return an error
        if self.pedantic && next_chunk.is_none() && self.remaining_bytes.peek_u8().is_ok() {
            return Some(Err(Error::invalid("end of file expected")));
        }

        next_chunk
    }

    fn size_hint(&self) -> (usize, Option<usize>) {
        (self.remaining_chunks.len(), Some(self.remaining_chunks.len()))
    }
}

impl<R: Read + Seek> ChunksReader for FilteredChunksReader<R> {
    fn meta_data(&self) -> &MetaData { &self.meta_data }
    fn expected_chunk_count(&self) -> usize { self.expected_filtered_chunk_count }
}

impl<R: Read + Seek> ExactSizeIterator for FilteredChunksReader<R> {}
impl<R: Read + Seek> Iterator for FilteredChunksReader<R> {
    type Item = Result<Chunk>;

    fn next(&mut self) -> Option<Self::Item> {
        // read as many chunks as we have desired chunk offsets
        self.remaining_filtered_chunk_indices.next().map(|next_chunk_location|{
            self.remaining_bytes.skip_to( // no-op for seek at current position, uses skip_bytes for small amounts
                usize::try_from(next_chunk_location)
                    .expect("too large chunk position for this machine")
            )?;

            let meta_data = &self.meta_data;
            Chunk::read(&mut self.remaining_bytes, meta_data)
        })

        // TODO remember last chunk index and then seek to index+size and check whether bytes are left?
    }

    fn size_hint(&self) -> (usize, Option<usize>) {
        (self.remaining_filtered_chunk_indices.len(), Some(self.remaining_filtered_chunk_indices.len()))
    }
}
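// A minimal sketch of driving a `ChunksReader`, assuming a hypothetical example file: wrap the
// reader with a progress callback, then decompress all blocks on multiple threads and sum the
// decoded bytes. The file path is a placeholder for illustration only.
#[cfg(test)]
mod decompress_parallel_sketch {
    use super::*;
    use std::fs::File;
    use std::io::BufReader;

    #[allow(dead_code)]
    fn decompress_with_progress() -> UnitResult {
        // hypothetical file path, for illustration only
        let file = BufReader::new(File::open("images/example.exr").expect("hypothetical example file"));
        let reader = Reader::read_from_buffered(file, /* pedantic */ true)?;

        let chunks = reader
            .all_chunks(true)?
            .on_progress(|progress| println!("decoded {:.0}%", progress * 100.0));

        // the closure runs on the calling thread, once per decompressed block, in non-deterministic order
        let mut decompressed_bytes = 0;
        chunks.decompress_parallel(true, |_meta_data, block| {
            decompressed_bytes += block.data.len();
            Ok(())
        })?;

        println!("decompressed {} bytes of pixel data", decompressed_bytes);
        Ok(())
    }
}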
/// Read all chunks from the file, decompressing each chunk immediately.
/// Implements iterator.
#[derive(Debug)]
pub struct SequentialBlockDecompressor<R: ChunksReader> {
    remaining_chunks_reader: R,
    pedantic: bool,
}

impl<R: ChunksReader> SequentialBlockDecompressor<R> {

    /// The extracted meta data from the image file.
    pub fn meta_data(&self) -> &MetaData { self.remaining_chunks_reader.meta_data() }

    /// Read and then decompress a single block of pixels from the byte source.
    pub fn decompress_next_block(&mut self) -> Option<Result<UncompressedBlock>> {
        self.remaining_chunks_reader.read_next_chunk().map(|compressed_chunk|{
            UncompressedBlock::decompress_chunk(compressed_chunk?, &self.remaining_chunks_reader.meta_data(), self.pedantic)
        })
    }
}

/// Decompress the chunks in a file in parallel.
/// The first call to `next` will fill the thread pool with jobs,
/// starting to decompress the next few blocks.
/// These jobs will finish, even if you stop reading more blocks.
/// Implements iterator.
#[derive(Debug)]
pub struct ParallelBlockDecompressor<R: ChunksReader> {
    remaining_chunks: R,
    sender: flume::Sender<Result<UncompressedBlock>>,
    receiver: flume::Receiver<Result<UncompressedBlock>>,
    currently_decompressing_count: usize,
    max_threads: usize,

    shared_meta_data_ref: Arc<MetaData>,
    pedantic: bool,

    pool: ThreadPool,
}

impl<R: ChunksReader> ParallelBlockDecompressor<R> {

    /// Create a new decompressor. Does not immediately spawn any tasks.
    /// Decompression starts after the first call to `next`.
    /// Returns the chunks if parallel decompression should not be used.
    /// Use `new_with_thread_pool` to customize the thread pool.
    pub fn new(chunks: R, pedantic: bool) -> std::result::Result<Self, R> {
        Self::new_with_thread_pool(chunks, pedantic, ||{
            rayon_core::ThreadPoolBuilder::new()
                .thread_name(|index| format!("OpenEXR Block Decompressor Thread #{}", index))
                .build()
        })
    }

    /// Create a new decompressor. Does not immediately spawn any tasks.
    /// Decompression starts after the first call to `next`.
    /// Returns the chunks if parallel decompression should not be used.
    pub fn new_with_thread_pool<CreatePool>(chunks: R, pedantic: bool, try_create_thread_pool: CreatePool)
        -> std::result::Result<Self, R>
        where CreatePool: FnOnce() -> std::result::Result<ThreadPool, ThreadPoolBuildError>
    {
        // if no compression is used in the file, don't use a thread pool
        if chunks.meta_data().headers.iter()
            .all(|head| head.compression == Compression::Uncompressed)
        {
            return Err(chunks);
        }

        // in case thread pool creation fails (for example on WASM currently),
        // we revert to sequential decompression
        let pool = match try_create_thread_pool() {
            Ok(pool) => pool,

            // TODO print warning?
            Err(_) => return Err(chunks),
        };

        let max_threads = pool.current_num_threads().max(1).min(chunks.len()) + 2; // roughly one block for each thread at all times

        let (send, recv) = flume::unbounded(); // TODO bounded channel simplifies logic?

        Ok(Self {
            shared_meta_data_ref: Arc::new(chunks.meta_data().clone()),
            currently_decompressing_count: 0,
            remaining_chunks: chunks,
            sender: send,
            receiver: recv,
            pedantic,
            max_threads,

            pool,
        })
    }

    /// Fill the pool with decompression jobs. Returns the first job that finishes.
    pub fn decompress_next_block(&mut self) -> Option<Result<UncompressedBlock>> {

        while self.currently_decompressing_count < self.max_threads {
            let block = self.remaining_chunks.next();
            if let Some(block) = block {
                let block = match block {
                    Ok(block) => block,
                    Err(error) => return Some(Err(error))
                };

                let sender = self.sender.clone();
                let meta = self.shared_meta_data_ref.clone();
                let pedantic = self.pedantic;

                self.currently_decompressing_count += 1;

                self.pool.spawn(move || {
                    let decompressed_or_err = UncompressedBlock::decompress_chunk(
                        block, &meta, pedantic
                    );

                    // by now, decompressing could have failed in another thread.
                    // the error is then already handled, so we simply
                    // don't send the decompressed block and do nothing
                    let _ = sender.send(decompressed_or_err);
                });
            }
            else {
                // there are no chunks left to decompress
                break;
            }
        }

        if self.currently_decompressing_count > 0 {
            let next = self.receiver.recv()
                .expect("all decompressing senders hung up but more messages were expected");

            self.currently_decompressing_count -= 1;
            Some(next)
        }
        else {
            debug_assert!(self.receiver.try_recv().is_err(), "uncompressed chunks left in channel after decompressing all chunks"); // TODO not reliable
            debug_assert_eq!(self.len(), 0, "compressed chunks left after decompressing all chunks");
            None
        }
    }

    /// The extracted meta data of the image file.
    pub fn meta_data(&self) -> &MetaData { self.remaining_chunks.meta_data() }
}

impl<R: ChunksReader> ExactSizeIterator for SequentialBlockDecompressor<R> {}
impl<R: ChunksReader> Iterator for SequentialBlockDecompressor<R> {
    type Item = Result<UncompressedBlock>;
    fn next(&mut self) -> Option<Self::Item> { self.decompress_next_block() }
    fn size_hint(&self) -> (usize, Option<usize>) { self.remaining_chunks_reader.size_hint() }
}

impl<R: ChunksReader> ExactSizeIterator for ParallelBlockDecompressor<R> {}
impl<R: ChunksReader> Iterator for ParallelBlockDecompressor<R> {
    type Item = Result<UncompressedBlock>;
    fn next(&mut self) -> Option<Self::Item> { self.decompress_next_block() }

    fn size_hint(&self) -> (usize, Option<usize>) {
        let remaining = self.remaining_chunks.len() + self.currently_decompressing_count;
        (remaining, Some(remaining))
    }
}
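// A minimal sketch of the single-threaded path, assuming a hypothetical example file: pull
// decompressed blocks one by one from a `SequentialBlockDecompressor` instead of passing a closure.
#[cfg(test)]
mod sequential_decompressor_sketch {
    use super::*;
    use std::fs::File;
    use std::io::BufReader;

    #[allow(dead_code)]
    fn decompress_blocks_one_by_one() -> UnitResult {
        // hypothetical file path, for illustration only
        let file = BufReader::new(File::open("images/example.exr").expect("hypothetical example file"));
        let chunks = Reader::read_from_buffered(file, false)?.all_chunks(false)?;

        // external iteration: the caller decides when each block is read and decompressed
        let mut decompressor = chunks.sequential_decompressor(false);
        while let Some(block) = decompressor.next() {
            let block: UncompressedBlock = block?;
            println!("decompressed block at {:?}", block.index.pixel_position);
        }

        Ok(())
    }
}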
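// A minimal sketch of supplying a custom rayon thread pool, assuming a hypothetical example file.
// If the pool cannot be built, or the image needs no parallel decompression, `new_with_thread_pool`
// hands the chunks reader back and we fall back to the sequential path.
#[cfg(test)]
mod custom_thread_pool_sketch {
    use super::*;
    use std::fs::File;
    use std::io::BufReader;

    #[allow(dead_code)]
    fn decompress_with_two_threads() -> UnitResult {
        // hypothetical file path, for illustration only
        let file = BufReader::new(File::open("images/example.exr").expect("hypothetical example file"));
        let chunks = Reader::read_from_buffered(file, false)?.all_chunks(false)?;

        let mut decompressed_blocks = 0;

        match ParallelBlockDecompressor::new_with_thread_pool(chunks, false, || {
            rayon_core::ThreadPoolBuilder::new().num_threads(2).build()
        }) {
            Ok(decompressor) => {
                for block in decompressor {
                    let _block = block?;
                    decompressed_blocks += 1;
                }
            },

            // no threads available, or nothing worth decompressing in parallel
            Err(chunks) => chunks.decompress_sequential(false, |_meta, _block| {
                decompressed_blocks += 1;
                Ok(())
            })?,
        }

        println!("decompressed {} blocks", decompressed_blocks);
        Ok(())
    }
}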