diff options
author | Valentin Popov <valentin@popov.link> | 2024-07-19 15:37:58 +0300 |
---|---|---|
committer | Valentin Popov <valentin@popov.link> | 2024-07-19 15:37:58 +0300 |
commit | a990de90fe41456a23e58bd087d2f107d321f3a1 (patch) | |
tree | 15afc392522a9e85dc3332235e311b7d39352ea9 /vendor/exr/src/block | |
parent | 3d48cd3f81164bbfc1a755dc1d4a9a02f98c8ddd (diff) | |
download | fparkan-a990de90fe41456a23e58bd087d2f107d321f3a1.tar.xz fparkan-a990de90fe41456a23e58bd087d2f107d321f3a1.zip |
Deleted vendor folder
Diffstat (limited to 'vendor/exr/src/block')
-rw-r--r-- | vendor/exr/src/block/chunk.rs | 379 | ||||
-rw-r--r-- | vendor/exr/src/block/lines.rs | 197 | ||||
-rw-r--r-- | vendor/exr/src/block/mod.rs | 257 | ||||
-rw-r--r-- | vendor/exr/src/block/reader.rs | 527 | ||||
-rw-r--r-- | vendor/exr/src/block/samples.rs | 248 | ||||
-rw-r--r-- | vendor/exr/src/block/writer.rs | 468 |
6 files changed, 0 insertions, 2076 deletions
diff --git a/vendor/exr/src/block/chunk.rs b/vendor/exr/src/block/chunk.rs deleted file mode 100644 index ff138f8..0000000 --- a/vendor/exr/src/block/chunk.rs +++ /dev/null @@ -1,379 +0,0 @@ - -//! Read and write already compressed pixel data blocks. -//! Does not include the process of compression and decompression. - -use crate::meta::attribute::{IntegerBounds}; - -/// A generic block of pixel information. -/// Contains pixel data and an index to the corresponding header. -/// All pixel data in a file is split into a list of chunks. -/// Also contains positioning information that locates this -/// data block in the referenced layer. -#[derive(Debug, Clone)] -pub struct Chunk { - - /// The index of the layer that the block belongs to. - /// This is required as the pixel data can appear in any order in a file. - // PDF says u64, but source code seems to be i32 - pub layer_index: usize, - - /// The compressed pixel contents. - pub compressed_block: CompressedBlock, -} - -/// The raw, possibly compressed pixel data of a file. -/// Each layer in a file can have a different type. -/// Also contains positioning information that locates this -/// data block in the corresponding layer. -/// Exists inside a `Chunk`. -#[derive(Debug, Clone)] -pub enum CompressedBlock { - - /// Scan line blocks of flat data. - ScanLine(CompressedScanLineBlock), - - /// Tiles of flat data. - Tile(CompressedTileBlock), - - /// Scan line blocks of deep data. - DeepScanLine(CompressedDeepScanLineBlock), - - /// Tiles of deep data. - DeepTile(CompressedDeepTileBlock), -} - -/// A `Block` of possibly compressed flat scan lines. -/// Corresponds to type attribute `scanlineimage`. -#[derive(Debug, Clone)] -pub struct CompressedScanLineBlock { - - /// The block's y coordinate is the pixel space y coordinate of the top scan line in the block. - /// The top scan line block in the image is aligned with the top edge of the data window. - pub y_coordinate: i32, - - /// One or more scan lines may be stored together as a scan line block. - /// The number of scan lines per block depends on how the pixel data are compressed. - /// For each line in the tile, for each channel, the row values are contiguous. - pub compressed_pixels: Vec<u8>, -} - -/// This `Block` is a tile of flat (non-deep) data. -/// Corresponds to type attribute `tiledimage`. -#[derive(Debug, Clone)] -pub struct CompressedTileBlock { - - /// The tile location. - pub coordinates: TileCoordinates, - - /// One or more scan lines may be stored together as a scan line block. - /// The number of scan lines per block depends on how the pixel data are compressed. - /// For each line in the tile, for each channel, the row values are contiguous. - pub compressed_pixels: Vec<u8>, -} - -/// Indicates the position and resolution level of a `TileBlock` or `DeepTileBlock`. -#[derive(Copy, Clone, Debug, Hash, Eq, PartialEq)] -pub struct TileCoordinates { - - /// Index of the tile, not pixel position. - pub tile_index: Vec2<usize>, - - /// Index of the Mip/Rip level. - pub level_index: Vec2<usize>, -} - -/// This `Block` consists of one or more deep scan lines. -/// Corresponds to type attribute `deepscanline`. -#[derive(Debug, Clone)] -pub struct CompressedDeepScanLineBlock { - - /// The block's y coordinate is the pixel space y coordinate of the top scan line in the block. - /// The top scan line block in the image is aligned with the top edge of the data window. - pub y_coordinate: i32, - - /// Count of samples. - pub decompressed_sample_data_size: usize, - - /// The pixel offset table is a list of integers, one for each pixel column within the data window. - /// Each entry in the table indicates the total number of samples required - /// to store the pixel in it as well as all pixels to the left of it. - pub compressed_pixel_offset_table: Vec<i8>, - - /// One or more scan lines may be stored together as a scan line block. - /// The number of scan lines per block depends on how the pixel data are compressed. - /// For each line in the tile, for each channel, the row values are contiguous. - pub compressed_sample_data: Vec<u8>, -} - -/// This `Block` is a tile of deep data. -/// Corresponds to type attribute `deeptile`. -#[derive(Debug, Clone)] -pub struct CompressedDeepTileBlock { - - /// The tile location. - pub coordinates: TileCoordinates, - - /// Count of samples. - pub decompressed_sample_data_size: usize, - - /// The pixel offset table is a list of integers, one for each pixel column within the data window. - /// Each entry in the table indicates the total number of samples required - /// to store the pixel in it as well as all pixels to the left of it. - pub compressed_pixel_offset_table: Vec<i8>, - - /// One or more scan lines may be stored together as a scan line block. - /// The number of scan lines per block depends on how the pixel data are compressed. - /// For each line in the tile, for each channel, the row values are contiguous. - pub compressed_sample_data: Vec<u8>, -} - - -use crate::io::*; - -impl TileCoordinates { - - /// Without validation, write this instance to the byte stream. - pub fn write<W: Write>(&self, write: &mut W) -> UnitResult { - i32::write(usize_to_i32(self.tile_index.x()), write)?; - i32::write(usize_to_i32(self.tile_index.y()), write)?; - i32::write(usize_to_i32(self.level_index.x()), write)?; - i32::write(usize_to_i32(self.level_index.y()), write)?; - Ok(()) - } - - /// Read the value without validating. - pub fn read(read: &mut impl Read) -> Result<Self> { - let tile_x = i32::read(read)?; - let tile_y = i32::read(read)?; - - let level_x = i32::read(read)?; - let level_y = i32::read(read)?; - - if level_x > 31 || level_y > 31 { - // there can be at most 31 levels, because the largest level would have a size of 2^31, - // which exceeds the maximum 32-bit integer value. - return Err(Error::invalid("level index exceeding integer maximum")); - } - - Ok(TileCoordinates { - tile_index: Vec2(tile_x, tile_y).to_usize("tile coordinate index")?, - level_index: Vec2(level_x, level_y).to_usize("tile coordinate level")? - }) - } - - /// The indices which can be used to index into the arrays of a data window. - /// These coordinates are only valid inside the corresponding one header. - /// Will start at 0 and always be positive. - pub fn to_data_indices(&self, tile_size: Vec2<usize>, max: Vec2<usize>) -> Result<IntegerBounds> { - let x = self.tile_index.x() * tile_size.width(); - let y = self.tile_index.y() * tile_size.height(); - - if x >= max.x() || y >= max.y() { - Err(Error::invalid("tile index")) - } - else { - Ok(IntegerBounds { - position: Vec2(usize_to_i32(x), usize_to_i32(y)), - size: Vec2( - calculate_block_size(max.x(), tile_size.width(), x)?, - calculate_block_size(max.y(), tile_size.height(), y)?, - ), - }) - } - } - - /// Absolute coordinates inside the global 2D space of a file, may be negative. - pub fn to_absolute_indices(&self, tile_size: Vec2<usize>, data_window: IntegerBounds) -> Result<IntegerBounds> { - let data = self.to_data_indices(tile_size, data_window.size)?; - Ok(data.with_origin(data_window.position)) - } - - /// Returns if this is the original resolution or a smaller copy. - pub fn is_largest_resolution_level(&self) -> bool { - self.level_index == Vec2(0, 0) - } -} - - - -use crate::meta::{MetaData, BlockDescription, calculate_block_size}; - -impl CompressedScanLineBlock { - - /// Without validation, write this instance to the byte stream. - pub fn write<W: Write>(&self, write: &mut W) -> UnitResult { - debug_assert_ne!(self.compressed_pixels.len(), 0, "empty blocks should not be put in the file bug"); - - i32::write(self.y_coordinate, write)?; - u8::write_i32_sized_slice(write, &self.compressed_pixels)?; - Ok(()) - } - - /// Read the value without validating. - pub fn read(read: &mut impl Read, max_block_byte_size: usize) -> Result<Self> { - let y_coordinate = i32::read(read)?; - let compressed_pixels = u8::read_i32_sized_vec(read, max_block_byte_size, Some(max_block_byte_size), "scan line block sample count")?; - Ok(CompressedScanLineBlock { y_coordinate, compressed_pixels }) - } -} - -impl CompressedTileBlock { - - /// Without validation, write this instance to the byte stream. - pub fn write<W: Write>(&self, write: &mut W) -> UnitResult { - debug_assert_ne!(self.compressed_pixels.len(), 0, "empty blocks should not be put in the file bug"); - - self.coordinates.write(write)?; - u8::write_i32_sized_slice(write, &self.compressed_pixels)?; - Ok(()) - } - - /// Read the value without validating. - pub fn read(read: &mut impl Read, max_block_byte_size: usize) -> Result<Self> { - let coordinates = TileCoordinates::read(read)?; - let compressed_pixels = u8::read_i32_sized_vec(read, max_block_byte_size, Some(max_block_byte_size), "tile block sample count")?; - Ok(CompressedTileBlock { coordinates, compressed_pixels }) - } -} - -impl CompressedDeepScanLineBlock { - - /// Without validation, write this instance to the byte stream. - pub fn write<W: Write>(&self, write: &mut W) -> UnitResult { - debug_assert_ne!(self.compressed_sample_data.len(), 0, "empty blocks should not be put in the file bug"); - - i32::write(self.y_coordinate, write)?; - u64::write(self.compressed_pixel_offset_table.len() as u64, write)?; - u64::write(self.compressed_sample_data.len() as u64, write)?; // TODO just guessed - u64::write(self.decompressed_sample_data_size as u64, write)?; - i8::write_slice(write, &self.compressed_pixel_offset_table)?; - u8::write_slice(write, &self.compressed_sample_data)?; - Ok(()) - } - - /// Read the value without validating. - pub fn read(read: &mut impl Read, max_block_byte_size: usize) -> Result<Self> { - let y_coordinate = i32::read(read)?; - let compressed_pixel_offset_table_size = u64_to_usize(u64::read(read)?); - let compressed_sample_data_size = u64_to_usize(u64::read(read)?); - let decompressed_sample_data_size = u64_to_usize(u64::read(read)?); - - // doc said i32, try u8 - let compressed_pixel_offset_table = i8::read_vec( - read, compressed_pixel_offset_table_size, - 6 * u16::MAX as usize, Some(max_block_byte_size), - "deep scan line block table size" - )?; - - let compressed_sample_data = u8::read_vec( - read, compressed_sample_data_size, - 6 * u16::MAX as usize, Some(max_block_byte_size), - "deep scan line block sample count" - )?; - - Ok(CompressedDeepScanLineBlock { - y_coordinate, - decompressed_sample_data_size, - compressed_pixel_offset_table, - compressed_sample_data, - }) - } -} - - -impl CompressedDeepTileBlock { - - /// Without validation, write this instance to the byte stream. - pub fn write<W: Write>(&self, write: &mut W) -> UnitResult { - debug_assert_ne!(self.compressed_sample_data.len(), 0, "empty blocks should not be put in the file bug"); - - self.coordinates.write(write)?; - u64::write(self.compressed_pixel_offset_table.len() as u64, write)?; - u64::write(self.compressed_sample_data.len() as u64, write)?; // TODO just guessed - u64::write(self.decompressed_sample_data_size as u64, write)?; - i8::write_slice(write, &self.compressed_pixel_offset_table)?; - u8::write_slice(write, &self.compressed_sample_data)?; - Ok(()) - } - - /// Read the value without validating. - pub fn read(read: &mut impl Read, hard_max_block_byte_size: usize) -> Result<Self> { - let coordinates = TileCoordinates::read(read)?; - let compressed_pixel_offset_table_size = u64_to_usize(u64::read(read)?); - let compressed_sample_data_size = u64_to_usize(u64::read(read)?); // TODO u64 just guessed - let decompressed_sample_data_size = u64_to_usize(u64::read(read)?); - - let compressed_pixel_offset_table = i8::read_vec( - read, compressed_pixel_offset_table_size, - 6 * u16::MAX as usize, Some(hard_max_block_byte_size), - "deep tile block table size" - )?; - - let compressed_sample_data = u8::read_vec( - read, compressed_sample_data_size, - 6 * u16::MAX as usize, Some(hard_max_block_byte_size), - "deep tile block sample count" - )?; - - Ok(CompressedDeepTileBlock { - coordinates, - decompressed_sample_data_size, - compressed_pixel_offset_table, - compressed_sample_data, - }) - } -} - -use crate::error::{UnitResult, Result, Error, u64_to_usize, usize_to_i32, i32_to_usize}; -use crate::math::Vec2; - -/// Validation of chunks is done while reading and writing the actual data. (For example in exr::full_image) -impl Chunk { - - /// Without validation, write this instance to the byte stream. - pub fn write(&self, write: &mut impl Write, header_count: usize) -> UnitResult { - debug_assert!(self.layer_index < header_count, "layer index bug"); // validation is done in full_image or simple_image - - if header_count != 1 { usize_to_i32(self.layer_index).write(write)?; } - else { assert_eq!(self.layer_index, 0, "invalid header index for single layer file"); } - - match self.compressed_block { - CompressedBlock::ScanLine (ref value) => value.write(write), - CompressedBlock::Tile (ref value) => value.write(write), - CompressedBlock::DeepScanLine (ref value) => value.write(write), - CompressedBlock::DeepTile (ref value) => value.write(write), - } - } - - /// Read the value without validating. - pub fn read(read: &mut impl Read, meta_data: &MetaData) -> Result<Self> { - let layer_number = i32_to_usize( - if meta_data.requirements.is_multilayer() { i32::read(read)? } // documentation says u64, but is i32 - else { 0_i32 }, // reference the first header for single-layer images - "chunk data part number" - )?; - - if layer_number >= meta_data.headers.len() { - return Err(Error::invalid("chunk data part number")); - } - - let header = &meta_data.headers[layer_number]; - let max_block_byte_size = header.max_block_byte_size(); - - let chunk = Chunk { - layer_index: layer_number, - compressed_block: match header.blocks { - // flat data - BlockDescription::ScanLines if !header.deep => CompressedBlock::ScanLine(CompressedScanLineBlock::read(read, max_block_byte_size)?), - BlockDescription::Tiles(_) if !header.deep => CompressedBlock::Tile(CompressedTileBlock::read(read, max_block_byte_size)?), - - // deep data - BlockDescription::ScanLines => CompressedBlock::DeepScanLine(CompressedDeepScanLineBlock::read(read, max_block_byte_size)?), - BlockDescription::Tiles(_) => CompressedBlock::DeepTile(CompressedDeepTileBlock::read(read, max_block_byte_size)?), - }, - }; - - Ok(chunk) - } -} - diff --git a/vendor/exr/src/block/lines.rs b/vendor/exr/src/block/lines.rs deleted file mode 100644 index 1cdf8ee..0000000 --- a/vendor/exr/src/block/lines.rs +++ /dev/null @@ -1,197 +0,0 @@ -//! Extract lines from a block of pixel bytes. - -use crate::math::*; -use std::io::{Cursor}; -use crate::error::{Result, UnitResult}; -use smallvec::SmallVec; -use std::ops::Range; -use crate::block::{BlockIndex}; -use crate::meta::attribute::ChannelList; - - -/// A single line of pixels. -/// Use [LineRef] or [LineRefMut] for easier type names. -#[derive(Clone, Copy, Eq, PartialEq, Debug)] -pub struct LineSlice<T> { - - // TODO also store enum SampleType, as it would always be matched in every place it is used - - /// Where this line is located inside the image. - pub location: LineIndex, - - /// The raw bytes of the pixel line, either `&[u8]` or `&mut [u8]`. - /// Must be re-interpreted as slice of f16, f32, or u32, - /// according to the channel data type. - pub value: T, -} - - -/// An reference to a single line of pixels. -/// May go across the whole image or just a tile section of it. -/// -/// This line contains an immutable slice that all samples will be read from. -pub type LineRef<'s> = LineSlice<&'s [u8]>; - -/// A reference to a single mutable line of pixels. -/// May go across the whole image or just a tile section of it. -/// -/// This line contains a mutable slice that all samples will be written to. -pub type LineRefMut<'s> = LineSlice<&'s mut [u8]>; - - -/// Specifies where a row of pixels lies inside an image. -/// This is a globally unique identifier which includes -/// the layer, channel index, and pixel location. -#[derive(Clone, Copy, Eq, PartialEq, Debug, Hash)] -pub struct LineIndex { - - /// Index of the layer. - pub layer: usize, - - /// The channel index of the layer. - pub channel: usize, - - /// Index of the mip or rip level in the image. - pub level: Vec2<usize>, - - /// Position of the most left pixel of the row. - pub position: Vec2<usize>, - - /// The width of the line; the number of samples in this row, - /// that is, the number of f16, f32, or u32 values. - pub sample_count: usize, -} - - -impl LineIndex { - - /// Iterates the lines of this block index in interleaved fashion: - /// For each line in this block, this iterator steps once through each channel. - /// This is how lines are stored in a pixel data block. - /// - /// Does not check whether `self.layer_index`, `self.level`, `self.size` and `self.position` are valid indices.__ - // TODO be sure this cannot produce incorrect data, as this is not further checked but only handled with panics - #[inline] - #[must_use] - pub fn lines_in_block(block: BlockIndex, channels: &ChannelList) -> impl Iterator<Item=(Range<usize>, LineIndex)> { - struct LineIter { - layer: usize, level: Vec2<usize>, width: usize, - end_y: usize, x: usize, channel_sizes: SmallVec<[usize; 8]>, - byte: usize, channel: usize, y: usize, - } - - // FIXME what about sub sampling?? - - impl Iterator for LineIter { - type Item = (Range<usize>, LineIndex); - // TODO size hint? - - fn next(&mut self) -> Option<Self::Item> { - if self.y < self.end_y { - - // compute return value before incrementing - let byte_len = self.channel_sizes[self.channel]; - let return_value = ( - (self.byte .. self.byte + byte_len), - LineIndex { - channel: self.channel, - layer: self.layer, - level: self.level, - position: Vec2(self.x, self.y), - sample_count: self.width, - } - ); - - { // increment indices - self.byte += byte_len; - self.channel += 1; - - if self.channel == self.channel_sizes.len() { - self.channel = 0; - self.y += 1; - } - } - - Some(return_value) - } - - else { - None - } - } - } - - let channel_line_sizes: SmallVec<[usize; 8]> = channels.list.iter() - .map(move |channel| block.pixel_size.0 * channel.sample_type.bytes_per_sample()) // FIXME is it fewer samples per tile or just fewer tiles for sampled images??? - .collect(); - - LineIter { - layer: block.layer, - level: block.level, - width: block.pixel_size.0, - x: block.pixel_position.0, - end_y: block.pixel_position.y() + block.pixel_size.height(), - channel_sizes: channel_line_sizes, - - byte: 0, - channel: 0, - y: block.pixel_position.y() - } - } -} - - - -impl<'s> LineRefMut<'s> { - - /// Writes the samples (f16, f32, u32 values) into this line value reference. - /// Use `write_samples` if there is not slice available. - #[inline] - #[must_use] - pub fn write_samples_from_slice<T: crate::io::Data>(self, slice: &[T]) -> UnitResult { - debug_assert_eq!(slice.len(), self.location.sample_count, "slice size does not match the line width"); - debug_assert_eq!(self.value.len(), self.location.sample_count * T::BYTE_SIZE, "sample type size does not match line byte size"); - - T::write_slice(&mut Cursor::new(self.value), slice) - } - - /// Iterate over all samples in this line, from left to right. - /// The supplied `get_line` function returns the sample value - /// for a given sample index within the line, - /// which starts at zero for each individual line. - /// Use `write_samples_from_slice` if you already have a slice of samples. - #[inline] - #[must_use] - pub fn write_samples<T: crate::io::Data>(self, mut get_sample: impl FnMut(usize) -> T) -> UnitResult { - debug_assert_eq!(self.value.len(), self.location.sample_count * T::BYTE_SIZE, "sample type size does not match line byte size"); - - let mut write = Cursor::new(self.value); - - for index in 0..self.location.sample_count { - T::write(get_sample(index), &mut write)?; - } - - Ok(()) - } -} - -impl LineRef<'_> { - - /// Read the samples (f16, f32, u32 values) from this line value reference. - /// Use `read_samples` if there is not slice available. - pub fn read_samples_into_slice<T: crate::io::Data>(self, slice: &mut [T]) -> UnitResult { - debug_assert_eq!(slice.len(), self.location.sample_count, "slice size does not match the line width"); - debug_assert_eq!(self.value.len(), self.location.sample_count * T::BYTE_SIZE, "sample type size does not match line byte size"); - - T::read_slice(&mut Cursor::new(self.value), slice) - } - - /// Iterate over all samples in this line, from left to right. - /// Use `read_sample_into_slice` if you already have a slice of samples. - pub fn read_samples<T: crate::io::Data>(&self) -> impl Iterator<Item = Result<T>> + '_ { - debug_assert_eq!(self.value.len(), self.location.sample_count * T::BYTE_SIZE, "sample type size does not match line byte size"); - - let mut read = self.value.clone(); // FIXME deep data - (0..self.location.sample_count).map(move |_| T::read(&mut read)) - } -}
\ No newline at end of file diff --git a/vendor/exr/src/block/mod.rs b/vendor/exr/src/block/mod.rs deleted file mode 100644 index 1d20aa8..0000000 --- a/vendor/exr/src/block/mod.rs +++ /dev/null @@ -1,257 +0,0 @@ -//! This is the low-level interface for the raw blocks of an image. -//! See `exr::image` module for a high-level interface. -//! -//! Handle compressed and uncompressed pixel byte blocks. Includes compression and decompression, -//! and reading a complete image into blocks. -//! -//! Start with the `block::read(...)` -//! and `block::write(...)` functions. - - -pub mod writer; -pub mod reader; - -pub mod lines; -pub mod samples; -pub mod chunk; - - -use std::io::{Read, Seek, Write}; -use crate::error::{Result, UnitResult, Error, usize_to_i32}; -use crate::meta::{Headers, MetaData, BlockDescription}; -use crate::math::Vec2; -use crate::compression::ByteVec; -use crate::block::chunk::{CompressedBlock, CompressedTileBlock, CompressedScanLineBlock, Chunk, TileCoordinates}; -use crate::meta::header::Header; -use crate::block::lines::{LineIndex, LineRef, LineSlice, LineRefMut}; -use crate::meta::attribute::ChannelList; - - -/// Specifies where a block of pixel data should be placed in the actual image. -/// This is a globally unique identifier which -/// includes the layer, level index, and pixel location. -#[derive(Clone, Copy, Eq, Hash, PartialEq, Debug)] -pub struct BlockIndex { - - /// Index of the layer. - pub layer: usize, - - /// Index of the top left pixel from the block within the data window. - pub pixel_position: Vec2<usize>, - - /// Number of pixels in this block, extending to the right and downwards. - /// Stays the same across all resolution levels. - pub pixel_size: Vec2<usize>, - - /// Index of the mip or rip level in the image. - pub level: Vec2<usize>, -} - -/// Contains a block of pixel data and where that data should be placed in the actual image. -#[derive(Clone, Eq, PartialEq, Debug)] -pub struct UncompressedBlock { - - /// Location of the data inside the image. - pub index: BlockIndex, - - /// Uncompressed pixel values of the whole block. - /// One or more scan lines may be stored together as a scan line block. - /// This byte vector contains all pixel rows, one after another. - /// For each line in the tile, for each channel, the row values are contiguous. - /// Stores all samples of the first channel, then all samples of the second channel, and so on. - pub data: ByteVec, -} - -/// Immediately reads the meta data from the file. -/// Then, returns a reader that can be used to read all pixel blocks. -/// From the reader, you can pull each compressed chunk from the file. -/// Alternatively, you can create a decompressor, and pull the uncompressed data from it. -/// The reader is assumed to be buffered. -pub fn read<R: Read + Seek>(buffered_read: R, pedantic: bool) -> Result<self::reader::Reader<R>> { - self::reader::Reader::read_from_buffered(buffered_read, pedantic) -} - -/// Immediately writes the meta data to the file. -/// Then, calls a closure with a writer that can be used to write all pixel blocks. -/// In the closure, you can push compressed chunks directly into the writer. -/// Alternatively, you can create a compressor, wrapping the writer, and push the uncompressed data to it. -/// The writer is assumed to be buffered. -pub fn write<W: Write + Seek>( - buffered_write: W, headers: Headers, compatibility_checks: bool, - write_chunks: impl FnOnce(MetaData, &mut self::writer::ChunkWriter<W>) -> UnitResult -) -> UnitResult { - self::writer::write_chunks_with(buffered_write, headers, compatibility_checks, write_chunks) -} - - - - -/// This iterator tells you the block indices of all blocks that must be in the image. -/// The order of the blocks depends on the `LineOrder` attribute -/// (unspecified line order is treated the same as increasing line order). -/// The blocks written to the file must be exactly in this order, -/// except for when the `LineOrder` is unspecified. -/// The index represents the block index, in increasing line order, within the header. -pub fn enumerate_ordered_header_block_indices(headers: &[Header]) -> impl '_ + Iterator<Item=(usize, BlockIndex)> { - headers.iter().enumerate().flat_map(|(layer_index, header)|{ - header.enumerate_ordered_blocks().map(move |(index_in_header, tile)|{ - let data_indices = header.get_absolute_block_pixel_coordinates(tile.location).expect("tile coordinate bug"); - - let block = BlockIndex { - layer: layer_index, - level: tile.location.level_index, - pixel_position: data_indices.position.to_usize("data indices start").expect("data index bug"), - pixel_size: data_indices.size, - }; - - (index_in_header, block) - }) - }) -} - - -impl UncompressedBlock { - - /// Decompress the possibly compressed chunk and returns an `UncompressedBlock`. - // for uncompressed data, the ByteVec in the chunk is moved all the way - #[inline] - #[must_use] - pub fn decompress_chunk(chunk: Chunk, meta_data: &MetaData, pedantic: bool) -> Result<Self> { - let header: &Header = meta_data.headers.get(chunk.layer_index) - .ok_or(Error::invalid("chunk layer index"))?; - - let tile_data_indices = header.get_block_data_indices(&chunk.compressed_block)?; - let absolute_indices = header.get_absolute_block_pixel_coordinates(tile_data_indices)?; - - absolute_indices.validate(Some(header.layer_size))?; - - match chunk.compressed_block { - CompressedBlock::Tile(CompressedTileBlock { compressed_pixels, .. }) | - CompressedBlock::ScanLine(CompressedScanLineBlock { compressed_pixels, .. }) => { - Ok(UncompressedBlock { - data: header.compression.decompress_image_section(header, compressed_pixels, absolute_indices, pedantic)?, - index: BlockIndex { - layer: chunk.layer_index, - pixel_position: absolute_indices.position.to_usize("data indices start")?, - level: tile_data_indices.level_index, - pixel_size: absolute_indices.size, - } - }) - }, - - _ => return Err(Error::unsupported("deep data not supported yet")) - } - } - - /// Consume this block by compressing it, returning a `Chunk`. - // for uncompressed data, the ByteVec in the chunk is moved all the way - #[inline] - #[must_use] - pub fn compress_to_chunk(self, headers: &[Header]) -> Result<Chunk> { - let UncompressedBlock { data, index } = self; - - let header: &Header = headers.get(index.layer) - .expect("block layer index bug"); - - let expected_byte_size = header.channels.bytes_per_pixel * self.index.pixel_size.area(); // TODO sampling?? - if expected_byte_size != data.len() { - panic!("get_line byte size should be {} but was {}", expected_byte_size, data.len()); - } - - let tile_coordinates = TileCoordinates { - // FIXME this calculation should not be made here but elsewhere instead (in meta::header?) - tile_index: index.pixel_position / header.max_block_pixel_size(), // TODO sampling?? - level_index: index.level, - }; - - let absolute_indices = header.get_absolute_block_pixel_coordinates(tile_coordinates)?; - absolute_indices.validate(Some(header.layer_size))?; - - if !header.compression.may_loose_data() { debug_assert_eq!( - &header.compression.decompress_image_section( - header, - header.compression.compress_image_section(header, data.clone(), absolute_indices)?, - absolute_indices, - true - ).unwrap(), - &data, - "compression method not round trippin'" - ); } - - let compressed_data = header.compression.compress_image_section(header, data, absolute_indices)?; - - Ok(Chunk { - layer_index: index.layer, - compressed_block : match header.blocks { - BlockDescription::ScanLines => CompressedBlock::ScanLine(CompressedScanLineBlock { - compressed_pixels: compressed_data, - - // FIXME this calculation should not be made here but elsewhere instead (in meta::header?) - y_coordinate: usize_to_i32(index.pixel_position.y()) + header.own_attributes.layer_position.y(), // TODO sampling?? - }), - - BlockDescription::Tiles(_) => CompressedBlock::Tile(CompressedTileBlock { - compressed_pixels: compressed_data, - coordinates: tile_coordinates, - }), - } - }) - } - - /// Iterate all the lines in this block. - /// Each line contains the all samples for one of the channels. - pub fn lines(&self, channels: &ChannelList) -> impl Iterator<Item=LineRef<'_>> { - LineIndex::lines_in_block(self.index, channels) - .map(move |(bytes, line)| LineSlice { location: line, value: &self.data[bytes] }) - } - - /* TODO pub fn lines_mut<'s>(&'s mut self, header: &Header) -> impl 's + Iterator<Item=LineRefMut<'s>> { - LineIndex::lines_in_block(self.index, &header.channels) - .map(move |(bytes, line)| LineSlice { location: line, value: &mut self.data[bytes] }) - }*/ - - /*// TODO make iterator - /// Call a closure for each line of samples in this uncompressed block. - pub fn for_lines( - &self, header: &Header, - mut accept_line: impl FnMut(LineRef<'_>) -> UnitResult - ) -> UnitResult { - for (bytes, line) in LineIndex::lines_in_block(self.index, &header.channels) { - let line_ref = LineSlice { location: line, value: &self.data[bytes] }; - accept_line(line_ref)?; - } - - Ok(()) - }*/ - - // TODO from iterator?? - /// Create an uncompressed block byte vector by requesting one line of samples after another. - pub fn collect_block_data_from_lines( - channels: &ChannelList, block_index: BlockIndex, - mut extract_line: impl FnMut(LineRefMut<'_>) - ) -> Vec<u8> - { - let byte_count = block_index.pixel_size.area() * channels.bytes_per_pixel; - let mut block_bytes = vec![0_u8; byte_count]; - - for (byte_range, line_index) in LineIndex::lines_in_block(block_index, channels) { - extract_line(LineRefMut { // TODO subsampling - value: &mut block_bytes[byte_range], - location: line_index, - }); - } - - block_bytes - } - - /// Create an uncompressed block by requesting one line of samples after another. - pub fn from_lines( - channels: &ChannelList, block_index: BlockIndex, - extract_line: impl FnMut(LineRefMut<'_>) - ) -> Self { - Self { - index: block_index, - data: Self::collect_block_data_from_lines(channels, block_index, extract_line) - } - } -}
\ No newline at end of file diff --git a/vendor/exr/src/block/reader.rs b/vendor/exr/src/block/reader.rs deleted file mode 100644 index bb9888e..0000000 --- a/vendor/exr/src/block/reader.rs +++ /dev/null @@ -1,527 +0,0 @@ -//! Composable structures to handle reading an image. - - -use std::convert::TryFrom; -use std::fmt::Debug; -use std::io::{Read, Seek}; -use rayon_core::{ThreadPool, ThreadPoolBuildError}; - -use smallvec::alloc::sync::Arc; - -use crate::block::{BlockIndex, UncompressedBlock}; -use crate::block::chunk::{Chunk, TileCoordinates}; -use crate::compression::Compression; -use crate::error::{Error, Result, u64_to_usize, UnitResult}; -use crate::io::{PeekRead, Tracking}; -use crate::meta::{MetaData, OffsetTables}; -use crate::meta::header::Header; - -/// Decode the meta data from a byte source, keeping the source ready for further reading. -/// Continue decoding the remaining bytes by calling `filtered_chunks` or `all_chunks`. -#[derive(Debug)] -pub struct Reader<R> { - meta_data: MetaData, - remaining_reader: PeekRead<Tracking<R>>, // TODO does R need to be Seek or is Tracking enough? -} - -impl<R: Read + Seek> Reader<R> { - - /// Start the reading process. - /// Immediately decodes the meta data into an internal field. - /// Access it via`meta_data()`. - pub fn read_from_buffered(read: R, pedantic: bool) -> Result<Self> { - let mut remaining_reader = PeekRead::new(Tracking::new(read)); - let meta_data = MetaData::read_validated_from_buffered_peekable(&mut remaining_reader, pedantic)?; - Ok(Self { meta_data, remaining_reader }) - } - - // must not be mutable, as reading the file later on relies on the meta data - /// The decoded exr meta data from the file. - pub fn meta_data(&self) -> &MetaData { &self.meta_data } - - /// The decoded exr meta data from the file. - pub fn headers(&self) -> &[Header] { &self.meta_data.headers } - - /// Obtain the meta data ownership. - pub fn into_meta_data(self) -> MetaData { self.meta_data } - - /// Prepare to read all the chunks from the file. - /// Does not decode the chunks now, but returns a decoder. - /// Reading all chunks reduces seeking the file, but some chunks might be read without being used. - pub fn all_chunks(mut self, pedantic: bool) -> Result<AllChunksReader<R>> { - let total_chunk_count = { - if pedantic { - let offset_tables = MetaData::read_offset_tables(&mut self.remaining_reader, &self.meta_data.headers)?; - validate_offset_tables(self.meta_data.headers.as_slice(), &offset_tables, self.remaining_reader.byte_position())?; - offset_tables.iter().map(|table| table.len()).sum() - } - else { - usize::try_from(MetaData::skip_offset_tables(&mut self.remaining_reader, &self.meta_data.headers)?) - .expect("too large chunk count for this machine") - } - }; - - Ok(AllChunksReader { - meta_data: self.meta_data, - remaining_chunks: 0 .. total_chunk_count, - remaining_bytes: self.remaining_reader, - pedantic - }) - } - - /// Prepare to read some the chunks from the file. - /// Does not decode the chunks now, but returns a decoder. - /// Reading only some chunks may seeking the file, potentially skipping many bytes. - // TODO tile indices add no new information to block index?? - pub fn filter_chunks(mut self, pedantic: bool, mut filter: impl FnMut(&MetaData, TileCoordinates, BlockIndex) -> bool) -> Result<FilteredChunksReader<R>> { - let offset_tables = MetaData::read_offset_tables(&mut self.remaining_reader, &self.meta_data.headers)?; - - // TODO regardless of pedantic, if invalid, read all chunks instead, and filter after reading each chunk? - if pedantic { - validate_offset_tables( - self.meta_data.headers.as_slice(), &offset_tables, - self.remaining_reader.byte_position() - )?; - } - - let mut filtered_offsets = Vec::with_capacity( - (self.meta_data.headers.len() * 32).min(2*2048) - ); - - // TODO detect whether the filter actually would skip chunks, and aviod sorting etc when not filtering is applied - - for (header_index, header) in self.meta_data.headers.iter().enumerate() { // offset tables are stored same order as headers - for (block_index, tile) in header.blocks_increasing_y_order().enumerate() { // in increasing_y order - let data_indices = header.get_absolute_block_pixel_coordinates(tile.location)?; - - let block = BlockIndex { - layer: header_index, - level: tile.location.level_index, - pixel_position: data_indices.position.to_usize("data indices start")?, - pixel_size: data_indices.size, - }; - - if filter(&self.meta_data, tile.location, block) { - filtered_offsets.push(offset_tables[header_index][block_index]) // safe indexing from `enumerate()` - } - }; - } - - filtered_offsets.sort_unstable(); // enables reading continuously if possible (already sorted where line order increasing) - - if pedantic { - // table is sorted. if any two neighbours are equal, we have duplicates. this is invalid. - if filtered_offsets.windows(2).any(|pair| pair[0] == pair[1]) { - return Err(Error::invalid("chunk offset table")) - } - } - - Ok(FilteredChunksReader { - meta_data: self.meta_data, - expected_filtered_chunk_count: filtered_offsets.len(), - remaining_filtered_chunk_indices: filtered_offsets.into_iter(), - remaining_bytes: self.remaining_reader - }) - } -} - - -fn validate_offset_tables(headers: &[Header], offset_tables: &OffsetTables, chunks_start_byte: usize) -> UnitResult { - let max_pixel_bytes: usize = headers.iter() // when compressed, chunks are smaller, but never larger than max - .map(|header| header.max_pixel_file_bytes()) - .sum(); - - // check that each offset is within the bounds - let end_byte = chunks_start_byte + max_pixel_bytes; - let is_invalid = offset_tables.iter().flatten().map(|&u64| u64_to_usize(u64)) - .any(|chunk_start| chunk_start < chunks_start_byte || chunk_start > end_byte); - - if is_invalid { Err(Error::invalid("offset table")) } - else { Ok(()) } -} - - - - -/// Decode the desired chunks and skip the unimportant chunks in the file. -/// The decoded chunks can be decompressed by calling -/// `decompress_parallel`, `decompress_sequential`, or `sequential_decompressor` or `parallel_decompressor`. -/// Call `on_progress` to have a callback with each block. -/// Also contains the image meta data. -#[derive(Debug)] -pub struct FilteredChunksReader<R> { - meta_data: MetaData, - expected_filtered_chunk_count: usize, - remaining_filtered_chunk_indices: std::vec::IntoIter<u64>, - remaining_bytes: PeekRead<Tracking<R>>, -} - -/// Decode all chunks in the file without seeking. -/// The decoded chunks can be decompressed by calling -/// `decompress_parallel`, `decompress_sequential`, or `sequential_decompressor` or `parallel_decompressor`. -/// Call `on_progress` to have a callback with each block. -/// Also contains the image meta data. -#[derive(Debug)] -pub struct AllChunksReader<R> { - meta_data: MetaData, - remaining_chunks: std::ops::Range<usize>, - remaining_bytes: PeekRead<Tracking<R>>, - pedantic: bool, -} - -/// Decode chunks in the file without seeking. -/// Calls the supplied closure for each chunk. -/// The decoded chunks can be decompressed by calling -/// `decompress_parallel`, `decompress_sequential`, or `sequential_decompressor`. -/// Also contains the image meta data. -#[derive(Debug)] -pub struct OnProgressChunksReader<R, F> { - chunks_reader: R, - decoded_chunks: usize, - callback: F, -} - -/// Decode chunks in the file. -/// The decoded chunks can be decompressed by calling -/// `decompress_parallel`, `decompress_sequential`, or `sequential_decompressor`. -/// Call `on_progress` to have a callback with each block. -/// Also contains the image meta data. -pub trait ChunksReader: Sized + Iterator<Item=Result<Chunk>> + ExactSizeIterator { - - /// The decoded exr meta data from the file. - fn meta_data(&self) -> &MetaData; - - /// The decoded exr headers from the file. - fn headers(&self) -> &[Header] { &self.meta_data().headers } - - /// The number of chunks that this reader will return in total. - /// Can be less than the total number of chunks in the file, if some chunks are skipped. - fn expected_chunk_count(&self) -> usize; - - /// Read the next compressed chunk from the file. - /// Equivalent to `.next()`, as this also is an iterator. - /// Returns `None` if all chunks have been read. - fn read_next_chunk(&mut self) -> Option<Result<Chunk>> { self.next() } - - /// Create a new reader that calls the provided progress - /// callback for each chunk that is read from the file. - /// If the file can be successfully decoded, - /// the progress will always at least once include 0.0 at the start and 1.0 at the end. - fn on_progress<F>(self, on_progress: F) -> OnProgressChunksReader<Self, F> where F: FnMut(f64) { - OnProgressChunksReader { chunks_reader: self, callback: on_progress, decoded_chunks: 0 } - } - - /// Decompress all blocks in the file, using multiple cpu cores, and call the supplied closure for each block. - /// The order of the blocks is not deterministic. - /// You can also use `parallel_decompressor` to obtain an iterator instead. - /// Will fallback to sequential processing where threads are not available, or where it would not speed up the process. - // FIXME try async + futures instead of rayon! Maybe even allows for external async decoding? (-> impl Stream<UncompressedBlock>) - fn decompress_parallel( - self, pedantic: bool, - mut insert_block: impl FnMut(&MetaData, UncompressedBlock) -> UnitResult - ) -> UnitResult - { - let mut decompressor = match self.parallel_decompressor(pedantic) { - Err(old_self) => return old_self.decompress_sequential(pedantic, insert_block), - Ok(decompressor) => decompressor, - }; - - while let Some(block) = decompressor.next() { - insert_block(decompressor.meta_data(), block?)?; - } - - debug_assert_eq!(decompressor.len(), 0, "compressed blocks left after decompressing all blocks"); - Ok(()) - } - - /// Return an iterator that decompresses the chunks with multiple threads. - /// The order of the blocks is not deterministic. - /// Use `ParallelBlockDecompressor::new` if you want to use your own thread pool. - /// By default, this uses as many threads as there are CPUs. - /// Returns the `self` if there is no need for parallel decompression. - fn parallel_decompressor(self, pedantic: bool) -> std::result::Result<ParallelBlockDecompressor<Self>, Self> { - ParallelBlockDecompressor::new(self, pedantic) - } - - /// Return an iterator that decompresses the chunks in this thread. - /// You can alternatively use `sequential_decompressor` if you prefer an external iterator. - fn decompress_sequential( - self, pedantic: bool, - mut insert_block: impl FnMut(&MetaData, UncompressedBlock) -> UnitResult - ) -> UnitResult - { - let mut decompressor = self.sequential_decompressor(pedantic); - while let Some(block) = decompressor.next() { - insert_block(decompressor.meta_data(), block?)?; - } - - debug_assert_eq!(decompressor.len(), 0, "compressed blocks left after decompressing all blocks"); - Ok(()) - } - - /// Prepare reading the chunks sequentially, only a single thread, but with less memory overhead. - fn sequential_decompressor(self, pedantic: bool) -> SequentialBlockDecompressor<Self> { - SequentialBlockDecompressor { remaining_chunks_reader: self, pedantic } - } -} - -impl<R, F> ChunksReader for OnProgressChunksReader<R, F> where R: ChunksReader, F: FnMut(f64) { - fn meta_data(&self) -> &MetaData { self.chunks_reader.meta_data() } - fn expected_chunk_count(&self) -> usize { self.chunks_reader.expected_chunk_count() } -} - -impl<R, F> ExactSizeIterator for OnProgressChunksReader<R, F> where R: ChunksReader, F: FnMut(f64) {} -impl<R, F> Iterator for OnProgressChunksReader<R, F> where R: ChunksReader, F: FnMut(f64) { - type Item = Result<Chunk>; - - fn next(&mut self) -> Option<Self::Item> { - self.chunks_reader.next().map(|item|{ - { - let total_chunks = self.expected_chunk_count() as f64; - let callback = &mut self.callback; - callback(self.decoded_chunks as f64 / total_chunks); - } - - self.decoded_chunks += 1; - item - }) - .or_else(||{ - debug_assert_eq!( - self.decoded_chunks, self.expected_chunk_count(), - "chunks reader finished but not all chunks are decompressed" - ); - - let callback = &mut self.callback; - callback(1.0); - None - }) - } - - fn size_hint(&self) -> (usize, Option<usize>) { - self.chunks_reader.size_hint() - } -} - -impl<R: Read + Seek> ChunksReader for AllChunksReader<R> { - fn meta_data(&self) -> &MetaData { &self.meta_data } - fn expected_chunk_count(&self) -> usize { self.remaining_chunks.end } -} - -impl<R: Read + Seek> ExactSizeIterator for AllChunksReader<R> {} -impl<R: Read + Seek> Iterator for AllChunksReader<R> { - type Item = Result<Chunk>; - - fn next(&mut self) -> Option<Self::Item> { - // read as many chunks as the file should contain (inferred from meta data) - let next_chunk = self.remaining_chunks.next() - .map(|_| Chunk::read(&mut self.remaining_bytes, &self.meta_data)); - - // if no chunks are left, but some bytes remain, return error - if self.pedantic && next_chunk.is_none() && self.remaining_bytes.peek_u8().is_ok() { - return Some(Err(Error::invalid("end of file expected"))); - } - - next_chunk - } - - fn size_hint(&self) -> (usize, Option<usize>) { - (self.remaining_chunks.len(), Some(self.remaining_chunks.len())) - } -} - -impl<R: Read + Seek> ChunksReader for FilteredChunksReader<R> { - fn meta_data(&self) -> &MetaData { &self.meta_data } - fn expected_chunk_count(&self) -> usize { self.expected_filtered_chunk_count } -} - -impl<R: Read + Seek> ExactSizeIterator for FilteredChunksReader<R> {} -impl<R: Read + Seek> Iterator for FilteredChunksReader<R> { - type Item = Result<Chunk>; - - fn next(&mut self) -> Option<Self::Item> { - // read as many chunks as we have desired chunk offsets - self.remaining_filtered_chunk_indices.next().map(|next_chunk_location|{ - self.remaining_bytes.skip_to( // no-op for seek at current position, uses skip_bytes for small amounts - usize::try_from(next_chunk_location) - .expect("too large chunk position for this machine") - )?; - - let meta_data = &self.meta_data; - Chunk::read(&mut self.remaining_bytes, meta_data) - }) - - // TODO remember last chunk index and then seek to index+size and check whether bytes are left? - } - - fn size_hint(&self) -> (usize, Option<usize>) { - (self.remaining_filtered_chunk_indices.len(), Some(self.remaining_filtered_chunk_indices.len())) - } -} - -/// Read all chunks from the file, decompressing each chunk immediately. -/// Implements iterator. -#[derive(Debug)] -pub struct SequentialBlockDecompressor<R: ChunksReader> { - remaining_chunks_reader: R, - pedantic: bool, -} - -impl<R: ChunksReader> SequentialBlockDecompressor<R> { - - /// The extracted meta data from the image file. - pub fn meta_data(&self) -> &MetaData { self.remaining_chunks_reader.meta_data() } - - /// Read and then decompress a single block of pixels from the byte source. - pub fn decompress_next_block(&mut self) -> Option<Result<UncompressedBlock>> { - self.remaining_chunks_reader.read_next_chunk().map(|compressed_chunk|{ - UncompressedBlock::decompress_chunk(compressed_chunk?, &self.remaining_chunks_reader.meta_data(), self.pedantic) - }) - } -} - -/// Decompress the chunks in a file in parallel. -/// The first call to `next` will fill the thread pool with jobs, -/// starting to decompress the next few blocks. -/// These jobs will finish, even if you stop reading more blocks. -/// Implements iterator. -#[derive(Debug)] -pub struct ParallelBlockDecompressor<R: ChunksReader> { - remaining_chunks: R, - sender: flume::Sender<Result<UncompressedBlock>>, - receiver: flume::Receiver<Result<UncompressedBlock>>, - currently_decompressing_count: usize, - max_threads: usize, - - shared_meta_data_ref: Arc<MetaData>, - pedantic: bool, - - pool: ThreadPool, -} - -impl<R: ChunksReader> ParallelBlockDecompressor<R> { - - /// Create a new decompressor. Does not immediately spawn any tasks. - /// Decompression starts after the first call to `next`. - /// Returns the chunks if parallel decompression should not be used. - /// Use `new_with_thread_pool` to customize the threadpool. - pub fn new(chunks: R, pedantic: bool) -> std::result::Result<Self, R> { - Self::new_with_thread_pool(chunks, pedantic, ||{ - rayon_core::ThreadPoolBuilder::new() - .thread_name(|index| format!("OpenEXR Block Decompressor Thread #{}", index)) - .build() - }) - } - - /// Create a new decompressor. Does not immediately spawn any tasks. - /// Decompression starts after the first call to `next`. - /// Returns the chunks if parallel decompression should not be used. - pub fn new_with_thread_pool<CreatePool>(chunks: R, pedantic: bool, try_create_thread_pool: CreatePool) - -> std::result::Result<Self, R> - where CreatePool: FnOnce() -> std::result::Result<ThreadPool, ThreadPoolBuildError> - { - // if no compression is used in the file, don't use a threadpool - if chunks.meta_data().headers.iter() - .all(|head|head.compression == Compression::Uncompressed) - { - return Err(chunks); - } - - // in case thread pool creation fails (for example on WASM currently), - // we revert to sequential decompression - let pool = match try_create_thread_pool() { - Ok(pool) => pool, - - // TODO print warning? - Err(_) => return Err(chunks), - }; - - let max_threads = pool.current_num_threads().max(1).min(chunks.len()) + 2; // ca one block for each thread at all times - - let (send, recv) = flume::unbounded(); // TODO bounded channel simplifies logic? - - Ok(Self { - shared_meta_data_ref: Arc::new(chunks.meta_data().clone()), - currently_decompressing_count: 0, - remaining_chunks: chunks, - sender: send, - receiver: recv, - pedantic, - max_threads, - - pool, - }) - } - - /// Fill the pool with decompression jobs. Returns the first job that finishes. - pub fn decompress_next_block(&mut self) -> Option<Result<UncompressedBlock>> { - - while self.currently_decompressing_count < self.max_threads { - let block = self.remaining_chunks.next(); - if let Some(block) = block { - let block = match block { - Ok(block) => block, - Err(error) => return Some(Err(error)) - }; - - let sender = self.sender.clone(); - let meta = self.shared_meta_data_ref.clone(); - let pedantic = self.pedantic; - - self.currently_decompressing_count += 1; - - self.pool.spawn(move || { - let decompressed_or_err = UncompressedBlock::decompress_chunk( - block, &meta, pedantic - ); - - // by now, decompressing could have failed in another thread. - // the error is then already handled, so we simply - // don't send the decompressed block and do nothing - let _ = sender.send(decompressed_or_err); - }); - } - else { - // there are no chunks left to decompress - break; - } - } - - if self.currently_decompressing_count > 0 { - let next = self.receiver.recv() - .expect("all decompressing senders hung up but more messages were expected"); - - self.currently_decompressing_count -= 1; - Some(next) - } - else { - debug_assert!(self.receiver.try_recv().is_err(), "uncompressed chunks left in channel after decompressing all chunks"); // TODO not reliable - debug_assert_eq!(self.len(), 0, "compressed chunks left after decompressing all chunks"); - None - } - } - - /// The extracted meta data of the image file. - pub fn meta_data(&self) -> &MetaData { self.remaining_chunks.meta_data() } -} - -impl<R: ChunksReader> ExactSizeIterator for SequentialBlockDecompressor<R> {} -impl<R: ChunksReader> Iterator for SequentialBlockDecompressor<R> { - type Item = Result<UncompressedBlock>; - fn next(&mut self) -> Option<Self::Item> { self.decompress_next_block() } - fn size_hint(&self) -> (usize, Option<usize>) { self.remaining_chunks_reader.size_hint() } -} - -impl<R: ChunksReader> ExactSizeIterator for ParallelBlockDecompressor<R> {} -impl<R: ChunksReader> Iterator for ParallelBlockDecompressor<R> { - type Item = Result<UncompressedBlock>; - fn next(&mut self) -> Option<Self::Item> { self.decompress_next_block() } - fn size_hint(&self) -> (usize, Option<usize>) { - let remaining = self.remaining_chunks.len() + self.currently_decompressing_count; - (remaining, Some(remaining)) - } -} - - - - - diff --git a/vendor/exr/src/block/samples.rs b/vendor/exr/src/block/samples.rs deleted file mode 100644 index 4352b11..0000000 --- a/vendor/exr/src/block/samples.rs +++ /dev/null @@ -1,248 +0,0 @@ -//! Extract pixel samples from a block of pixel bytes. - -use crate::prelude::*; -use half::prelude::HalfFloatSliceExt; - - -/// A single red, green, blue, or alpha value. -#[derive(Copy, Clone, Debug)] -pub enum Sample { - - /// A 16-bit float sample. - F16(f16), - - /// A 32-bit float sample. - F32(f32), - - /// An unsigned integer sample. - U32(u32) -} - -impl Sample { - - /// Create a sample containing a 32-bit float. - pub fn f32(f32: f32) -> Self { Sample::F32(f32) } - - /// Create a sample containing a 16-bit float. - pub fn f16(f16: f16) -> Self { Sample::F16(f16) } - - /// Create a sample containing a 32-bit integer. - pub fn u32(u32: u32) -> Self { Sample::U32(u32) } - - /// Convert the sample to an f16 value. This has lower precision than f32. - /// Note: An f32 can only represent integers up to `1024` as precise as a u32 could. - #[inline] - pub fn to_f16(self) -> f16 { - match self { - Sample::F16(sample) => sample, - Sample::F32(sample) => f16::from_f32(sample), - Sample::U32(sample) => f16::from_f32(sample as f32), - } - } - - /// Convert the sample to an f32 value. - /// Note: An f32 can only represent integers up to `8388608` as precise as a u32 could. - #[inline] - pub fn to_f32(self) -> f32 { - match self { - Sample::F32(sample) => sample, - Sample::F16(sample) => sample.to_f32(), - Sample::U32(sample) => sample as f32, - } - } - - /// Convert the sample to a u32. Rounds floats to integers the same way that `3.1 as u32` does. - #[inline] - pub fn to_u32(self) -> u32 { - match self { - Sample::F16(sample) => sample.to_f32() as u32, - Sample::F32(sample) => sample as u32, - Sample::U32(sample) => sample, - } - } - - /// Is this value not a number? - #[inline] - pub fn is_nan(self) -> bool { - match self { - Sample::F16(value) => value.is_nan(), - Sample::F32(value) => value.is_nan(), - Sample::U32(_) => false, - } - } - - /// Is this value zero or negative zero? - #[inline] - pub fn is_zero(&self) -> bool { - match *self { - Sample::F16(value) => value == f16::ZERO || value == f16::NEG_ZERO, - Sample::F32(value) => value == 0.0, - Sample::U32(value) => value == 0, - } - } -} - -impl PartialEq for Sample { - fn eq(&self, other: &Self) -> bool { - match *self { - Sample::F16(num) => num == other.to_f16(), - Sample::F32(num) => num == other.to_f32(), - Sample::U32(num) => num == other.to_u32(), - } - } -} - -// this is not recommended because it may hide whether a color is transparent or opaque and might be undesired for depth channels -impl Default for Sample { - fn default() -> Self { Sample::F32(0.0) } -} - -impl From<f16> for Sample { #[inline] fn from(f: f16) -> Self { Sample::F16(f) } } -impl From<f32> for Sample { #[inline] fn from(f: f32) -> Self { Sample::F32(f) } } -impl From<u32> for Sample { #[inline] fn from(f: u32) -> Self { Sample::U32(f) } } - -impl<T> From<Option<T>> for Sample where T: Into<Sample> + Default { - #[inline] fn from(num: Option<T>) -> Self { num.unwrap_or_default().into() } -} - - -impl From<Sample> for f16 { #[inline] fn from(s: Sample) -> Self { s.to_f16() } } -impl From<Sample> for f32 { #[inline] fn from(s: Sample) -> Self { s.to_f32() } } -impl From<Sample> for u32 { #[inline] fn from(s: Sample) -> Self { s.to_u32() } } - - -/// Create an arbitrary sample type from one of the defined sample types. -/// Should be compiled to a no-op where the file contains the predicted sample type. -/// The slice functions should be optimized into a `memcpy` where there is no conversion needed. -pub trait FromNativeSample: Sized + Copy + Default + 'static { - - /// Create this sample from a f16, trying to represent the same numerical value - fn from_f16(value: f16) -> Self; - - /// Create this sample from a f32, trying to represent the same numerical value - fn from_f32(value: f32) -> Self; - - /// Create this sample from a u32, trying to represent the same numerical value - fn from_u32(value: u32) -> Self; - - /// Convert all values from the slice into this type. - /// This function exists to allow the compiler to perform a vectorization optimization. - /// Note that this default implementation will **not** be vectorized by the compiler automatically. - /// For maximum performance you will need to override this function and implement it via - /// an explicit batched conversion such as [`convert_to_f32_slice`](https://docs.rs/half/2.3.1/half/slice/trait.HalfFloatSliceExt.html#tymethod.convert_to_f32_slice) - #[inline] - fn from_f16s(from: &[f16], to: &mut [Self]) { - assert_eq!(from.len(), to.len(), "slices must have the same length"); - for (from, to) in from.iter().zip(to.iter_mut()) { - *to = Self::from_f16(*from); - } - } - - /// Convert all values from the slice into this type. - /// This function exists to allow the compiler to perform a vectorization optimization. - /// Note that this default implementation will be vectorized by the compiler automatically. - #[inline] - fn from_f32s(from: &[f32], to: &mut [Self]) { - assert_eq!(from.len(), to.len(), "slices must have the same length"); - for (from, to) in from.iter().zip(to.iter_mut()) { - *to = Self::from_f32(*from); - } - } - - /// Convert all values from the slice into this type. - /// This function exists to allow the compiler to perform a vectorization optimization. - /// Note that this default implementation will be vectorized by the compiler automatically, - /// provided that the CPU supports the necessary conversion instructions. - /// For example, x86_64 lacks the instructions to convert `u32` to floats, - /// so this will inevitably be slow on x86_64. - #[inline] - fn from_u32s(from: &[u32], to: &mut [Self]) { - assert_eq!(from.len(), to.len(), "slices must have the same length"); - for (from, to) in from.iter().zip(to.iter_mut()) { - *to = Self::from_u32(*from); - } - } -} - -// TODO haven't i implemented this exact behaviour already somewhere else in this library...?? -impl FromNativeSample for f32 { - #[inline] fn from_f16(value: f16) -> Self { value.to_f32() } - #[inline] fn from_f32(value: f32) -> Self { value } - #[inline] fn from_u32(value: u32) -> Self { value as f32 } - - // f16 is a custom type - // so the compiler can not automatically vectorize the conversion - // that's why we need to specialize this function - #[inline] - fn from_f16s(from: &[f16], to: &mut [Self]) { - from.convert_to_f32_slice(to); - } -} - -impl FromNativeSample for u32 { - #[inline] fn from_f16(value: f16) -> Self { value.to_f32() as u32 } - #[inline] fn from_f32(value: f32) -> Self { value as u32 } - #[inline] fn from_u32(value: u32) -> Self { value } -} - -impl FromNativeSample for f16 { - #[inline] fn from_f16(value: f16) -> Self { value } - #[inline] fn from_f32(value: f32) -> Self { f16::from_f32(value) } - #[inline] fn from_u32(value: u32) -> Self { f16::from_f32(value as f32) } - - // f16 is a custom type - // so the compiler can not automatically vectorize the conversion - // that's why we need to specialize this function - #[inline] - fn from_f32s(from: &[f32], to: &mut [Self]) { - to.convert_from_f32_slice(from) - } -} - -impl FromNativeSample for Sample { - #[inline] fn from_f16(value: f16) -> Self { Self::from(value) } - #[inline] fn from_f32(value: f32) -> Self { Self::from(value) } - #[inline] fn from_u32(value: u32) -> Self { Self::from(value) } -} - - -/// Convert any type into one of the supported sample types. -/// Should be compiled to a no-op where the file contains the predicted sample type -pub trait IntoNativeSample: Copy + Default + Sync + 'static { - - /// Convert this sample to an f16, trying to represent the same numerical value. - fn to_f16(&self) -> f16; - - /// Convert this sample to an f32, trying to represent the same numerical value. - fn to_f32(&self) -> f32; - - /// Convert this sample to an u16, trying to represent the same numerical value. - fn to_u32(&self) -> u32; -} - -impl IntoNativeSample for f16 { - fn to_f16(&self) -> f16 { f16::from_f16(*self) } - fn to_f32(&self) -> f32 { f32::from_f16(*self) } - fn to_u32(&self) -> u32 { u32::from_f16(*self) } -} - -impl IntoNativeSample for f32 { - fn to_f16(&self) -> f16 { f16::from_f32(*self) } - fn to_f32(&self) -> f32 { f32::from_f32(*self) } - fn to_u32(&self) -> u32 { u32::from_f32(*self) } -} - -impl IntoNativeSample for u32 { - fn to_f16(&self) -> f16 { f16::from_u32(*self) } - fn to_f32(&self) -> f32 { f32::from_u32(*self) } - fn to_u32(&self) -> u32 { u32::from_u32(*self) } -} - -impl IntoNativeSample for Sample { - fn to_f16(&self) -> f16 { Sample::to_f16(*self) } - fn to_f32(&self) -> f32 { Sample::to_f32(*self) } - fn to_u32(&self) -> u32 { Sample::to_u32(*self) } -} - - - diff --git a/vendor/exr/src/block/writer.rs b/vendor/exr/src/block/writer.rs deleted file mode 100644 index 1227c69..0000000 --- a/vendor/exr/src/block/writer.rs +++ /dev/null @@ -1,468 +0,0 @@ -//! Composable structures to handle writing an image. - - -use std::fmt::Debug; -use std::io::Seek; -use std::iter::Peekable; -use std::ops::Not; -use rayon_core::{ThreadPool, ThreadPoolBuildError}; - -use smallvec::alloc::collections::BTreeMap; - -use crate::block::UncompressedBlock; -use crate::block::chunk::{Chunk}; -use crate::compression::Compression; -use crate::error::{Error, Result, UnitResult, usize_to_u64}; -use crate::io::{Data, Tracking, Write}; -use crate::meta::{Headers, MetaData, OffsetTables}; -use crate::meta::attribute::LineOrder; - -/// Write an exr file by writing one chunk after another in a closure. -/// In the closure, you are provided a chunk writer, which should be used to write all the chunks. -/// Assumes the your write destination is buffered. -pub fn write_chunks_with<W: Write + Seek>( - buffered_write: W, headers: Headers, pedantic: bool, - write_chunks: impl FnOnce(MetaData, &mut ChunkWriter<W>) -> UnitResult -) -> UnitResult { - // this closure approach ensures that after writing all chunks, the file is always completed and checked and flushed - let (meta, mut writer) = ChunkWriter::new_for_buffered(buffered_write, headers, pedantic)?; - write_chunks(meta, &mut writer)?; - writer.complete_meta_data() -} - -/// Can consume compressed pixel chunks, writing them a file. -/// Use `sequential_blocks_compressor` or `parallel_blocks_compressor` to compress your data, -/// or use `compress_all_blocks_sequential` or `compress_all_blocks_parallel`. -/// Use `on_progress` to obtain a new writer -/// that triggers a callback for each block. -// #[must_use] -#[derive(Debug)] -#[must_use] -pub struct ChunkWriter<W> { - header_count: usize, - byte_writer: Tracking<W>, - chunk_indices_byte_location: std::ops::Range<usize>, - chunk_indices_increasing_y: OffsetTables, - chunk_count: usize, // TODO compose? -} - -/// A new writer that triggers a callback -/// for each block written to the inner writer. -#[derive(Debug)] -#[must_use] -pub struct OnProgressChunkWriter<'w, W, F> { - chunk_writer: &'w mut W, - written_chunks: usize, - on_progress: F, -} - -/// Write chunks to a byte destination. -/// Then write each chunk with `writer.write_chunk(chunk)`. -pub trait ChunksWriter: Sized { - - /// The total number of chunks that the complete file will contain. - fn total_chunks_count(&self) -> usize; - - /// Any more calls will result in an error and have no effect. - /// If writing results in an error, the file and the writer - /// may remain in an invalid state and should not be used further. - /// Errors when the chunk at this index was already written. - fn write_chunk(&mut self, index_in_header_increasing_y: usize, chunk: Chunk) -> UnitResult; - - /// Obtain a new writer that calls the specified closure for each block that is written to this writer. - fn on_progress<F>(&mut self, on_progress: F) -> OnProgressChunkWriter<'_, Self, F> where F: FnMut(f64) { - OnProgressChunkWriter { chunk_writer: self, written_chunks: 0, on_progress } - } - - /// Obtain a new writer that can compress blocks to chunks, which are then passed to this writer. - fn sequential_blocks_compressor<'w>(&'w mut self, meta: &'w MetaData) -> SequentialBlocksCompressor<'w, Self> { - SequentialBlocksCompressor::new(meta, self) - } - - /// Obtain a new writer that can compress blocks to chunks on multiple threads, which are then passed to this writer. - /// Returns none if the sequential compressor should be used instead (thread pool creation failure or too large performance overhead). - fn parallel_blocks_compressor<'w>(&'w mut self, meta: &'w MetaData) -> Option<ParallelBlocksCompressor<'w, Self>> { - ParallelBlocksCompressor::new(meta, self) - } - - /// Compresses all blocks to the file. - /// The index of the block must be in increasing line order within the header. - /// Obtain iterator with `MetaData::collect_ordered_blocks(...)` or similar methods. - fn compress_all_blocks_sequential(mut self, meta: &MetaData, blocks: impl Iterator<Item=(usize, UncompressedBlock)>) -> UnitResult { - let mut writer = self.sequential_blocks_compressor(meta); - - // TODO check block order if line order is not unspecified! - for (index_in_header_increasing_y, block) in blocks { - writer.compress_block(index_in_header_increasing_y, block)?; - } - - // TODO debug_assert_eq!(self.is_complete()); - Ok(()) - } - - /// Compresses all blocks to the file. - /// The index of the block must be in increasing line order within the header. - /// Obtain iterator with `MetaData::collect_ordered_blocks(...)` or similar methods. - /// Will fallback to sequential processing where threads are not available, or where it would not speed up the process. - fn compress_all_blocks_parallel(mut self, meta: &MetaData, blocks: impl Iterator<Item=(usize, UncompressedBlock)>) -> UnitResult { - let mut parallel_writer = match self.parallel_blocks_compressor(meta) { - None => return self.compress_all_blocks_sequential(meta, blocks), - Some(writer) => writer, - }; - - // TODO check block order if line order is not unspecified! - for (index_in_header_increasing_y, block) in blocks { - parallel_writer.add_block_to_compression_queue(index_in_header_increasing_y, block)?; - } - - // TODO debug_assert_eq!(self.is_complete()); - Ok(()) - } -} - - -impl<W> ChunksWriter for ChunkWriter<W> where W: Write + Seek { - - /// The total number of chunks that the complete file will contain. - fn total_chunks_count(&self) -> usize { self.chunk_count } - - /// Any more calls will result in an error and have no effect. - /// If writing results in an error, the file and the writer - /// may remain in an invalid state and should not be used further. - /// Errors when the chunk at this index was already written. - fn write_chunk(&mut self, index_in_header_increasing_y: usize, chunk: Chunk) -> UnitResult { - let header_chunk_indices = &mut self.chunk_indices_increasing_y[chunk.layer_index]; - - if index_in_header_increasing_y >= header_chunk_indices.len() { - return Err(Error::invalid("too large chunk index")); - } - - let chunk_index_slot = &mut header_chunk_indices[index_in_header_increasing_y]; - if *chunk_index_slot != 0 { - return Err(Error::invalid(format!("chunk at index {} is already written", index_in_header_increasing_y))); - } - - *chunk_index_slot = usize_to_u64(self.byte_writer.byte_position()); - chunk.write(&mut self.byte_writer, self.header_count)?; - Ok(()) - } -} - -impl<W> ChunkWriter<W> where W: Write + Seek { - // -- the following functions are private, because they must be called in a strict order -- - - /// Writes the meta data and zeroed offset tables as a placeholder. - fn new_for_buffered(buffered_byte_writer: W, headers: Headers, pedantic: bool) -> Result<(MetaData, Self)> { - let mut write = Tracking::new(buffered_byte_writer); - let requirements = MetaData::write_validating_to_buffered(&mut write, headers.as_slice(), pedantic)?; - - // TODO: use increasing line order where possible, but this requires us to know whether we want to be parallel right now - /*// if non-parallel compression, we always use increasing order anyways - if !parallel || !has_compression { - for header in &mut headers { - if header.line_order == LineOrder::Unspecified { - header.line_order = LineOrder::Increasing; - } - } - }*/ - - let offset_table_size: usize = headers.iter().map(|header| header.chunk_count).sum(); - - let offset_table_start_byte = write.byte_position(); - let offset_table_end_byte = write.byte_position() + offset_table_size * u64::BYTE_SIZE; - - // skip offset tables, filling with 0, will be updated after the last chunk has been written - write.seek_write_to(offset_table_end_byte)?; - - let header_count = headers.len(); - let chunk_indices_increasing_y = headers.iter() - .map(|header| vec![0_u64; header.chunk_count]).collect(); - - let meta_data = MetaData { requirements, headers }; - - Ok((meta_data, ChunkWriter { - header_count, - byte_writer: write, - chunk_count: offset_table_size, - chunk_indices_byte_location: offset_table_start_byte .. offset_table_end_byte, - chunk_indices_increasing_y, - })) - } - - /// Seek back to the meta data, write offset tables, and flush the byte writer. - /// Leaves the writer seeked to the middle of the file. - fn complete_meta_data(mut self) -> UnitResult { - if self.chunk_indices_increasing_y.iter().flatten().any(|&index| index == 0) { - return Err(Error::invalid("some chunks are not written yet")) - } - - // write all offset tables - debug_assert_ne!(self.byte_writer.byte_position(), self.chunk_indices_byte_location.end, "offset table has already been updated"); - self.byte_writer.seek_write_to(self.chunk_indices_byte_location.start)?; - - for table in self.chunk_indices_increasing_y { - u64::write_slice(&mut self.byte_writer, table.as_slice())?; - } - - self.byte_writer.flush()?; // make sure we catch all (possibly delayed) io errors before returning - Ok(()) - } - -} - - -impl<'w, W, F> ChunksWriter for OnProgressChunkWriter<'w, W, F> where W: 'w + ChunksWriter, F: FnMut(f64) { - fn total_chunks_count(&self) -> usize { - self.chunk_writer.total_chunks_count() - } - - fn write_chunk(&mut self, index_in_header_increasing_y: usize, chunk: Chunk) -> UnitResult { - let total_chunks = self.total_chunks_count(); - let on_progress = &mut self.on_progress; - - // guarantee on_progress being called with 0 once - if self.written_chunks == 0 { on_progress(0.0); } - - self.chunk_writer.write_chunk(index_in_header_increasing_y, chunk)?; - - self.written_chunks += 1; - - on_progress({ - // guarantee finishing with progress 1.0 for last block at least once, float division might slightly differ from 1.0 - if self.written_chunks == total_chunks { 1.0 } - else { self.written_chunks as f64 / total_chunks as f64 } - }); - - Ok(()) - } -} - - -/// Write blocks that appear in any order and reorder them before writing. -#[derive(Debug)] -#[must_use] -pub struct SortedBlocksWriter<'w, W> { - chunk_writer: &'w mut W, - pending_chunks: BTreeMap<usize, (usize, Chunk)>, - unwritten_chunk_indices: Peekable<std::ops::Range<usize>>, - requires_sorting: bool, // using this instead of Option, because of borrowing -} - - -impl<'w, W> SortedBlocksWriter<'w, W> where W: ChunksWriter { - - /// New sorting writer. Returns `None` if sorting is not required. - pub fn new(meta_data: &MetaData, chunk_writer: &'w mut W) -> SortedBlocksWriter<'w, W> { - let requires_sorting = meta_data.headers.iter() - .any(|header| header.line_order != LineOrder::Unspecified); - - let total_chunk_count = chunk_writer.total_chunks_count(); - - SortedBlocksWriter { - pending_chunks: BTreeMap::new(), - unwritten_chunk_indices: (0 .. total_chunk_count).peekable(), - requires_sorting, - chunk_writer - } - } - - /// Write the chunk or stash it. In the closure, write all chunks that can be written now. - pub fn write_or_stash_chunk(&mut self, chunk_index_in_file: usize, chunk_y_index: usize, chunk: Chunk) -> UnitResult { - if self.requires_sorting.not() { - return self.chunk_writer.write_chunk(chunk_y_index, chunk); - } - - // write this chunk now if possible - if self.unwritten_chunk_indices.peek() == Some(&chunk_index_in_file){ - self.chunk_writer.write_chunk(chunk_y_index, chunk)?; - self.unwritten_chunk_indices.next().expect("peeked chunk index is missing"); - - // write all pending blocks that are immediate successors of this block - while let Some((next_chunk_y_index, next_chunk)) = self - .unwritten_chunk_indices.peek().cloned() - .and_then(|id| self.pending_chunks.remove(&id)) - { - self.chunk_writer.write_chunk(next_chunk_y_index, next_chunk)?; - self.unwritten_chunk_indices.next().expect("peeked chunk index is missing"); - } - } - - else { - // the argument block is not to be written now, - // and all the pending blocks are not next up either, - // so just stash this block - self.pending_chunks.insert(chunk_index_in_file, (chunk_y_index, chunk)); - } - - Ok(()) - } - - /// Where the chunks will be written to. - pub fn inner_chunks_writer(&self) -> &W { - &self.chunk_writer - } -} - - - -/// Compress blocks to a chunk writer in this thread. -#[derive(Debug)] -#[must_use] -pub struct SequentialBlocksCompressor<'w, W> { - meta: &'w MetaData, - chunks_writer: &'w mut W, -} - -impl<'w, W> SequentialBlocksCompressor<'w, W> where W: 'w + ChunksWriter { - - /// New blocks writer. - pub fn new(meta: &'w MetaData, chunks_writer: &'w mut W) -> Self { Self { meta, chunks_writer, } } - - /// This is where the compressed blocks are written to. - pub fn inner_chunks_writer(&'w self) -> &'w W { self.chunks_writer } - - /// Compress a single block immediately. The index of the block must be in increasing line order. - pub fn compress_block(&mut self, index_in_header_increasing_y: usize, block: UncompressedBlock) -> UnitResult { - self.chunks_writer.write_chunk( - index_in_header_increasing_y, - block.compress_to_chunk(&self.meta.headers)? - ) - } -} - -/// Compress blocks to a chunk writer with multiple threads. -#[derive(Debug)] -#[must_use] -pub struct ParallelBlocksCompressor<'w, W> { - meta: &'w MetaData, - sorted_writer: SortedBlocksWriter<'w, W>, - - sender: flume::Sender<Result<(usize, usize, Chunk)>>, - receiver: flume::Receiver<Result<(usize, usize, Chunk)>>, - pool: rayon_core::ThreadPool, - - currently_compressing_count: usize, - written_chunk_count: usize, // used to check for last chunk - max_threads: usize, - next_incoming_chunk_index: usize, // used to remember original chunk order -} - -impl<'w, W> ParallelBlocksCompressor<'w, W> where W: 'w + ChunksWriter { - - /// New blocks writer. Returns none if sequential compression should be used. - /// Use `new_with_thread_pool` to customize the threadpool. - pub fn new(meta: &'w MetaData, chunks_writer: &'w mut W) -> Option<Self> { - Self::new_with_thread_pool(meta, chunks_writer, ||{ - rayon_core::ThreadPoolBuilder::new() - .thread_name(|index| format!("OpenEXR Block Compressor Thread #{}", index)) - .build() - }) - } - - /// New blocks writer. Returns none if sequential compression should be used. - pub fn new_with_thread_pool<CreatePool>( - meta: &'w MetaData, chunks_writer: &'w mut W, try_create_thread_pool: CreatePool) - -> Option<Self> - where CreatePool: FnOnce() -> std::result::Result<ThreadPool, ThreadPoolBuildError> - { - if meta.headers.iter().all(|head|head.compression == Compression::Uncompressed) { - return None; - } - - // in case thread pool creation fails (for example on WASM currently), - // we revert to sequential compression - let pool = match try_create_thread_pool() { - Ok(pool) => pool, - - // TODO print warning? - Err(_) => return None, - }; - - let max_threads = pool.current_num_threads().max(1).min(chunks_writer.total_chunks_count()) + 2; // ca one block for each thread at all times - let (send, recv) = flume::unbounded(); // TODO bounded channel simplifies logic? - - Some(Self { - sorted_writer: SortedBlocksWriter::new(meta, chunks_writer), - next_incoming_chunk_index: 0, - currently_compressing_count: 0, - written_chunk_count: 0, - sender: send, - receiver: recv, - max_threads, - pool, - meta, - }) - } - - /// This is where the compressed blocks are written to. - pub fn inner_chunks_writer(&'w self) -> &'w W { self.sorted_writer.inner_chunks_writer() } - - // private, as may underflow counter in release mode - fn write_next_queued_chunk(&mut self) -> UnitResult { - debug_assert!(self.currently_compressing_count > 0, "cannot wait for chunks as there are none left"); - - let some_compressed_chunk = self.receiver.recv() - .expect("cannot receive compressed block"); - - self.currently_compressing_count -= 1; - let (chunk_file_index, chunk_y_index, chunk) = some_compressed_chunk?; - self.sorted_writer.write_or_stash_chunk(chunk_file_index, chunk_y_index, chunk)?; - - self.written_chunk_count += 1; - Ok(()) - } - - /// Wait until all currently compressing chunks in the compressor have been written. - pub fn write_all_queued_chunks(&mut self) -> UnitResult { - while self.currently_compressing_count > 0 { - self.write_next_queued_chunk()?; - } - - debug_assert_eq!(self.currently_compressing_count, 0, "counter does not match block count"); - Ok(()) - } - - /// Add a single block to the compressor queue. The index of the block must be in increasing line order. - /// When calling this function for the last block, this method waits until all the blocks have been written. - /// This only works when you write as many blocks as the image expects, otherwise you can use `wait_for_all_remaining_chunks`. - /// Waits for a block from the queue to be written, if the queue already has enough items. - pub fn add_block_to_compression_queue(&mut self, index_in_header_increasing_y: usize, block: UncompressedBlock) -> UnitResult { - - // if pipe is full, block to wait for a slot to free up - if self.currently_compressing_count >= self.max_threads { - self.write_next_queued_chunk()?; - } - - // add the argument chunk to the compression queueue - let index_in_file = self.next_incoming_chunk_index; - let sender = self.sender.clone(); - let meta = self.meta.clone(); - - self.pool.spawn(move ||{ - let compressed_or_err = block.compress_to_chunk(&meta.headers); - - // by now, decompressing could have failed in another thread. - // the error is then already handled, so we simply - // don't send the decompressed block and do nothing - let _ = sender.send(compressed_or_err.map(move |compressed| (index_in_file, index_in_header_increasing_y, compressed))); - }); - - self.currently_compressing_count += 1; - self.next_incoming_chunk_index += 1; - - // if this is the last chunk, wait for all chunks to complete before returning - if self.written_chunk_count + self.currently_compressing_count == self.inner_chunks_writer().total_chunks_count() { - self.write_all_queued_chunks()?; - debug_assert_eq!( - self.written_chunk_count, self.inner_chunks_writer().total_chunks_count(), - "written chunk count mismatch" - ); - } - - - Ok(()) - } -} - - - |