diff options
author | Valentin Popov <valentin@popov.link> | 2024-01-08 00:21:28 +0300 |
---|---|---|
committer | Valentin Popov <valentin@popov.link> | 2024-01-08 00:21:28 +0300 |
commit | 1b6a04ca5504955c571d1c97504fb45ea0befee4 (patch) | |
tree | 7579f518b23313e8a9748a88ab6173d5e030b227 /vendor/exr/src/block | |
parent | 5ecd8cf2cba827454317368b68571df0d13d7842 (diff) | |
download | fparkan-1b6a04ca5504955c571d1c97504fb45ea0befee4.tar.xz fparkan-1b6a04ca5504955c571d1c97504fb45ea0befee4.zip |
Initial vendor packages
Signed-off-by: Valentin Popov <valentin@popov.link>
Diffstat (limited to 'vendor/exr/src/block')
-rw-r--r-- | vendor/exr/src/block/chunk.rs | 379 | ||||
-rw-r--r-- | vendor/exr/src/block/lines.rs | 197 | ||||
-rw-r--r-- | vendor/exr/src/block/mod.rs | 257 | ||||
-rw-r--r-- | vendor/exr/src/block/reader.rs | 527 | ||||
-rw-r--r-- | vendor/exr/src/block/samples.rs | 248 | ||||
-rw-r--r-- | vendor/exr/src/block/writer.rs | 468 |
6 files changed, 2076 insertions, 0 deletions
diff --git a/vendor/exr/src/block/chunk.rs b/vendor/exr/src/block/chunk.rs new file mode 100644 index 0000000..ff138f8 --- /dev/null +++ b/vendor/exr/src/block/chunk.rs @@ -0,0 +1,379 @@ + +//! Read and write already compressed pixel data blocks. +//! Does not include the process of compression and decompression. + +use crate::meta::attribute::{IntegerBounds}; + +/// A generic block of pixel information. +/// Contains pixel data and an index to the corresponding header. +/// All pixel data in a file is split into a list of chunks. +/// Also contains positioning information that locates this +/// data block in the referenced layer. +#[derive(Debug, Clone)] +pub struct Chunk { + + /// The index of the layer that the block belongs to. + /// This is required as the pixel data can appear in any order in a file. + // PDF says u64, but source code seems to be i32 + pub layer_index: usize, + + /// The compressed pixel contents. + pub compressed_block: CompressedBlock, +} + +/// The raw, possibly compressed pixel data of a file. +/// Each layer in a file can have a different type. +/// Also contains positioning information that locates this +/// data block in the corresponding layer. +/// Exists inside a `Chunk`. +#[derive(Debug, Clone)] +pub enum CompressedBlock { + + /// Scan line blocks of flat data. + ScanLine(CompressedScanLineBlock), + + /// Tiles of flat data. + Tile(CompressedTileBlock), + + /// Scan line blocks of deep data. + DeepScanLine(CompressedDeepScanLineBlock), + + /// Tiles of deep data. + DeepTile(CompressedDeepTileBlock), +} + +/// A `Block` of possibly compressed flat scan lines. +/// Corresponds to type attribute `scanlineimage`. +#[derive(Debug, Clone)] +pub struct CompressedScanLineBlock { + + /// The block's y coordinate is the pixel space y coordinate of the top scan line in the block. + /// The top scan line block in the image is aligned with the top edge of the data window. + pub y_coordinate: i32, + + /// One or more scan lines may be stored together as a scan line block. + /// The number of scan lines per block depends on how the pixel data are compressed. + /// For each line in the tile, for each channel, the row values are contiguous. + pub compressed_pixels: Vec<u8>, +} + +/// This `Block` is a tile of flat (non-deep) data. +/// Corresponds to type attribute `tiledimage`. +#[derive(Debug, Clone)] +pub struct CompressedTileBlock { + + /// The tile location. + pub coordinates: TileCoordinates, + + /// One or more scan lines may be stored together as a scan line block. + /// The number of scan lines per block depends on how the pixel data are compressed. + /// For each line in the tile, for each channel, the row values are contiguous. + pub compressed_pixels: Vec<u8>, +} + +/// Indicates the position and resolution level of a `TileBlock` or `DeepTileBlock`. +#[derive(Copy, Clone, Debug, Hash, Eq, PartialEq)] +pub struct TileCoordinates { + + /// Index of the tile, not pixel position. + pub tile_index: Vec2<usize>, + + /// Index of the Mip/Rip level. + pub level_index: Vec2<usize>, +} + +/// This `Block` consists of one or more deep scan lines. +/// Corresponds to type attribute `deepscanline`. +#[derive(Debug, Clone)] +pub struct CompressedDeepScanLineBlock { + + /// The block's y coordinate is the pixel space y coordinate of the top scan line in the block. + /// The top scan line block in the image is aligned with the top edge of the data window. + pub y_coordinate: i32, + + /// Count of samples. + pub decompressed_sample_data_size: usize, + + /// The pixel offset table is a list of integers, one for each pixel column within the data window. + /// Each entry in the table indicates the total number of samples required + /// to store the pixel in it as well as all pixels to the left of it. + pub compressed_pixel_offset_table: Vec<i8>, + + /// One or more scan lines may be stored together as a scan line block. + /// The number of scan lines per block depends on how the pixel data are compressed. + /// For each line in the tile, for each channel, the row values are contiguous. + pub compressed_sample_data: Vec<u8>, +} + +/// This `Block` is a tile of deep data. +/// Corresponds to type attribute `deeptile`. +#[derive(Debug, Clone)] +pub struct CompressedDeepTileBlock { + + /// The tile location. + pub coordinates: TileCoordinates, + + /// Count of samples. + pub decompressed_sample_data_size: usize, + + /// The pixel offset table is a list of integers, one for each pixel column within the data window. + /// Each entry in the table indicates the total number of samples required + /// to store the pixel in it as well as all pixels to the left of it. + pub compressed_pixel_offset_table: Vec<i8>, + + /// One or more scan lines may be stored together as a scan line block. + /// The number of scan lines per block depends on how the pixel data are compressed. + /// For each line in the tile, for each channel, the row values are contiguous. + pub compressed_sample_data: Vec<u8>, +} + + +use crate::io::*; + +impl TileCoordinates { + + /// Without validation, write this instance to the byte stream. + pub fn write<W: Write>(&self, write: &mut W) -> UnitResult { + i32::write(usize_to_i32(self.tile_index.x()), write)?; + i32::write(usize_to_i32(self.tile_index.y()), write)?; + i32::write(usize_to_i32(self.level_index.x()), write)?; + i32::write(usize_to_i32(self.level_index.y()), write)?; + Ok(()) + } + + /// Read the value without validating. + pub fn read(read: &mut impl Read) -> Result<Self> { + let tile_x = i32::read(read)?; + let tile_y = i32::read(read)?; + + let level_x = i32::read(read)?; + let level_y = i32::read(read)?; + + if level_x > 31 || level_y > 31 { + // there can be at most 31 levels, because the largest level would have a size of 2^31, + // which exceeds the maximum 32-bit integer value. + return Err(Error::invalid("level index exceeding integer maximum")); + } + + Ok(TileCoordinates { + tile_index: Vec2(tile_x, tile_y).to_usize("tile coordinate index")?, + level_index: Vec2(level_x, level_y).to_usize("tile coordinate level")? + }) + } + + /// The indices which can be used to index into the arrays of a data window. + /// These coordinates are only valid inside the corresponding one header. + /// Will start at 0 and always be positive. + pub fn to_data_indices(&self, tile_size: Vec2<usize>, max: Vec2<usize>) -> Result<IntegerBounds> { + let x = self.tile_index.x() * tile_size.width(); + let y = self.tile_index.y() * tile_size.height(); + + if x >= max.x() || y >= max.y() { + Err(Error::invalid("tile index")) + } + else { + Ok(IntegerBounds { + position: Vec2(usize_to_i32(x), usize_to_i32(y)), + size: Vec2( + calculate_block_size(max.x(), tile_size.width(), x)?, + calculate_block_size(max.y(), tile_size.height(), y)?, + ), + }) + } + } + + /// Absolute coordinates inside the global 2D space of a file, may be negative. + pub fn to_absolute_indices(&self, tile_size: Vec2<usize>, data_window: IntegerBounds) -> Result<IntegerBounds> { + let data = self.to_data_indices(tile_size, data_window.size)?; + Ok(data.with_origin(data_window.position)) + } + + /// Returns if this is the original resolution or a smaller copy. + pub fn is_largest_resolution_level(&self) -> bool { + self.level_index == Vec2(0, 0) + } +} + + + +use crate::meta::{MetaData, BlockDescription, calculate_block_size}; + +impl CompressedScanLineBlock { + + /// Without validation, write this instance to the byte stream. + pub fn write<W: Write>(&self, write: &mut W) -> UnitResult { + debug_assert_ne!(self.compressed_pixels.len(), 0, "empty blocks should not be put in the file bug"); + + i32::write(self.y_coordinate, write)?; + u8::write_i32_sized_slice(write, &self.compressed_pixels)?; + Ok(()) + } + + /// Read the value without validating. + pub fn read(read: &mut impl Read, max_block_byte_size: usize) -> Result<Self> { + let y_coordinate = i32::read(read)?; + let compressed_pixels = u8::read_i32_sized_vec(read, max_block_byte_size, Some(max_block_byte_size), "scan line block sample count")?; + Ok(CompressedScanLineBlock { y_coordinate, compressed_pixels }) + } +} + +impl CompressedTileBlock { + + /// Without validation, write this instance to the byte stream. + pub fn write<W: Write>(&self, write: &mut W) -> UnitResult { + debug_assert_ne!(self.compressed_pixels.len(), 0, "empty blocks should not be put in the file bug"); + + self.coordinates.write(write)?; + u8::write_i32_sized_slice(write, &self.compressed_pixels)?; + Ok(()) + } + + /// Read the value without validating. + pub fn read(read: &mut impl Read, max_block_byte_size: usize) -> Result<Self> { + let coordinates = TileCoordinates::read(read)?; + let compressed_pixels = u8::read_i32_sized_vec(read, max_block_byte_size, Some(max_block_byte_size), "tile block sample count")?; + Ok(CompressedTileBlock { coordinates, compressed_pixels }) + } +} + +impl CompressedDeepScanLineBlock { + + /// Without validation, write this instance to the byte stream. + pub fn write<W: Write>(&self, write: &mut W) -> UnitResult { + debug_assert_ne!(self.compressed_sample_data.len(), 0, "empty blocks should not be put in the file bug"); + + i32::write(self.y_coordinate, write)?; + u64::write(self.compressed_pixel_offset_table.len() as u64, write)?; + u64::write(self.compressed_sample_data.len() as u64, write)?; // TODO just guessed + u64::write(self.decompressed_sample_data_size as u64, write)?; + i8::write_slice(write, &self.compressed_pixel_offset_table)?; + u8::write_slice(write, &self.compressed_sample_data)?; + Ok(()) + } + + /// Read the value without validating. + pub fn read(read: &mut impl Read, max_block_byte_size: usize) -> Result<Self> { + let y_coordinate = i32::read(read)?; + let compressed_pixel_offset_table_size = u64_to_usize(u64::read(read)?); + let compressed_sample_data_size = u64_to_usize(u64::read(read)?); + let decompressed_sample_data_size = u64_to_usize(u64::read(read)?); + + // doc said i32, try u8 + let compressed_pixel_offset_table = i8::read_vec( + read, compressed_pixel_offset_table_size, + 6 * u16::MAX as usize, Some(max_block_byte_size), + "deep scan line block table size" + )?; + + let compressed_sample_data = u8::read_vec( + read, compressed_sample_data_size, + 6 * u16::MAX as usize, Some(max_block_byte_size), + "deep scan line block sample count" + )?; + + Ok(CompressedDeepScanLineBlock { + y_coordinate, + decompressed_sample_data_size, + compressed_pixel_offset_table, + compressed_sample_data, + }) + } +} + + +impl CompressedDeepTileBlock { + + /// Without validation, write this instance to the byte stream. + pub fn write<W: Write>(&self, write: &mut W) -> UnitResult { + debug_assert_ne!(self.compressed_sample_data.len(), 0, "empty blocks should not be put in the file bug"); + + self.coordinates.write(write)?; + u64::write(self.compressed_pixel_offset_table.len() as u64, write)?; + u64::write(self.compressed_sample_data.len() as u64, write)?; // TODO just guessed + u64::write(self.decompressed_sample_data_size as u64, write)?; + i8::write_slice(write, &self.compressed_pixel_offset_table)?; + u8::write_slice(write, &self.compressed_sample_data)?; + Ok(()) + } + + /// Read the value without validating. + pub fn read(read: &mut impl Read, hard_max_block_byte_size: usize) -> Result<Self> { + let coordinates = TileCoordinates::read(read)?; + let compressed_pixel_offset_table_size = u64_to_usize(u64::read(read)?); + let compressed_sample_data_size = u64_to_usize(u64::read(read)?); // TODO u64 just guessed + let decompressed_sample_data_size = u64_to_usize(u64::read(read)?); + + let compressed_pixel_offset_table = i8::read_vec( + read, compressed_pixel_offset_table_size, + 6 * u16::MAX as usize, Some(hard_max_block_byte_size), + "deep tile block table size" + )?; + + let compressed_sample_data = u8::read_vec( + read, compressed_sample_data_size, + 6 * u16::MAX as usize, Some(hard_max_block_byte_size), + "deep tile block sample count" + )?; + + Ok(CompressedDeepTileBlock { + coordinates, + decompressed_sample_data_size, + compressed_pixel_offset_table, + compressed_sample_data, + }) + } +} + +use crate::error::{UnitResult, Result, Error, u64_to_usize, usize_to_i32, i32_to_usize}; +use crate::math::Vec2; + +/// Validation of chunks is done while reading and writing the actual data. (For example in exr::full_image) +impl Chunk { + + /// Without validation, write this instance to the byte stream. + pub fn write(&self, write: &mut impl Write, header_count: usize) -> UnitResult { + debug_assert!(self.layer_index < header_count, "layer index bug"); // validation is done in full_image or simple_image + + if header_count != 1 { usize_to_i32(self.layer_index).write(write)?; } + else { assert_eq!(self.layer_index, 0, "invalid header index for single layer file"); } + + match self.compressed_block { + CompressedBlock::ScanLine (ref value) => value.write(write), + CompressedBlock::Tile (ref value) => value.write(write), + CompressedBlock::DeepScanLine (ref value) => value.write(write), + CompressedBlock::DeepTile (ref value) => value.write(write), + } + } + + /// Read the value without validating. + pub fn read(read: &mut impl Read, meta_data: &MetaData) -> Result<Self> { + let layer_number = i32_to_usize( + if meta_data.requirements.is_multilayer() { i32::read(read)? } // documentation says u64, but is i32 + else { 0_i32 }, // reference the first header for single-layer images + "chunk data part number" + )?; + + if layer_number >= meta_data.headers.len() { + return Err(Error::invalid("chunk data part number")); + } + + let header = &meta_data.headers[layer_number]; + let max_block_byte_size = header.max_block_byte_size(); + + let chunk = Chunk { + layer_index: layer_number, + compressed_block: match header.blocks { + // flat data + BlockDescription::ScanLines if !header.deep => CompressedBlock::ScanLine(CompressedScanLineBlock::read(read, max_block_byte_size)?), + BlockDescription::Tiles(_) if !header.deep => CompressedBlock::Tile(CompressedTileBlock::read(read, max_block_byte_size)?), + + // deep data + BlockDescription::ScanLines => CompressedBlock::DeepScanLine(CompressedDeepScanLineBlock::read(read, max_block_byte_size)?), + BlockDescription::Tiles(_) => CompressedBlock::DeepTile(CompressedDeepTileBlock::read(read, max_block_byte_size)?), + }, + }; + + Ok(chunk) + } +} + diff --git a/vendor/exr/src/block/lines.rs b/vendor/exr/src/block/lines.rs new file mode 100644 index 0000000..1cdf8ee --- /dev/null +++ b/vendor/exr/src/block/lines.rs @@ -0,0 +1,197 @@ +//! Extract lines from a block of pixel bytes. + +use crate::math::*; +use std::io::{Cursor}; +use crate::error::{Result, UnitResult}; +use smallvec::SmallVec; +use std::ops::Range; +use crate::block::{BlockIndex}; +use crate::meta::attribute::ChannelList; + + +/// A single line of pixels. +/// Use [LineRef] or [LineRefMut] for easier type names. +#[derive(Clone, Copy, Eq, PartialEq, Debug)] +pub struct LineSlice<T> { + + // TODO also store enum SampleType, as it would always be matched in every place it is used + + /// Where this line is located inside the image. + pub location: LineIndex, + + /// The raw bytes of the pixel line, either `&[u8]` or `&mut [u8]`. + /// Must be re-interpreted as slice of f16, f32, or u32, + /// according to the channel data type. + pub value: T, +} + + +/// An reference to a single line of pixels. +/// May go across the whole image or just a tile section of it. +/// +/// This line contains an immutable slice that all samples will be read from. +pub type LineRef<'s> = LineSlice<&'s [u8]>; + +/// A reference to a single mutable line of pixels. +/// May go across the whole image or just a tile section of it. +/// +/// This line contains a mutable slice that all samples will be written to. +pub type LineRefMut<'s> = LineSlice<&'s mut [u8]>; + + +/// Specifies where a row of pixels lies inside an image. +/// This is a globally unique identifier which includes +/// the layer, channel index, and pixel location. +#[derive(Clone, Copy, Eq, PartialEq, Debug, Hash)] +pub struct LineIndex { + + /// Index of the layer. + pub layer: usize, + + /// The channel index of the layer. + pub channel: usize, + + /// Index of the mip or rip level in the image. + pub level: Vec2<usize>, + + /// Position of the most left pixel of the row. + pub position: Vec2<usize>, + + /// The width of the line; the number of samples in this row, + /// that is, the number of f16, f32, or u32 values. + pub sample_count: usize, +} + + +impl LineIndex { + + /// Iterates the lines of this block index in interleaved fashion: + /// For each line in this block, this iterator steps once through each channel. + /// This is how lines are stored in a pixel data block. + /// + /// Does not check whether `self.layer_index`, `self.level`, `self.size` and `self.position` are valid indices.__ + // TODO be sure this cannot produce incorrect data, as this is not further checked but only handled with panics + #[inline] + #[must_use] + pub fn lines_in_block(block: BlockIndex, channels: &ChannelList) -> impl Iterator<Item=(Range<usize>, LineIndex)> { + struct LineIter { + layer: usize, level: Vec2<usize>, width: usize, + end_y: usize, x: usize, channel_sizes: SmallVec<[usize; 8]>, + byte: usize, channel: usize, y: usize, + } + + // FIXME what about sub sampling?? + + impl Iterator for LineIter { + type Item = (Range<usize>, LineIndex); + // TODO size hint? + + fn next(&mut self) -> Option<Self::Item> { + if self.y < self.end_y { + + // compute return value before incrementing + let byte_len = self.channel_sizes[self.channel]; + let return_value = ( + (self.byte .. self.byte + byte_len), + LineIndex { + channel: self.channel, + layer: self.layer, + level: self.level, + position: Vec2(self.x, self.y), + sample_count: self.width, + } + ); + + { // increment indices + self.byte += byte_len; + self.channel += 1; + + if self.channel == self.channel_sizes.len() { + self.channel = 0; + self.y += 1; + } + } + + Some(return_value) + } + + else { + None + } + } + } + + let channel_line_sizes: SmallVec<[usize; 8]> = channels.list.iter() + .map(move |channel| block.pixel_size.0 * channel.sample_type.bytes_per_sample()) // FIXME is it fewer samples per tile or just fewer tiles for sampled images??? + .collect(); + + LineIter { + layer: block.layer, + level: block.level, + width: block.pixel_size.0, + x: block.pixel_position.0, + end_y: block.pixel_position.y() + block.pixel_size.height(), + channel_sizes: channel_line_sizes, + + byte: 0, + channel: 0, + y: block.pixel_position.y() + } + } +} + + + +impl<'s> LineRefMut<'s> { + + /// Writes the samples (f16, f32, u32 values) into this line value reference. + /// Use `write_samples` if there is not slice available. + #[inline] + #[must_use] + pub fn write_samples_from_slice<T: crate::io::Data>(self, slice: &[T]) -> UnitResult { + debug_assert_eq!(slice.len(), self.location.sample_count, "slice size does not match the line width"); + debug_assert_eq!(self.value.len(), self.location.sample_count * T::BYTE_SIZE, "sample type size does not match line byte size"); + + T::write_slice(&mut Cursor::new(self.value), slice) + } + + /// Iterate over all samples in this line, from left to right. + /// The supplied `get_line` function returns the sample value + /// for a given sample index within the line, + /// which starts at zero for each individual line. + /// Use `write_samples_from_slice` if you already have a slice of samples. + #[inline] + #[must_use] + pub fn write_samples<T: crate::io::Data>(self, mut get_sample: impl FnMut(usize) -> T) -> UnitResult { + debug_assert_eq!(self.value.len(), self.location.sample_count * T::BYTE_SIZE, "sample type size does not match line byte size"); + + let mut write = Cursor::new(self.value); + + for index in 0..self.location.sample_count { + T::write(get_sample(index), &mut write)?; + } + + Ok(()) + } +} + +impl LineRef<'_> { + + /// Read the samples (f16, f32, u32 values) from this line value reference. + /// Use `read_samples` if there is not slice available. + pub fn read_samples_into_slice<T: crate::io::Data>(self, slice: &mut [T]) -> UnitResult { + debug_assert_eq!(slice.len(), self.location.sample_count, "slice size does not match the line width"); + debug_assert_eq!(self.value.len(), self.location.sample_count * T::BYTE_SIZE, "sample type size does not match line byte size"); + + T::read_slice(&mut Cursor::new(self.value), slice) + } + + /// Iterate over all samples in this line, from left to right. + /// Use `read_sample_into_slice` if you already have a slice of samples. + pub fn read_samples<T: crate::io::Data>(&self) -> impl Iterator<Item = Result<T>> + '_ { + debug_assert_eq!(self.value.len(), self.location.sample_count * T::BYTE_SIZE, "sample type size does not match line byte size"); + + let mut read = self.value.clone(); // FIXME deep data + (0..self.location.sample_count).map(move |_| T::read(&mut read)) + } +}
\ No newline at end of file diff --git a/vendor/exr/src/block/mod.rs b/vendor/exr/src/block/mod.rs new file mode 100644 index 0000000..1d20aa8 --- /dev/null +++ b/vendor/exr/src/block/mod.rs @@ -0,0 +1,257 @@ +//! This is the low-level interface for the raw blocks of an image. +//! See `exr::image` module for a high-level interface. +//! +//! Handle compressed and uncompressed pixel byte blocks. Includes compression and decompression, +//! and reading a complete image into blocks. +//! +//! Start with the `block::read(...)` +//! and `block::write(...)` functions. + + +pub mod writer; +pub mod reader; + +pub mod lines; +pub mod samples; +pub mod chunk; + + +use std::io::{Read, Seek, Write}; +use crate::error::{Result, UnitResult, Error, usize_to_i32}; +use crate::meta::{Headers, MetaData, BlockDescription}; +use crate::math::Vec2; +use crate::compression::ByteVec; +use crate::block::chunk::{CompressedBlock, CompressedTileBlock, CompressedScanLineBlock, Chunk, TileCoordinates}; +use crate::meta::header::Header; +use crate::block::lines::{LineIndex, LineRef, LineSlice, LineRefMut}; +use crate::meta::attribute::ChannelList; + + +/// Specifies where a block of pixel data should be placed in the actual image. +/// This is a globally unique identifier which +/// includes the layer, level index, and pixel location. +#[derive(Clone, Copy, Eq, Hash, PartialEq, Debug)] +pub struct BlockIndex { + + /// Index of the layer. + pub layer: usize, + + /// Index of the top left pixel from the block within the data window. + pub pixel_position: Vec2<usize>, + + /// Number of pixels in this block, extending to the right and downwards. + /// Stays the same across all resolution levels. + pub pixel_size: Vec2<usize>, + + /// Index of the mip or rip level in the image. + pub level: Vec2<usize>, +} + +/// Contains a block of pixel data and where that data should be placed in the actual image. +#[derive(Clone, Eq, PartialEq, Debug)] +pub struct UncompressedBlock { + + /// Location of the data inside the image. + pub index: BlockIndex, + + /// Uncompressed pixel values of the whole block. + /// One or more scan lines may be stored together as a scan line block. + /// This byte vector contains all pixel rows, one after another. + /// For each line in the tile, for each channel, the row values are contiguous. + /// Stores all samples of the first channel, then all samples of the second channel, and so on. + pub data: ByteVec, +} + +/// Immediately reads the meta data from the file. +/// Then, returns a reader that can be used to read all pixel blocks. +/// From the reader, you can pull each compressed chunk from the file. +/// Alternatively, you can create a decompressor, and pull the uncompressed data from it. +/// The reader is assumed to be buffered. +pub fn read<R: Read + Seek>(buffered_read: R, pedantic: bool) -> Result<self::reader::Reader<R>> { + self::reader::Reader::read_from_buffered(buffered_read, pedantic) +} + +/// Immediately writes the meta data to the file. +/// Then, calls a closure with a writer that can be used to write all pixel blocks. +/// In the closure, you can push compressed chunks directly into the writer. +/// Alternatively, you can create a compressor, wrapping the writer, and push the uncompressed data to it. +/// The writer is assumed to be buffered. +pub fn write<W: Write + Seek>( + buffered_write: W, headers: Headers, compatibility_checks: bool, + write_chunks: impl FnOnce(MetaData, &mut self::writer::ChunkWriter<W>) -> UnitResult +) -> UnitResult { + self::writer::write_chunks_with(buffered_write, headers, compatibility_checks, write_chunks) +} + + + + +/// This iterator tells you the block indices of all blocks that must be in the image. +/// The order of the blocks depends on the `LineOrder` attribute +/// (unspecified line order is treated the same as increasing line order). +/// The blocks written to the file must be exactly in this order, +/// except for when the `LineOrder` is unspecified. +/// The index represents the block index, in increasing line order, within the header. +pub fn enumerate_ordered_header_block_indices(headers: &[Header]) -> impl '_ + Iterator<Item=(usize, BlockIndex)> { + headers.iter().enumerate().flat_map(|(layer_index, header)|{ + header.enumerate_ordered_blocks().map(move |(index_in_header, tile)|{ + let data_indices = header.get_absolute_block_pixel_coordinates(tile.location).expect("tile coordinate bug"); + + let block = BlockIndex { + layer: layer_index, + level: tile.location.level_index, + pixel_position: data_indices.position.to_usize("data indices start").expect("data index bug"), + pixel_size: data_indices.size, + }; + + (index_in_header, block) + }) + }) +} + + +impl UncompressedBlock { + + /// Decompress the possibly compressed chunk and returns an `UncompressedBlock`. + // for uncompressed data, the ByteVec in the chunk is moved all the way + #[inline] + #[must_use] + pub fn decompress_chunk(chunk: Chunk, meta_data: &MetaData, pedantic: bool) -> Result<Self> { + let header: &Header = meta_data.headers.get(chunk.layer_index) + .ok_or(Error::invalid("chunk layer index"))?; + + let tile_data_indices = header.get_block_data_indices(&chunk.compressed_block)?; + let absolute_indices = header.get_absolute_block_pixel_coordinates(tile_data_indices)?; + + absolute_indices.validate(Some(header.layer_size))?; + + match chunk.compressed_block { + CompressedBlock::Tile(CompressedTileBlock { compressed_pixels, .. }) | + CompressedBlock::ScanLine(CompressedScanLineBlock { compressed_pixels, .. }) => { + Ok(UncompressedBlock { + data: header.compression.decompress_image_section(header, compressed_pixels, absolute_indices, pedantic)?, + index: BlockIndex { + layer: chunk.layer_index, + pixel_position: absolute_indices.position.to_usize("data indices start")?, + level: tile_data_indices.level_index, + pixel_size: absolute_indices.size, + } + }) + }, + + _ => return Err(Error::unsupported("deep data not supported yet")) + } + } + + /// Consume this block by compressing it, returning a `Chunk`. + // for uncompressed data, the ByteVec in the chunk is moved all the way + #[inline] + #[must_use] + pub fn compress_to_chunk(self, headers: &[Header]) -> Result<Chunk> { + let UncompressedBlock { data, index } = self; + + let header: &Header = headers.get(index.layer) + .expect("block layer index bug"); + + let expected_byte_size = header.channels.bytes_per_pixel * self.index.pixel_size.area(); // TODO sampling?? + if expected_byte_size != data.len() { + panic!("get_line byte size should be {} but was {}", expected_byte_size, data.len()); + } + + let tile_coordinates = TileCoordinates { + // FIXME this calculation should not be made here but elsewhere instead (in meta::header?) + tile_index: index.pixel_position / header.max_block_pixel_size(), // TODO sampling?? + level_index: index.level, + }; + + let absolute_indices = header.get_absolute_block_pixel_coordinates(tile_coordinates)?; + absolute_indices.validate(Some(header.layer_size))?; + + if !header.compression.may_loose_data() { debug_assert_eq!( + &header.compression.decompress_image_section( + header, + header.compression.compress_image_section(header, data.clone(), absolute_indices)?, + absolute_indices, + true + ).unwrap(), + &data, + "compression method not round trippin'" + ); } + + let compressed_data = header.compression.compress_image_section(header, data, absolute_indices)?; + + Ok(Chunk { + layer_index: index.layer, + compressed_block : match header.blocks { + BlockDescription::ScanLines => CompressedBlock::ScanLine(CompressedScanLineBlock { + compressed_pixels: compressed_data, + + // FIXME this calculation should not be made here but elsewhere instead (in meta::header?) + y_coordinate: usize_to_i32(index.pixel_position.y()) + header.own_attributes.layer_position.y(), // TODO sampling?? + }), + + BlockDescription::Tiles(_) => CompressedBlock::Tile(CompressedTileBlock { + compressed_pixels: compressed_data, + coordinates: tile_coordinates, + }), + } + }) + } + + /// Iterate all the lines in this block. + /// Each line contains the all samples for one of the channels. + pub fn lines(&self, channels: &ChannelList) -> impl Iterator<Item=LineRef<'_>> { + LineIndex::lines_in_block(self.index, channels) + .map(move |(bytes, line)| LineSlice { location: line, value: &self.data[bytes] }) + } + + /* TODO pub fn lines_mut<'s>(&'s mut self, header: &Header) -> impl 's + Iterator<Item=LineRefMut<'s>> { + LineIndex::lines_in_block(self.index, &header.channels) + .map(move |(bytes, line)| LineSlice { location: line, value: &mut self.data[bytes] }) + }*/ + + /*// TODO make iterator + /// Call a closure for each line of samples in this uncompressed block. + pub fn for_lines( + &self, header: &Header, + mut accept_line: impl FnMut(LineRef<'_>) -> UnitResult + ) -> UnitResult { + for (bytes, line) in LineIndex::lines_in_block(self.index, &header.channels) { + let line_ref = LineSlice { location: line, value: &self.data[bytes] }; + accept_line(line_ref)?; + } + + Ok(()) + }*/ + + // TODO from iterator?? + /// Create an uncompressed block byte vector by requesting one line of samples after another. + pub fn collect_block_data_from_lines( + channels: &ChannelList, block_index: BlockIndex, + mut extract_line: impl FnMut(LineRefMut<'_>) + ) -> Vec<u8> + { + let byte_count = block_index.pixel_size.area() * channels.bytes_per_pixel; + let mut block_bytes = vec![0_u8; byte_count]; + + for (byte_range, line_index) in LineIndex::lines_in_block(block_index, channels) { + extract_line(LineRefMut { // TODO subsampling + value: &mut block_bytes[byte_range], + location: line_index, + }); + } + + block_bytes + } + + /// Create an uncompressed block by requesting one line of samples after another. + pub fn from_lines( + channels: &ChannelList, block_index: BlockIndex, + extract_line: impl FnMut(LineRefMut<'_>) + ) -> Self { + Self { + index: block_index, + data: Self::collect_block_data_from_lines(channels, block_index, extract_line) + } + } +}
\ No newline at end of file diff --git a/vendor/exr/src/block/reader.rs b/vendor/exr/src/block/reader.rs new file mode 100644 index 0000000..bb9888e --- /dev/null +++ b/vendor/exr/src/block/reader.rs @@ -0,0 +1,527 @@ +//! Composable structures to handle reading an image. + + +use std::convert::TryFrom; +use std::fmt::Debug; +use std::io::{Read, Seek}; +use rayon_core::{ThreadPool, ThreadPoolBuildError}; + +use smallvec::alloc::sync::Arc; + +use crate::block::{BlockIndex, UncompressedBlock}; +use crate::block::chunk::{Chunk, TileCoordinates}; +use crate::compression::Compression; +use crate::error::{Error, Result, u64_to_usize, UnitResult}; +use crate::io::{PeekRead, Tracking}; +use crate::meta::{MetaData, OffsetTables}; +use crate::meta::header::Header; + +/// Decode the meta data from a byte source, keeping the source ready for further reading. +/// Continue decoding the remaining bytes by calling `filtered_chunks` or `all_chunks`. +#[derive(Debug)] +pub struct Reader<R> { + meta_data: MetaData, + remaining_reader: PeekRead<Tracking<R>>, // TODO does R need to be Seek or is Tracking enough? +} + +impl<R: Read + Seek> Reader<R> { + + /// Start the reading process. + /// Immediately decodes the meta data into an internal field. + /// Access it via`meta_data()`. + pub fn read_from_buffered(read: R, pedantic: bool) -> Result<Self> { + let mut remaining_reader = PeekRead::new(Tracking::new(read)); + let meta_data = MetaData::read_validated_from_buffered_peekable(&mut remaining_reader, pedantic)?; + Ok(Self { meta_data, remaining_reader }) + } + + // must not be mutable, as reading the file later on relies on the meta data + /// The decoded exr meta data from the file. + pub fn meta_data(&self) -> &MetaData { &self.meta_data } + + /// The decoded exr meta data from the file. + pub fn headers(&self) -> &[Header] { &self.meta_data.headers } + + /// Obtain the meta data ownership. + pub fn into_meta_data(self) -> MetaData { self.meta_data } + + /// Prepare to read all the chunks from the file. + /// Does not decode the chunks now, but returns a decoder. + /// Reading all chunks reduces seeking the file, but some chunks might be read without being used. + pub fn all_chunks(mut self, pedantic: bool) -> Result<AllChunksReader<R>> { + let total_chunk_count = { + if pedantic { + let offset_tables = MetaData::read_offset_tables(&mut self.remaining_reader, &self.meta_data.headers)?; + validate_offset_tables(self.meta_data.headers.as_slice(), &offset_tables, self.remaining_reader.byte_position())?; + offset_tables.iter().map(|table| table.len()).sum() + } + else { + usize::try_from(MetaData::skip_offset_tables(&mut self.remaining_reader, &self.meta_data.headers)?) + .expect("too large chunk count for this machine") + } + }; + + Ok(AllChunksReader { + meta_data: self.meta_data, + remaining_chunks: 0 .. total_chunk_count, + remaining_bytes: self.remaining_reader, + pedantic + }) + } + + /// Prepare to read some the chunks from the file. + /// Does not decode the chunks now, but returns a decoder. + /// Reading only some chunks may seeking the file, potentially skipping many bytes. + // TODO tile indices add no new information to block index?? + pub fn filter_chunks(mut self, pedantic: bool, mut filter: impl FnMut(&MetaData, TileCoordinates, BlockIndex) -> bool) -> Result<FilteredChunksReader<R>> { + let offset_tables = MetaData::read_offset_tables(&mut self.remaining_reader, &self.meta_data.headers)?; + + // TODO regardless of pedantic, if invalid, read all chunks instead, and filter after reading each chunk? + if pedantic { + validate_offset_tables( + self.meta_data.headers.as_slice(), &offset_tables, + self.remaining_reader.byte_position() + )?; + } + + let mut filtered_offsets = Vec::with_capacity( + (self.meta_data.headers.len() * 32).min(2*2048) + ); + + // TODO detect whether the filter actually would skip chunks, and aviod sorting etc when not filtering is applied + + for (header_index, header) in self.meta_data.headers.iter().enumerate() { // offset tables are stored same order as headers + for (block_index, tile) in header.blocks_increasing_y_order().enumerate() { // in increasing_y order + let data_indices = header.get_absolute_block_pixel_coordinates(tile.location)?; + + let block = BlockIndex { + layer: header_index, + level: tile.location.level_index, + pixel_position: data_indices.position.to_usize("data indices start")?, + pixel_size: data_indices.size, + }; + + if filter(&self.meta_data, tile.location, block) { + filtered_offsets.push(offset_tables[header_index][block_index]) // safe indexing from `enumerate()` + } + }; + } + + filtered_offsets.sort_unstable(); // enables reading continuously if possible (already sorted where line order increasing) + + if pedantic { + // table is sorted. if any two neighbours are equal, we have duplicates. this is invalid. + if filtered_offsets.windows(2).any(|pair| pair[0] == pair[1]) { + return Err(Error::invalid("chunk offset table")) + } + } + + Ok(FilteredChunksReader { + meta_data: self.meta_data, + expected_filtered_chunk_count: filtered_offsets.len(), + remaining_filtered_chunk_indices: filtered_offsets.into_iter(), + remaining_bytes: self.remaining_reader + }) + } +} + + +fn validate_offset_tables(headers: &[Header], offset_tables: &OffsetTables, chunks_start_byte: usize) -> UnitResult { + let max_pixel_bytes: usize = headers.iter() // when compressed, chunks are smaller, but never larger than max + .map(|header| header.max_pixel_file_bytes()) + .sum(); + + // check that each offset is within the bounds + let end_byte = chunks_start_byte + max_pixel_bytes; + let is_invalid = offset_tables.iter().flatten().map(|&u64| u64_to_usize(u64)) + .any(|chunk_start| chunk_start < chunks_start_byte || chunk_start > end_byte); + + if is_invalid { Err(Error::invalid("offset table")) } + else { Ok(()) } +} + + + + +/// Decode the desired chunks and skip the unimportant chunks in the file. +/// The decoded chunks can be decompressed by calling +/// `decompress_parallel`, `decompress_sequential`, or `sequential_decompressor` or `parallel_decompressor`. +/// Call `on_progress` to have a callback with each block. +/// Also contains the image meta data. +#[derive(Debug)] +pub struct FilteredChunksReader<R> { + meta_data: MetaData, + expected_filtered_chunk_count: usize, + remaining_filtered_chunk_indices: std::vec::IntoIter<u64>, + remaining_bytes: PeekRead<Tracking<R>>, +} + +/// Decode all chunks in the file without seeking. +/// The decoded chunks can be decompressed by calling +/// `decompress_parallel`, `decompress_sequential`, or `sequential_decompressor` or `parallel_decompressor`. +/// Call `on_progress` to have a callback with each block. +/// Also contains the image meta data. +#[derive(Debug)] +pub struct AllChunksReader<R> { + meta_data: MetaData, + remaining_chunks: std::ops::Range<usize>, + remaining_bytes: PeekRead<Tracking<R>>, + pedantic: bool, +} + +/// Decode chunks in the file without seeking. +/// Calls the supplied closure for each chunk. +/// The decoded chunks can be decompressed by calling +/// `decompress_parallel`, `decompress_sequential`, or `sequential_decompressor`. +/// Also contains the image meta data. +#[derive(Debug)] +pub struct OnProgressChunksReader<R, F> { + chunks_reader: R, + decoded_chunks: usize, + callback: F, +} + +/// Decode chunks in the file. +/// The decoded chunks can be decompressed by calling +/// `decompress_parallel`, `decompress_sequential`, or `sequential_decompressor`. +/// Call `on_progress` to have a callback with each block. +/// Also contains the image meta data. +pub trait ChunksReader: Sized + Iterator<Item=Result<Chunk>> + ExactSizeIterator { + + /// The decoded exr meta data from the file. + fn meta_data(&self) -> &MetaData; + + /// The decoded exr headers from the file. + fn headers(&self) -> &[Header] { &self.meta_data().headers } + + /// The number of chunks that this reader will return in total. + /// Can be less than the total number of chunks in the file, if some chunks are skipped. + fn expected_chunk_count(&self) -> usize; + + /// Read the next compressed chunk from the file. + /// Equivalent to `.next()`, as this also is an iterator. + /// Returns `None` if all chunks have been read. + fn read_next_chunk(&mut self) -> Option<Result<Chunk>> { self.next() } + + /// Create a new reader that calls the provided progress + /// callback for each chunk that is read from the file. + /// If the file can be successfully decoded, + /// the progress will always at least once include 0.0 at the start and 1.0 at the end. + fn on_progress<F>(self, on_progress: F) -> OnProgressChunksReader<Self, F> where F: FnMut(f64) { + OnProgressChunksReader { chunks_reader: self, callback: on_progress, decoded_chunks: 0 } + } + + /// Decompress all blocks in the file, using multiple cpu cores, and call the supplied closure for each block. + /// The order of the blocks is not deterministic. + /// You can also use `parallel_decompressor` to obtain an iterator instead. + /// Will fallback to sequential processing where threads are not available, or where it would not speed up the process. + // FIXME try async + futures instead of rayon! Maybe even allows for external async decoding? (-> impl Stream<UncompressedBlock>) + fn decompress_parallel( + self, pedantic: bool, + mut insert_block: impl FnMut(&MetaData, UncompressedBlock) -> UnitResult + ) -> UnitResult + { + let mut decompressor = match self.parallel_decompressor(pedantic) { + Err(old_self) => return old_self.decompress_sequential(pedantic, insert_block), + Ok(decompressor) => decompressor, + }; + + while let Some(block) = decompressor.next() { + insert_block(decompressor.meta_data(), block?)?; + } + + debug_assert_eq!(decompressor.len(), 0, "compressed blocks left after decompressing all blocks"); + Ok(()) + } + + /// Return an iterator that decompresses the chunks with multiple threads. + /// The order of the blocks is not deterministic. + /// Use `ParallelBlockDecompressor::new` if you want to use your own thread pool. + /// By default, this uses as many threads as there are CPUs. + /// Returns the `self` if there is no need for parallel decompression. + fn parallel_decompressor(self, pedantic: bool) -> std::result::Result<ParallelBlockDecompressor<Self>, Self> { + ParallelBlockDecompressor::new(self, pedantic) + } + + /// Return an iterator that decompresses the chunks in this thread. + /// You can alternatively use `sequential_decompressor` if you prefer an external iterator. + fn decompress_sequential( + self, pedantic: bool, + mut insert_block: impl FnMut(&MetaData, UncompressedBlock) -> UnitResult + ) -> UnitResult + { + let mut decompressor = self.sequential_decompressor(pedantic); + while let Some(block) = decompressor.next() { + insert_block(decompressor.meta_data(), block?)?; + } + + debug_assert_eq!(decompressor.len(), 0, "compressed blocks left after decompressing all blocks"); + Ok(()) + } + + /// Prepare reading the chunks sequentially, only a single thread, but with less memory overhead. + fn sequential_decompressor(self, pedantic: bool) -> SequentialBlockDecompressor<Self> { + SequentialBlockDecompressor { remaining_chunks_reader: self, pedantic } + } +} + +impl<R, F> ChunksReader for OnProgressChunksReader<R, F> where R: ChunksReader, F: FnMut(f64) { + fn meta_data(&self) -> &MetaData { self.chunks_reader.meta_data() } + fn expected_chunk_count(&self) -> usize { self.chunks_reader.expected_chunk_count() } +} + +impl<R, F> ExactSizeIterator for OnProgressChunksReader<R, F> where R: ChunksReader, F: FnMut(f64) {} +impl<R, F> Iterator for OnProgressChunksReader<R, F> where R: ChunksReader, F: FnMut(f64) { + type Item = Result<Chunk>; + + fn next(&mut self) -> Option<Self::Item> { + self.chunks_reader.next().map(|item|{ + { + let total_chunks = self.expected_chunk_count() as f64; + let callback = &mut self.callback; + callback(self.decoded_chunks as f64 / total_chunks); + } + + self.decoded_chunks += 1; + item + }) + .or_else(||{ + debug_assert_eq!( + self.decoded_chunks, self.expected_chunk_count(), + "chunks reader finished but not all chunks are decompressed" + ); + + let callback = &mut self.callback; + callback(1.0); + None + }) + } + + fn size_hint(&self) -> (usize, Option<usize>) { + self.chunks_reader.size_hint() + } +} + +impl<R: Read + Seek> ChunksReader for AllChunksReader<R> { + fn meta_data(&self) -> &MetaData { &self.meta_data } + fn expected_chunk_count(&self) -> usize { self.remaining_chunks.end } +} + +impl<R: Read + Seek> ExactSizeIterator for AllChunksReader<R> {} +impl<R: Read + Seek> Iterator for AllChunksReader<R> { + type Item = Result<Chunk>; + + fn next(&mut self) -> Option<Self::Item> { + // read as many chunks as the file should contain (inferred from meta data) + let next_chunk = self.remaining_chunks.next() + .map(|_| Chunk::read(&mut self.remaining_bytes, &self.meta_data)); + + // if no chunks are left, but some bytes remain, return error + if self.pedantic && next_chunk.is_none() && self.remaining_bytes.peek_u8().is_ok() { + return Some(Err(Error::invalid("end of file expected"))); + } + + next_chunk + } + + fn size_hint(&self) -> (usize, Option<usize>) { + (self.remaining_chunks.len(), Some(self.remaining_chunks.len())) + } +} + +impl<R: Read + Seek> ChunksReader for FilteredChunksReader<R> { + fn meta_data(&self) -> &MetaData { &self.meta_data } + fn expected_chunk_count(&self) -> usize { self.expected_filtered_chunk_count } +} + +impl<R: Read + Seek> ExactSizeIterator for FilteredChunksReader<R> {} +impl<R: Read + Seek> Iterator for FilteredChunksReader<R> { + type Item = Result<Chunk>; + + fn next(&mut self) -> Option<Self::Item> { + // read as many chunks as we have desired chunk offsets + self.remaining_filtered_chunk_indices.next().map(|next_chunk_location|{ + self.remaining_bytes.skip_to( // no-op for seek at current position, uses skip_bytes for small amounts + usize::try_from(next_chunk_location) + .expect("too large chunk position for this machine") + )?; + + let meta_data = &self.meta_data; + Chunk::read(&mut self.remaining_bytes, meta_data) + }) + + // TODO remember last chunk index and then seek to index+size and check whether bytes are left? + } + + fn size_hint(&self) -> (usize, Option<usize>) { + (self.remaining_filtered_chunk_indices.len(), Some(self.remaining_filtered_chunk_indices.len())) + } +} + +/// Read all chunks from the file, decompressing each chunk immediately. +/// Implements iterator. +#[derive(Debug)] +pub struct SequentialBlockDecompressor<R: ChunksReader> { + remaining_chunks_reader: R, + pedantic: bool, +} + +impl<R: ChunksReader> SequentialBlockDecompressor<R> { + + /// The extracted meta data from the image file. + pub fn meta_data(&self) -> &MetaData { self.remaining_chunks_reader.meta_data() } + + /// Read and then decompress a single block of pixels from the byte source. + pub fn decompress_next_block(&mut self) -> Option<Result<UncompressedBlock>> { + self.remaining_chunks_reader.read_next_chunk().map(|compressed_chunk|{ + UncompressedBlock::decompress_chunk(compressed_chunk?, &self.remaining_chunks_reader.meta_data(), self.pedantic) + }) + } +} + +/// Decompress the chunks in a file in parallel. +/// The first call to `next` will fill the thread pool with jobs, +/// starting to decompress the next few blocks. +/// These jobs will finish, even if you stop reading more blocks. +/// Implements iterator. +#[derive(Debug)] +pub struct ParallelBlockDecompressor<R: ChunksReader> { + remaining_chunks: R, + sender: flume::Sender<Result<UncompressedBlock>>, + receiver: flume::Receiver<Result<UncompressedBlock>>, + currently_decompressing_count: usize, + max_threads: usize, + + shared_meta_data_ref: Arc<MetaData>, + pedantic: bool, + + pool: ThreadPool, +} + +impl<R: ChunksReader> ParallelBlockDecompressor<R> { + + /// Create a new decompressor. Does not immediately spawn any tasks. + /// Decompression starts after the first call to `next`. + /// Returns the chunks if parallel decompression should not be used. + /// Use `new_with_thread_pool` to customize the threadpool. + pub fn new(chunks: R, pedantic: bool) -> std::result::Result<Self, R> { + Self::new_with_thread_pool(chunks, pedantic, ||{ + rayon_core::ThreadPoolBuilder::new() + .thread_name(|index| format!("OpenEXR Block Decompressor Thread #{}", index)) + .build() + }) + } + + /// Create a new decompressor. Does not immediately spawn any tasks. + /// Decompression starts after the first call to `next`. + /// Returns the chunks if parallel decompression should not be used. + pub fn new_with_thread_pool<CreatePool>(chunks: R, pedantic: bool, try_create_thread_pool: CreatePool) + -> std::result::Result<Self, R> + where CreatePool: FnOnce() -> std::result::Result<ThreadPool, ThreadPoolBuildError> + { + // if no compression is used in the file, don't use a threadpool + if chunks.meta_data().headers.iter() + .all(|head|head.compression == Compression::Uncompressed) + { + return Err(chunks); + } + + // in case thread pool creation fails (for example on WASM currently), + // we revert to sequential decompression + let pool = match try_create_thread_pool() { + Ok(pool) => pool, + + // TODO print warning? + Err(_) => return Err(chunks), + }; + + let max_threads = pool.current_num_threads().max(1).min(chunks.len()) + 2; // ca one block for each thread at all times + + let (send, recv) = flume::unbounded(); // TODO bounded channel simplifies logic? + + Ok(Self { + shared_meta_data_ref: Arc::new(chunks.meta_data().clone()), + currently_decompressing_count: 0, + remaining_chunks: chunks, + sender: send, + receiver: recv, + pedantic, + max_threads, + + pool, + }) + } + + /// Fill the pool with decompression jobs. Returns the first job that finishes. + pub fn decompress_next_block(&mut self) -> Option<Result<UncompressedBlock>> { + + while self.currently_decompressing_count < self.max_threads { + let block = self.remaining_chunks.next(); + if let Some(block) = block { + let block = match block { + Ok(block) => block, + Err(error) => return Some(Err(error)) + }; + + let sender = self.sender.clone(); + let meta = self.shared_meta_data_ref.clone(); + let pedantic = self.pedantic; + + self.currently_decompressing_count += 1; + + self.pool.spawn(move || { + let decompressed_or_err = UncompressedBlock::decompress_chunk( + block, &meta, pedantic + ); + + // by now, decompressing could have failed in another thread. + // the error is then already handled, so we simply + // don't send the decompressed block and do nothing + let _ = sender.send(decompressed_or_err); + }); + } + else { + // there are no chunks left to decompress + break; + } + } + + if self.currently_decompressing_count > 0 { + let next = self.receiver.recv() + .expect("all decompressing senders hung up but more messages were expected"); + + self.currently_decompressing_count -= 1; + Some(next) + } + else { + debug_assert!(self.receiver.try_recv().is_err(), "uncompressed chunks left in channel after decompressing all chunks"); // TODO not reliable + debug_assert_eq!(self.len(), 0, "compressed chunks left after decompressing all chunks"); + None + } + } + + /// The extracted meta data of the image file. + pub fn meta_data(&self) -> &MetaData { self.remaining_chunks.meta_data() } +} + +impl<R: ChunksReader> ExactSizeIterator for SequentialBlockDecompressor<R> {} +impl<R: ChunksReader> Iterator for SequentialBlockDecompressor<R> { + type Item = Result<UncompressedBlock>; + fn next(&mut self) -> Option<Self::Item> { self.decompress_next_block() } + fn size_hint(&self) -> (usize, Option<usize>) { self.remaining_chunks_reader.size_hint() } +} + +impl<R: ChunksReader> ExactSizeIterator for ParallelBlockDecompressor<R> {} +impl<R: ChunksReader> Iterator for ParallelBlockDecompressor<R> { + type Item = Result<UncompressedBlock>; + fn next(&mut self) -> Option<Self::Item> { self.decompress_next_block() } + fn size_hint(&self) -> (usize, Option<usize>) { + let remaining = self.remaining_chunks.len() + self.currently_decompressing_count; + (remaining, Some(remaining)) + } +} + + + + + diff --git a/vendor/exr/src/block/samples.rs b/vendor/exr/src/block/samples.rs new file mode 100644 index 0000000..4352b11 --- /dev/null +++ b/vendor/exr/src/block/samples.rs @@ -0,0 +1,248 @@ +//! Extract pixel samples from a block of pixel bytes. + +use crate::prelude::*; +use half::prelude::HalfFloatSliceExt; + + +/// A single red, green, blue, or alpha value. +#[derive(Copy, Clone, Debug)] +pub enum Sample { + + /// A 16-bit float sample. + F16(f16), + + /// A 32-bit float sample. + F32(f32), + + /// An unsigned integer sample. + U32(u32) +} + +impl Sample { + + /// Create a sample containing a 32-bit float. + pub fn f32(f32: f32) -> Self { Sample::F32(f32) } + + /// Create a sample containing a 16-bit float. + pub fn f16(f16: f16) -> Self { Sample::F16(f16) } + + /// Create a sample containing a 32-bit integer. + pub fn u32(u32: u32) -> Self { Sample::U32(u32) } + + /// Convert the sample to an f16 value. This has lower precision than f32. + /// Note: An f32 can only represent integers up to `1024` as precise as a u32 could. + #[inline] + pub fn to_f16(self) -> f16 { + match self { + Sample::F16(sample) => sample, + Sample::F32(sample) => f16::from_f32(sample), + Sample::U32(sample) => f16::from_f32(sample as f32), + } + } + + /// Convert the sample to an f32 value. + /// Note: An f32 can only represent integers up to `8388608` as precise as a u32 could. + #[inline] + pub fn to_f32(self) -> f32 { + match self { + Sample::F32(sample) => sample, + Sample::F16(sample) => sample.to_f32(), + Sample::U32(sample) => sample as f32, + } + } + + /// Convert the sample to a u32. Rounds floats to integers the same way that `3.1 as u32` does. + #[inline] + pub fn to_u32(self) -> u32 { + match self { + Sample::F16(sample) => sample.to_f32() as u32, + Sample::F32(sample) => sample as u32, + Sample::U32(sample) => sample, + } + } + + /// Is this value not a number? + #[inline] + pub fn is_nan(self) -> bool { + match self { + Sample::F16(value) => value.is_nan(), + Sample::F32(value) => value.is_nan(), + Sample::U32(_) => false, + } + } + + /// Is this value zero or negative zero? + #[inline] + pub fn is_zero(&self) -> bool { + match *self { + Sample::F16(value) => value == f16::ZERO || value == f16::NEG_ZERO, + Sample::F32(value) => value == 0.0, + Sample::U32(value) => value == 0, + } + } +} + +impl PartialEq for Sample { + fn eq(&self, other: &Self) -> bool { + match *self { + Sample::F16(num) => num == other.to_f16(), + Sample::F32(num) => num == other.to_f32(), + Sample::U32(num) => num == other.to_u32(), + } + } +} + +// this is not recommended because it may hide whether a color is transparent or opaque and might be undesired for depth channels +impl Default for Sample { + fn default() -> Self { Sample::F32(0.0) } +} + +impl From<f16> for Sample { #[inline] fn from(f: f16) -> Self { Sample::F16(f) } } +impl From<f32> for Sample { #[inline] fn from(f: f32) -> Self { Sample::F32(f) } } +impl From<u32> for Sample { #[inline] fn from(f: u32) -> Self { Sample::U32(f) } } + +impl<T> From<Option<T>> for Sample where T: Into<Sample> + Default { + #[inline] fn from(num: Option<T>) -> Self { num.unwrap_or_default().into() } +} + + +impl From<Sample> for f16 { #[inline] fn from(s: Sample) -> Self { s.to_f16() } } +impl From<Sample> for f32 { #[inline] fn from(s: Sample) -> Self { s.to_f32() } } +impl From<Sample> for u32 { #[inline] fn from(s: Sample) -> Self { s.to_u32() } } + + +/// Create an arbitrary sample type from one of the defined sample types. +/// Should be compiled to a no-op where the file contains the predicted sample type. +/// The slice functions should be optimized into a `memcpy` where there is no conversion needed. +pub trait FromNativeSample: Sized + Copy + Default + 'static { + + /// Create this sample from a f16, trying to represent the same numerical value + fn from_f16(value: f16) -> Self; + + /// Create this sample from a f32, trying to represent the same numerical value + fn from_f32(value: f32) -> Self; + + /// Create this sample from a u32, trying to represent the same numerical value + fn from_u32(value: u32) -> Self; + + /// Convert all values from the slice into this type. + /// This function exists to allow the compiler to perform a vectorization optimization. + /// Note that this default implementation will **not** be vectorized by the compiler automatically. + /// For maximum performance you will need to override this function and implement it via + /// an explicit batched conversion such as [`convert_to_f32_slice`](https://docs.rs/half/2.3.1/half/slice/trait.HalfFloatSliceExt.html#tymethod.convert_to_f32_slice) + #[inline] + fn from_f16s(from: &[f16], to: &mut [Self]) { + assert_eq!(from.len(), to.len(), "slices must have the same length"); + for (from, to) in from.iter().zip(to.iter_mut()) { + *to = Self::from_f16(*from); + } + } + + /// Convert all values from the slice into this type. + /// This function exists to allow the compiler to perform a vectorization optimization. + /// Note that this default implementation will be vectorized by the compiler automatically. + #[inline] + fn from_f32s(from: &[f32], to: &mut [Self]) { + assert_eq!(from.len(), to.len(), "slices must have the same length"); + for (from, to) in from.iter().zip(to.iter_mut()) { + *to = Self::from_f32(*from); + } + } + + /// Convert all values from the slice into this type. + /// This function exists to allow the compiler to perform a vectorization optimization. + /// Note that this default implementation will be vectorized by the compiler automatically, + /// provided that the CPU supports the necessary conversion instructions. + /// For example, x86_64 lacks the instructions to convert `u32` to floats, + /// so this will inevitably be slow on x86_64. + #[inline] + fn from_u32s(from: &[u32], to: &mut [Self]) { + assert_eq!(from.len(), to.len(), "slices must have the same length"); + for (from, to) in from.iter().zip(to.iter_mut()) { + *to = Self::from_u32(*from); + } + } +} + +// TODO haven't i implemented this exact behaviour already somewhere else in this library...?? +impl FromNativeSample for f32 { + #[inline] fn from_f16(value: f16) -> Self { value.to_f32() } + #[inline] fn from_f32(value: f32) -> Self { value } + #[inline] fn from_u32(value: u32) -> Self { value as f32 } + + // f16 is a custom type + // so the compiler can not automatically vectorize the conversion + // that's why we need to specialize this function + #[inline] + fn from_f16s(from: &[f16], to: &mut [Self]) { + from.convert_to_f32_slice(to); + } +} + +impl FromNativeSample for u32 { + #[inline] fn from_f16(value: f16) -> Self { value.to_f32() as u32 } + #[inline] fn from_f32(value: f32) -> Self { value as u32 } + #[inline] fn from_u32(value: u32) -> Self { value } +} + +impl FromNativeSample for f16 { + #[inline] fn from_f16(value: f16) -> Self { value } + #[inline] fn from_f32(value: f32) -> Self { f16::from_f32(value) } + #[inline] fn from_u32(value: u32) -> Self { f16::from_f32(value as f32) } + + // f16 is a custom type + // so the compiler can not automatically vectorize the conversion + // that's why we need to specialize this function + #[inline] + fn from_f32s(from: &[f32], to: &mut [Self]) { + to.convert_from_f32_slice(from) + } +} + +impl FromNativeSample for Sample { + #[inline] fn from_f16(value: f16) -> Self { Self::from(value) } + #[inline] fn from_f32(value: f32) -> Self { Self::from(value) } + #[inline] fn from_u32(value: u32) -> Self { Self::from(value) } +} + + +/// Convert any type into one of the supported sample types. +/// Should be compiled to a no-op where the file contains the predicted sample type +pub trait IntoNativeSample: Copy + Default + Sync + 'static { + + /// Convert this sample to an f16, trying to represent the same numerical value. + fn to_f16(&self) -> f16; + + /// Convert this sample to an f32, trying to represent the same numerical value. + fn to_f32(&self) -> f32; + + /// Convert this sample to an u16, trying to represent the same numerical value. + fn to_u32(&self) -> u32; +} + +impl IntoNativeSample for f16 { + fn to_f16(&self) -> f16 { f16::from_f16(*self) } + fn to_f32(&self) -> f32 { f32::from_f16(*self) } + fn to_u32(&self) -> u32 { u32::from_f16(*self) } +} + +impl IntoNativeSample for f32 { + fn to_f16(&self) -> f16 { f16::from_f32(*self) } + fn to_f32(&self) -> f32 { f32::from_f32(*self) } + fn to_u32(&self) -> u32 { u32::from_f32(*self) } +} + +impl IntoNativeSample for u32 { + fn to_f16(&self) -> f16 { f16::from_u32(*self) } + fn to_f32(&self) -> f32 { f32::from_u32(*self) } + fn to_u32(&self) -> u32 { u32::from_u32(*self) } +} + +impl IntoNativeSample for Sample { + fn to_f16(&self) -> f16 { Sample::to_f16(*self) } + fn to_f32(&self) -> f32 { Sample::to_f32(*self) } + fn to_u32(&self) -> u32 { Sample::to_u32(*self) } +} + + + diff --git a/vendor/exr/src/block/writer.rs b/vendor/exr/src/block/writer.rs new file mode 100644 index 0000000..1227c69 --- /dev/null +++ b/vendor/exr/src/block/writer.rs @@ -0,0 +1,468 @@ +//! Composable structures to handle writing an image. + + +use std::fmt::Debug; +use std::io::Seek; +use std::iter::Peekable; +use std::ops::Not; +use rayon_core::{ThreadPool, ThreadPoolBuildError}; + +use smallvec::alloc::collections::BTreeMap; + +use crate::block::UncompressedBlock; +use crate::block::chunk::{Chunk}; +use crate::compression::Compression; +use crate::error::{Error, Result, UnitResult, usize_to_u64}; +use crate::io::{Data, Tracking, Write}; +use crate::meta::{Headers, MetaData, OffsetTables}; +use crate::meta::attribute::LineOrder; + +/// Write an exr file by writing one chunk after another in a closure. +/// In the closure, you are provided a chunk writer, which should be used to write all the chunks. +/// Assumes the your write destination is buffered. +pub fn write_chunks_with<W: Write + Seek>( + buffered_write: W, headers: Headers, pedantic: bool, + write_chunks: impl FnOnce(MetaData, &mut ChunkWriter<W>) -> UnitResult +) -> UnitResult { + // this closure approach ensures that after writing all chunks, the file is always completed and checked and flushed + let (meta, mut writer) = ChunkWriter::new_for_buffered(buffered_write, headers, pedantic)?; + write_chunks(meta, &mut writer)?; + writer.complete_meta_data() +} + +/// Can consume compressed pixel chunks, writing them a file. +/// Use `sequential_blocks_compressor` or `parallel_blocks_compressor` to compress your data, +/// or use `compress_all_blocks_sequential` or `compress_all_blocks_parallel`. +/// Use `on_progress` to obtain a new writer +/// that triggers a callback for each block. +// #[must_use] +#[derive(Debug)] +#[must_use] +pub struct ChunkWriter<W> { + header_count: usize, + byte_writer: Tracking<W>, + chunk_indices_byte_location: std::ops::Range<usize>, + chunk_indices_increasing_y: OffsetTables, + chunk_count: usize, // TODO compose? +} + +/// A new writer that triggers a callback +/// for each block written to the inner writer. +#[derive(Debug)] +#[must_use] +pub struct OnProgressChunkWriter<'w, W, F> { + chunk_writer: &'w mut W, + written_chunks: usize, + on_progress: F, +} + +/// Write chunks to a byte destination. +/// Then write each chunk with `writer.write_chunk(chunk)`. +pub trait ChunksWriter: Sized { + + /// The total number of chunks that the complete file will contain. + fn total_chunks_count(&self) -> usize; + + /// Any more calls will result in an error and have no effect. + /// If writing results in an error, the file and the writer + /// may remain in an invalid state and should not be used further. + /// Errors when the chunk at this index was already written. + fn write_chunk(&mut self, index_in_header_increasing_y: usize, chunk: Chunk) -> UnitResult; + + /// Obtain a new writer that calls the specified closure for each block that is written to this writer. + fn on_progress<F>(&mut self, on_progress: F) -> OnProgressChunkWriter<'_, Self, F> where F: FnMut(f64) { + OnProgressChunkWriter { chunk_writer: self, written_chunks: 0, on_progress } + } + + /// Obtain a new writer that can compress blocks to chunks, which are then passed to this writer. + fn sequential_blocks_compressor<'w>(&'w mut self, meta: &'w MetaData) -> SequentialBlocksCompressor<'w, Self> { + SequentialBlocksCompressor::new(meta, self) + } + + /// Obtain a new writer that can compress blocks to chunks on multiple threads, which are then passed to this writer. + /// Returns none if the sequential compressor should be used instead (thread pool creation failure or too large performance overhead). + fn parallel_blocks_compressor<'w>(&'w mut self, meta: &'w MetaData) -> Option<ParallelBlocksCompressor<'w, Self>> { + ParallelBlocksCompressor::new(meta, self) + } + + /// Compresses all blocks to the file. + /// The index of the block must be in increasing line order within the header. + /// Obtain iterator with `MetaData::collect_ordered_blocks(...)` or similar methods. + fn compress_all_blocks_sequential(mut self, meta: &MetaData, blocks: impl Iterator<Item=(usize, UncompressedBlock)>) -> UnitResult { + let mut writer = self.sequential_blocks_compressor(meta); + + // TODO check block order if line order is not unspecified! + for (index_in_header_increasing_y, block) in blocks { + writer.compress_block(index_in_header_increasing_y, block)?; + } + + // TODO debug_assert_eq!(self.is_complete()); + Ok(()) + } + + /// Compresses all blocks to the file. + /// The index of the block must be in increasing line order within the header. + /// Obtain iterator with `MetaData::collect_ordered_blocks(...)` or similar methods. + /// Will fallback to sequential processing where threads are not available, or where it would not speed up the process. + fn compress_all_blocks_parallel(mut self, meta: &MetaData, blocks: impl Iterator<Item=(usize, UncompressedBlock)>) -> UnitResult { + let mut parallel_writer = match self.parallel_blocks_compressor(meta) { + None => return self.compress_all_blocks_sequential(meta, blocks), + Some(writer) => writer, + }; + + // TODO check block order if line order is not unspecified! + for (index_in_header_increasing_y, block) in blocks { + parallel_writer.add_block_to_compression_queue(index_in_header_increasing_y, block)?; + } + + // TODO debug_assert_eq!(self.is_complete()); + Ok(()) + } +} + + +impl<W> ChunksWriter for ChunkWriter<W> where W: Write + Seek { + + /// The total number of chunks that the complete file will contain. + fn total_chunks_count(&self) -> usize { self.chunk_count } + + /// Any more calls will result in an error and have no effect. + /// If writing results in an error, the file and the writer + /// may remain in an invalid state and should not be used further. + /// Errors when the chunk at this index was already written. + fn write_chunk(&mut self, index_in_header_increasing_y: usize, chunk: Chunk) -> UnitResult { + let header_chunk_indices = &mut self.chunk_indices_increasing_y[chunk.layer_index]; + + if index_in_header_increasing_y >= header_chunk_indices.len() { + return Err(Error::invalid("too large chunk index")); + } + + let chunk_index_slot = &mut header_chunk_indices[index_in_header_increasing_y]; + if *chunk_index_slot != 0 { + return Err(Error::invalid(format!("chunk at index {} is already written", index_in_header_increasing_y))); + } + + *chunk_index_slot = usize_to_u64(self.byte_writer.byte_position()); + chunk.write(&mut self.byte_writer, self.header_count)?; + Ok(()) + } +} + +impl<W> ChunkWriter<W> where W: Write + Seek { + // -- the following functions are private, because they must be called in a strict order -- + + /// Writes the meta data and zeroed offset tables as a placeholder. + fn new_for_buffered(buffered_byte_writer: W, headers: Headers, pedantic: bool) -> Result<(MetaData, Self)> { + let mut write = Tracking::new(buffered_byte_writer); + let requirements = MetaData::write_validating_to_buffered(&mut write, headers.as_slice(), pedantic)?; + + // TODO: use increasing line order where possible, but this requires us to know whether we want to be parallel right now + /*// if non-parallel compression, we always use increasing order anyways + if !parallel || !has_compression { + for header in &mut headers { + if header.line_order == LineOrder::Unspecified { + header.line_order = LineOrder::Increasing; + } + } + }*/ + + let offset_table_size: usize = headers.iter().map(|header| header.chunk_count).sum(); + + let offset_table_start_byte = write.byte_position(); + let offset_table_end_byte = write.byte_position() + offset_table_size * u64::BYTE_SIZE; + + // skip offset tables, filling with 0, will be updated after the last chunk has been written + write.seek_write_to(offset_table_end_byte)?; + + let header_count = headers.len(); + let chunk_indices_increasing_y = headers.iter() + .map(|header| vec![0_u64; header.chunk_count]).collect(); + + let meta_data = MetaData { requirements, headers }; + + Ok((meta_data, ChunkWriter { + header_count, + byte_writer: write, + chunk_count: offset_table_size, + chunk_indices_byte_location: offset_table_start_byte .. offset_table_end_byte, + chunk_indices_increasing_y, + })) + } + + /// Seek back to the meta data, write offset tables, and flush the byte writer. + /// Leaves the writer seeked to the middle of the file. + fn complete_meta_data(mut self) -> UnitResult { + if self.chunk_indices_increasing_y.iter().flatten().any(|&index| index == 0) { + return Err(Error::invalid("some chunks are not written yet")) + } + + // write all offset tables + debug_assert_ne!(self.byte_writer.byte_position(), self.chunk_indices_byte_location.end, "offset table has already been updated"); + self.byte_writer.seek_write_to(self.chunk_indices_byte_location.start)?; + + for table in self.chunk_indices_increasing_y { + u64::write_slice(&mut self.byte_writer, table.as_slice())?; + } + + self.byte_writer.flush()?; // make sure we catch all (possibly delayed) io errors before returning + Ok(()) + } + +} + + +impl<'w, W, F> ChunksWriter for OnProgressChunkWriter<'w, W, F> where W: 'w + ChunksWriter, F: FnMut(f64) { + fn total_chunks_count(&self) -> usize { + self.chunk_writer.total_chunks_count() + } + + fn write_chunk(&mut self, index_in_header_increasing_y: usize, chunk: Chunk) -> UnitResult { + let total_chunks = self.total_chunks_count(); + let on_progress = &mut self.on_progress; + + // guarantee on_progress being called with 0 once + if self.written_chunks == 0 { on_progress(0.0); } + + self.chunk_writer.write_chunk(index_in_header_increasing_y, chunk)?; + + self.written_chunks += 1; + + on_progress({ + // guarantee finishing with progress 1.0 for last block at least once, float division might slightly differ from 1.0 + if self.written_chunks == total_chunks { 1.0 } + else { self.written_chunks as f64 / total_chunks as f64 } + }); + + Ok(()) + } +} + + +/// Write blocks that appear in any order and reorder them before writing. +#[derive(Debug)] +#[must_use] +pub struct SortedBlocksWriter<'w, W> { + chunk_writer: &'w mut W, + pending_chunks: BTreeMap<usize, (usize, Chunk)>, + unwritten_chunk_indices: Peekable<std::ops::Range<usize>>, + requires_sorting: bool, // using this instead of Option, because of borrowing +} + + +impl<'w, W> SortedBlocksWriter<'w, W> where W: ChunksWriter { + + /// New sorting writer. Returns `None` if sorting is not required. + pub fn new(meta_data: &MetaData, chunk_writer: &'w mut W) -> SortedBlocksWriter<'w, W> { + let requires_sorting = meta_data.headers.iter() + .any(|header| header.line_order != LineOrder::Unspecified); + + let total_chunk_count = chunk_writer.total_chunks_count(); + + SortedBlocksWriter { + pending_chunks: BTreeMap::new(), + unwritten_chunk_indices: (0 .. total_chunk_count).peekable(), + requires_sorting, + chunk_writer + } + } + + /// Write the chunk or stash it. In the closure, write all chunks that can be written now. + pub fn write_or_stash_chunk(&mut self, chunk_index_in_file: usize, chunk_y_index: usize, chunk: Chunk) -> UnitResult { + if self.requires_sorting.not() { + return self.chunk_writer.write_chunk(chunk_y_index, chunk); + } + + // write this chunk now if possible + if self.unwritten_chunk_indices.peek() == Some(&chunk_index_in_file){ + self.chunk_writer.write_chunk(chunk_y_index, chunk)?; + self.unwritten_chunk_indices.next().expect("peeked chunk index is missing"); + + // write all pending blocks that are immediate successors of this block + while let Some((next_chunk_y_index, next_chunk)) = self + .unwritten_chunk_indices.peek().cloned() + .and_then(|id| self.pending_chunks.remove(&id)) + { + self.chunk_writer.write_chunk(next_chunk_y_index, next_chunk)?; + self.unwritten_chunk_indices.next().expect("peeked chunk index is missing"); + } + } + + else { + // the argument block is not to be written now, + // and all the pending blocks are not next up either, + // so just stash this block + self.pending_chunks.insert(chunk_index_in_file, (chunk_y_index, chunk)); + } + + Ok(()) + } + + /// Where the chunks will be written to. + pub fn inner_chunks_writer(&self) -> &W { + &self.chunk_writer + } +} + + + +/// Compress blocks to a chunk writer in this thread. +#[derive(Debug)] +#[must_use] +pub struct SequentialBlocksCompressor<'w, W> { + meta: &'w MetaData, + chunks_writer: &'w mut W, +} + +impl<'w, W> SequentialBlocksCompressor<'w, W> where W: 'w + ChunksWriter { + + /// New blocks writer. + pub fn new(meta: &'w MetaData, chunks_writer: &'w mut W) -> Self { Self { meta, chunks_writer, } } + + /// This is where the compressed blocks are written to. + pub fn inner_chunks_writer(&'w self) -> &'w W { self.chunks_writer } + + /// Compress a single block immediately. The index of the block must be in increasing line order. + pub fn compress_block(&mut self, index_in_header_increasing_y: usize, block: UncompressedBlock) -> UnitResult { + self.chunks_writer.write_chunk( + index_in_header_increasing_y, + block.compress_to_chunk(&self.meta.headers)? + ) + } +} + +/// Compress blocks to a chunk writer with multiple threads. +#[derive(Debug)] +#[must_use] +pub struct ParallelBlocksCompressor<'w, W> { + meta: &'w MetaData, + sorted_writer: SortedBlocksWriter<'w, W>, + + sender: flume::Sender<Result<(usize, usize, Chunk)>>, + receiver: flume::Receiver<Result<(usize, usize, Chunk)>>, + pool: rayon_core::ThreadPool, + + currently_compressing_count: usize, + written_chunk_count: usize, // used to check for last chunk + max_threads: usize, + next_incoming_chunk_index: usize, // used to remember original chunk order +} + +impl<'w, W> ParallelBlocksCompressor<'w, W> where W: 'w + ChunksWriter { + + /// New blocks writer. Returns none if sequential compression should be used. + /// Use `new_with_thread_pool` to customize the threadpool. + pub fn new(meta: &'w MetaData, chunks_writer: &'w mut W) -> Option<Self> { + Self::new_with_thread_pool(meta, chunks_writer, ||{ + rayon_core::ThreadPoolBuilder::new() + .thread_name(|index| format!("OpenEXR Block Compressor Thread #{}", index)) + .build() + }) + } + + /// New blocks writer. Returns none if sequential compression should be used. + pub fn new_with_thread_pool<CreatePool>( + meta: &'w MetaData, chunks_writer: &'w mut W, try_create_thread_pool: CreatePool) + -> Option<Self> + where CreatePool: FnOnce() -> std::result::Result<ThreadPool, ThreadPoolBuildError> + { + if meta.headers.iter().all(|head|head.compression == Compression::Uncompressed) { + return None; + } + + // in case thread pool creation fails (for example on WASM currently), + // we revert to sequential compression + let pool = match try_create_thread_pool() { + Ok(pool) => pool, + + // TODO print warning? + Err(_) => return None, + }; + + let max_threads = pool.current_num_threads().max(1).min(chunks_writer.total_chunks_count()) + 2; // ca one block for each thread at all times + let (send, recv) = flume::unbounded(); // TODO bounded channel simplifies logic? + + Some(Self { + sorted_writer: SortedBlocksWriter::new(meta, chunks_writer), + next_incoming_chunk_index: 0, + currently_compressing_count: 0, + written_chunk_count: 0, + sender: send, + receiver: recv, + max_threads, + pool, + meta, + }) + } + + /// This is where the compressed blocks are written to. + pub fn inner_chunks_writer(&'w self) -> &'w W { self.sorted_writer.inner_chunks_writer() } + + // private, as may underflow counter in release mode + fn write_next_queued_chunk(&mut self) -> UnitResult { + debug_assert!(self.currently_compressing_count > 0, "cannot wait for chunks as there are none left"); + + let some_compressed_chunk = self.receiver.recv() + .expect("cannot receive compressed block"); + + self.currently_compressing_count -= 1; + let (chunk_file_index, chunk_y_index, chunk) = some_compressed_chunk?; + self.sorted_writer.write_or_stash_chunk(chunk_file_index, chunk_y_index, chunk)?; + + self.written_chunk_count += 1; + Ok(()) + } + + /// Wait until all currently compressing chunks in the compressor have been written. + pub fn write_all_queued_chunks(&mut self) -> UnitResult { + while self.currently_compressing_count > 0 { + self.write_next_queued_chunk()?; + } + + debug_assert_eq!(self.currently_compressing_count, 0, "counter does not match block count"); + Ok(()) + } + + /// Add a single block to the compressor queue. The index of the block must be in increasing line order. + /// When calling this function for the last block, this method waits until all the blocks have been written. + /// This only works when you write as many blocks as the image expects, otherwise you can use `wait_for_all_remaining_chunks`. + /// Waits for a block from the queue to be written, if the queue already has enough items. + pub fn add_block_to_compression_queue(&mut self, index_in_header_increasing_y: usize, block: UncompressedBlock) -> UnitResult { + + // if pipe is full, block to wait for a slot to free up + if self.currently_compressing_count >= self.max_threads { + self.write_next_queued_chunk()?; + } + + // add the argument chunk to the compression queueue + let index_in_file = self.next_incoming_chunk_index; + let sender = self.sender.clone(); + let meta = self.meta.clone(); + + self.pool.spawn(move ||{ + let compressed_or_err = block.compress_to_chunk(&meta.headers); + + // by now, decompressing could have failed in another thread. + // the error is then already handled, so we simply + // don't send the decompressed block and do nothing + let _ = sender.send(compressed_or_err.map(move |compressed| (index_in_file, index_in_header_increasing_y, compressed))); + }); + + self.currently_compressing_count += 1; + self.next_incoming_chunk_index += 1; + + // if this is the last chunk, wait for all chunks to complete before returning + if self.written_chunk_count + self.currently_compressing_count == self.inner_chunks_writer().total_chunks_count() { + self.write_all_queued_chunks()?; + debug_assert_eq!( + self.written_chunk_count, self.inner_chunks_writer().total_chunks_count(), + "written chunk count mismatch" + ); + } + + + Ok(()) + } +} + + + |