diff options
| author | Valentin Popov <valentin@popov.link> | 2024-01-08 00:21:28 +0300 | 
|---|---|---|
| committer | Valentin Popov <valentin@popov.link> | 2024-01-08 00:21:28 +0300 | 
| commit | 1b6a04ca5504955c571d1c97504fb45ea0befee4 (patch) | |
| tree | 7579f518b23313e8a9748a88ab6173d5e030b227 /vendor/exr/src/block | |
| parent | 5ecd8cf2cba827454317368b68571df0d13d7842 (diff) | |
| download | fparkan-1b6a04ca5504955c571d1c97504fb45ea0befee4.tar.xz fparkan-1b6a04ca5504955c571d1c97504fb45ea0befee4.zip | |
Initial vendor packages
Signed-off-by: Valentin Popov <valentin@popov.link>
Diffstat (limited to 'vendor/exr/src/block')
| -rw-r--r-- | vendor/exr/src/block/chunk.rs | 379 | ||||
| -rw-r--r-- | vendor/exr/src/block/lines.rs | 197 | ||||
| -rw-r--r-- | vendor/exr/src/block/mod.rs | 257 | ||||
| -rw-r--r-- | vendor/exr/src/block/reader.rs | 527 | ||||
| -rw-r--r-- | vendor/exr/src/block/samples.rs | 248 | ||||
| -rw-r--r-- | vendor/exr/src/block/writer.rs | 468 | 
6 files changed, 2076 insertions, 0 deletions
| diff --git a/vendor/exr/src/block/chunk.rs b/vendor/exr/src/block/chunk.rs new file mode 100644 index 0000000..ff138f8 --- /dev/null +++ b/vendor/exr/src/block/chunk.rs @@ -0,0 +1,379 @@ + +//! Read and write already compressed pixel data blocks. +//! Does not include the process of compression and decompression. + +use crate::meta::attribute::{IntegerBounds}; + +/// A generic block of pixel information. +/// Contains pixel data and an index to the corresponding header. +/// All pixel data in a file is split into a list of chunks. +/// Also contains positioning information that locates this +/// data block in the referenced layer. +#[derive(Debug, Clone)] +pub struct Chunk { + +    /// The index of the layer that the block belongs to. +    /// This is required as the pixel data can appear in any order in a file. +    // PDF says u64, but source code seems to be i32 +    pub layer_index: usize, + +    /// The compressed pixel contents. +    pub compressed_block: CompressedBlock, +} + +/// The raw, possibly compressed pixel data of a file. +/// Each layer in a file can have a different type. +/// Also contains positioning information that locates this +/// data block in the corresponding layer. +/// Exists inside a `Chunk`. +#[derive(Debug, Clone)] +pub enum CompressedBlock { + +    /// Scan line blocks of flat data. +    ScanLine(CompressedScanLineBlock), + +    /// Tiles of flat data. +    Tile(CompressedTileBlock), + +    /// Scan line blocks of deep data. +    DeepScanLine(CompressedDeepScanLineBlock), + +    /// Tiles of deep data. +    DeepTile(CompressedDeepTileBlock), +} + +/// A `Block` of possibly compressed flat scan lines. +/// Corresponds to type attribute `scanlineimage`. +#[derive(Debug, Clone)] +pub struct CompressedScanLineBlock { + +    /// The block's y coordinate is the pixel space y coordinate of the top scan line in the block. +    /// The top scan line block in the image is aligned with the top edge of the data window. 
+    pub y_coordinate: i32, + +    /// One or more scan lines may be stored together as a scan line block. +    /// The number of scan lines per block depends on how the pixel data are compressed. +    /// For each line in the tile, for each channel, the row values are contiguous. +    pub compressed_pixels: Vec<u8>, +} + +/// This `Block` is a tile of flat (non-deep) data. +/// Corresponds to type attribute `tiledimage`. +#[derive(Debug, Clone)] +pub struct CompressedTileBlock { + +    /// The tile location. +    pub coordinates: TileCoordinates, + +    /// One or more scan lines may be stored together as a scan line block. +    /// The number of scan lines per block depends on how the pixel data are compressed. +    /// For each line in the tile, for each channel, the row values are contiguous. +    pub compressed_pixels: Vec<u8>, +} + +/// Indicates the position and resolution level of a `TileBlock` or `DeepTileBlock`. +#[derive(Copy, Clone, Debug, Hash, Eq, PartialEq)] +pub struct TileCoordinates { + +    /// Index of the tile, not pixel position. +    pub tile_index: Vec2<usize>, + +    /// Index of the Mip/Rip level. +    pub level_index: Vec2<usize>, +} + +/// This `Block` consists of one or more deep scan lines. +/// Corresponds to type attribute `deepscanline`. +#[derive(Debug, Clone)] +pub struct CompressedDeepScanLineBlock { + +    /// The block's y coordinate is the pixel space y coordinate of the top scan line in the block. +    /// The top scan line block in the image is aligned with the top edge of the data window. +    pub y_coordinate: i32, + +    /// Count of samples. +    pub decompressed_sample_data_size: usize, + +    /// The pixel offset table is a list of integers, one for each pixel column within the data window. +    /// Each entry in the table indicates the total number of samples required +    /// to store the pixel in it as well as all pixels to the left of it. 
+    pub compressed_pixel_offset_table: Vec<i8>, + +    /// One or more scan lines may be stored together as a scan line block. +    /// The number of scan lines per block depends on how the pixel data are compressed. +    /// For each line in the tile, for each channel, the row values are contiguous. +    pub compressed_sample_data: Vec<u8>, +} + +/// This `Block` is a tile of deep data. +/// Corresponds to type attribute `deeptile`. +#[derive(Debug, Clone)] +pub struct CompressedDeepTileBlock { + +    /// The tile location. +    pub coordinates: TileCoordinates, + +    /// Count of samples. +    pub decompressed_sample_data_size: usize, + +    /// The pixel offset table is a list of integers, one for each pixel column within the data window. +    /// Each entry in the table indicates the total number of samples required +    /// to store the pixel in it as well as all pixels to the left of it. +    pub compressed_pixel_offset_table: Vec<i8>, + +    /// One or more scan lines may be stored together as a scan line block. +    /// The number of scan lines per block depends on how the pixel data are compressed. +    /// For each line in the tile, for each channel, the row values are contiguous. +    pub compressed_sample_data: Vec<u8>, +} + + +use crate::io::*; + +impl TileCoordinates { + +    /// Without validation, write this instance to the byte stream. +    pub fn write<W: Write>(&self, write: &mut W) -> UnitResult { +        i32::write(usize_to_i32(self.tile_index.x()), write)?; +        i32::write(usize_to_i32(self.tile_index.y()), write)?; +        i32::write(usize_to_i32(self.level_index.x()), write)?; +        i32::write(usize_to_i32(self.level_index.y()), write)?; +        Ok(()) +    } + +    /// Read the value without validating. 
+    pub fn read(read: &mut impl Read) -> Result<Self> { +        let tile_x = i32::read(read)?; +        let tile_y = i32::read(read)?; + +        let level_x = i32::read(read)?; +        let level_y = i32::read(read)?; + +        if level_x > 31 || level_y > 31 { +            // there can be at most 31 levels, because the largest level would have a size of 2^31, +            // which exceeds the maximum 32-bit integer value. +            return Err(Error::invalid("level index exceeding integer maximum")); +        } + +        Ok(TileCoordinates { +            tile_index: Vec2(tile_x, tile_y).to_usize("tile coordinate index")?, +            level_index: Vec2(level_x, level_y).to_usize("tile coordinate level")? +        }) +    } + +    /// The indices which can be used to index into the arrays of a data window. +    /// These coordinates are only valid inside the corresponding one header. +    /// Will start at 0 and always be positive. +    pub fn to_data_indices(&self, tile_size: Vec2<usize>, max: Vec2<usize>) -> Result<IntegerBounds> { +        let x = self.tile_index.x() * tile_size.width(); +        let y = self.tile_index.y() * tile_size.height(); + +        if x >= max.x() || y >= max.y() { +            Err(Error::invalid("tile index")) +        } +        else { +            Ok(IntegerBounds { +                position: Vec2(usize_to_i32(x), usize_to_i32(y)), +                size: Vec2( +                    calculate_block_size(max.x(), tile_size.width(), x)?, +                    calculate_block_size(max.y(), tile_size.height(), y)?, +                ), +            }) +        } +    } + +    /// Absolute coordinates inside the global 2D space of a file, may be negative. 
+    pub fn to_absolute_indices(&self, tile_size: Vec2<usize>, data_window: IntegerBounds) -> Result<IntegerBounds> { +        let data = self.to_data_indices(tile_size, data_window.size)?; +        Ok(data.with_origin(data_window.position)) +    } + +    /// Returns if this is the original resolution or a smaller copy. +    pub fn is_largest_resolution_level(&self) -> bool { +        self.level_index == Vec2(0, 0) +    } +} + + + +use crate::meta::{MetaData, BlockDescription, calculate_block_size}; + +impl CompressedScanLineBlock { + +    /// Without validation, write this instance to the byte stream. +    pub fn write<W: Write>(&self, write: &mut W) -> UnitResult { +        debug_assert_ne!(self.compressed_pixels.len(), 0, "empty blocks should not be put in the file bug"); + +        i32::write(self.y_coordinate, write)?; +        u8::write_i32_sized_slice(write, &self.compressed_pixels)?; +        Ok(()) +    } + +    /// Read the value without validating. +    pub fn read(read: &mut impl Read, max_block_byte_size: usize) -> Result<Self> { +        let y_coordinate = i32::read(read)?; +        let compressed_pixels = u8::read_i32_sized_vec(read, max_block_byte_size, Some(max_block_byte_size), "scan line block sample count")?; +        Ok(CompressedScanLineBlock { y_coordinate, compressed_pixels }) +    } +} + +impl CompressedTileBlock { + +    /// Without validation, write this instance to the byte stream. +    pub fn write<W: Write>(&self, write: &mut W) -> UnitResult { +        debug_assert_ne!(self.compressed_pixels.len(), 0, "empty blocks should not be put in the file bug"); + +        self.coordinates.write(write)?; +        u8::write_i32_sized_slice(write, &self.compressed_pixels)?; +        Ok(()) +    } + +    /// Read the value without validating. 
+    pub fn read(read: &mut impl Read, max_block_byte_size: usize) -> Result<Self> { +        let coordinates = TileCoordinates::read(read)?; +        let compressed_pixels = u8::read_i32_sized_vec(read, max_block_byte_size, Some(max_block_byte_size), "tile block sample count")?; +        Ok(CompressedTileBlock { coordinates, compressed_pixels }) +    } +} + +impl CompressedDeepScanLineBlock { + +    /// Without validation, write this instance to the byte stream. +    pub fn write<W: Write>(&self, write: &mut W) -> UnitResult { +        debug_assert_ne!(self.compressed_sample_data.len(), 0, "empty blocks should not be put in the file bug"); + +        i32::write(self.y_coordinate, write)?; +        u64::write(self.compressed_pixel_offset_table.len() as u64, write)?; +        u64::write(self.compressed_sample_data.len() as u64, write)?; // TODO just guessed +        u64::write(self.decompressed_sample_data_size as u64, write)?; +        i8::write_slice(write, &self.compressed_pixel_offset_table)?; +        u8::write_slice(write, &self.compressed_sample_data)?; +        Ok(()) +    } + +    /// Read the value without validating. 
+    pub fn read(read: &mut impl Read, max_block_byte_size: usize) -> Result<Self> { +        let y_coordinate = i32::read(read)?; +        let compressed_pixel_offset_table_size = u64_to_usize(u64::read(read)?); +        let compressed_sample_data_size = u64_to_usize(u64::read(read)?); +        let decompressed_sample_data_size = u64_to_usize(u64::read(read)?); + +        // doc said i32, try u8 +        let compressed_pixel_offset_table = i8::read_vec( +            read, compressed_pixel_offset_table_size, +            6 * u16::MAX as usize, Some(max_block_byte_size), +            "deep scan line block table size" +        )?; + +        let compressed_sample_data = u8::read_vec( +            read, compressed_sample_data_size, +            6 * u16::MAX as usize, Some(max_block_byte_size), +            "deep scan line block sample count" +        )?; + +        Ok(CompressedDeepScanLineBlock { +            y_coordinate, +            decompressed_sample_data_size, +            compressed_pixel_offset_table, +            compressed_sample_data, +        }) +    } +} + + +impl CompressedDeepTileBlock { + +    /// Without validation, write this instance to the byte stream. +    pub fn write<W: Write>(&self, write: &mut W) -> UnitResult { +        debug_assert_ne!(self.compressed_sample_data.len(), 0, "empty blocks should not be put in the file bug"); + +        self.coordinates.write(write)?; +        u64::write(self.compressed_pixel_offset_table.len() as u64, write)?; +        u64::write(self.compressed_sample_data.len() as u64, write)?; // TODO just guessed +        u64::write(self.decompressed_sample_data_size as u64, write)?; +        i8::write_slice(write, &self.compressed_pixel_offset_table)?; +        u8::write_slice(write, &self.compressed_sample_data)?; +        Ok(()) +    } + +    /// Read the value without validating. 
+    pub fn read(read: &mut impl Read, hard_max_block_byte_size: usize) -> Result<Self> { +        let coordinates = TileCoordinates::read(read)?; +        let compressed_pixel_offset_table_size = u64_to_usize(u64::read(read)?); +        let compressed_sample_data_size = u64_to_usize(u64::read(read)?); // TODO u64 just guessed +        let decompressed_sample_data_size = u64_to_usize(u64::read(read)?); + +        let compressed_pixel_offset_table = i8::read_vec( +            read, compressed_pixel_offset_table_size, +            6 * u16::MAX as usize, Some(hard_max_block_byte_size), +            "deep tile block table size" +        )?; + +        let compressed_sample_data = u8::read_vec( +            read, compressed_sample_data_size, +            6 * u16::MAX as usize, Some(hard_max_block_byte_size), +            "deep tile block sample count" +        )?; + +        Ok(CompressedDeepTileBlock { +            coordinates, +            decompressed_sample_data_size, +            compressed_pixel_offset_table, +            compressed_sample_data, +        }) +    } +} + +use crate::error::{UnitResult, Result, Error, u64_to_usize, usize_to_i32, i32_to_usize}; +use crate::math::Vec2; + +/// Validation of chunks is done while reading and writing the actual data. (For example in exr::full_image) +impl Chunk { + +    /// Without validation, write this instance to the byte stream. 
+    pub fn write(&self, write: &mut impl Write, header_count: usize) -> UnitResult { +        debug_assert!(self.layer_index < header_count, "layer index bug"); // validation is done in full_image or simple_image + +        if header_count != 1 {  usize_to_i32(self.layer_index).write(write)?; } +        else { assert_eq!(self.layer_index, 0, "invalid header index for single layer file"); } + +        match self.compressed_block { +            CompressedBlock::ScanLine     (ref value) => value.write(write), +            CompressedBlock::Tile         (ref value) => value.write(write), +            CompressedBlock::DeepScanLine (ref value) => value.write(write), +            CompressedBlock::DeepTile     (ref value) => value.write(write), +        } +    } + +    /// Read the value without validating. +    pub fn read(read: &mut impl Read, meta_data: &MetaData) -> Result<Self> { +        let layer_number = i32_to_usize( +            if meta_data.requirements.is_multilayer() { i32::read(read)? 
} // documentation says u64, but is i32 +            else { 0_i32 }, // reference the first header for single-layer images +            "chunk data part number" +        )?; + +        if layer_number >= meta_data.headers.len() { +            return Err(Error::invalid("chunk data part number")); +        } + +        let header = &meta_data.headers[layer_number]; +        let max_block_byte_size = header.max_block_byte_size(); + +        let chunk = Chunk { +            layer_index: layer_number, +            compressed_block: match header.blocks { +                // flat data +                BlockDescription::ScanLines if !header.deep => CompressedBlock::ScanLine(CompressedScanLineBlock::read(read, max_block_byte_size)?), +                BlockDescription::Tiles(_) if !header.deep     => CompressedBlock::Tile(CompressedTileBlock::read(read, max_block_byte_size)?), + +                // deep data +                BlockDescription::ScanLines   => CompressedBlock::DeepScanLine(CompressedDeepScanLineBlock::read(read, max_block_byte_size)?), +                BlockDescription::Tiles(_)    => CompressedBlock::DeepTile(CompressedDeepTileBlock::read(read, max_block_byte_size)?), +            }, +        }; + +        Ok(chunk) +    } +} + diff --git a/vendor/exr/src/block/lines.rs b/vendor/exr/src/block/lines.rs new file mode 100644 index 0000000..1cdf8ee --- /dev/null +++ b/vendor/exr/src/block/lines.rs @@ -0,0 +1,197 @@ +//! Extract lines from a block of pixel bytes. + +use crate::math::*; +use std::io::{Cursor}; +use crate::error::{Result, UnitResult}; +use smallvec::SmallVec; +use std::ops::Range; +use crate::block::{BlockIndex}; +use crate::meta::attribute::ChannelList; + + +/// A single line of pixels. +/// Use [LineRef] or [LineRefMut] for easier type names. 
+#[derive(Clone, Copy, Eq, PartialEq, Debug)] +pub struct LineSlice<T> { + +    // TODO also store enum SampleType, as it would always be matched in every place it is used + +    /// Where this line is located inside the image. +    pub location: LineIndex, + +    /// The raw bytes of the pixel line, either `&[u8]` or `&mut [u8]`. +    /// Must be re-interpreted as slice of f16, f32, or u32, +    /// according to the channel data type. +    pub value: T, +} + + +/// An reference to a single line of pixels. +/// May go across the whole image or just a tile section of it. +/// +/// This line contains an immutable slice that all samples will be read from. +pub type LineRef<'s> = LineSlice<&'s [u8]>; + +/// A reference to a single mutable line of pixels. +/// May go across the whole image or just a tile section of it. +/// +/// This line contains a mutable slice that all samples will be written to. +pub type LineRefMut<'s> = LineSlice<&'s mut [u8]>; + + +/// Specifies where a row of pixels lies inside an image. +/// This is a globally unique identifier which includes +/// the layer, channel index, and pixel location. +#[derive(Clone, Copy, Eq, PartialEq, Debug, Hash)] +pub struct LineIndex { + +    /// Index of the layer. +    pub layer: usize, + +    /// The channel index of the layer. +    pub channel: usize, + +    /// Index of the mip or rip level in the image. +    pub level: Vec2<usize>, + +    /// Position of the most left pixel of the row. +    pub position: Vec2<usize>, + +    /// The width of the line; the number of samples in this row, +    /// that is, the number of f16, f32, or u32 values. +    pub sample_count: usize, +} + + +impl LineIndex { + +    /// Iterates the lines of this block index in interleaved fashion: +    /// For each line in this block, this iterator steps once through each channel. +    /// This is how lines are stored in a pixel data block. 
+    /// +    /// Does not check whether `self.layer_index`, `self.level`, `self.size` and `self.position` are valid indices.__ +    // TODO be sure this cannot produce incorrect data, as this is not further checked but only handled with panics +    #[inline] +    #[must_use] +    pub fn lines_in_block(block: BlockIndex, channels: &ChannelList) -> impl Iterator<Item=(Range<usize>, LineIndex)> { +        struct LineIter { +            layer: usize, level: Vec2<usize>, width: usize, +            end_y: usize, x: usize, channel_sizes: SmallVec<[usize; 8]>, +            byte: usize, channel: usize, y: usize, +        } + +        // FIXME what about sub sampling?? + +        impl Iterator for LineIter { +            type Item = (Range<usize>, LineIndex); +            // TODO size hint? + +            fn next(&mut self) -> Option<Self::Item> { +                if self.y < self.end_y { + +                    // compute return value before incrementing +                    let byte_len = self.channel_sizes[self.channel]; +                    let return_value = ( +                        (self.byte .. 
self.byte + byte_len), +                        LineIndex { +                            channel: self.channel, +                            layer: self.layer, +                            level: self.level, +                            position: Vec2(self.x, self.y), +                            sample_count: self.width, +                        } +                    ); + +                    { // increment indices +                        self.byte += byte_len; +                        self.channel += 1; + +                        if self.channel == self.channel_sizes.len() { +                            self.channel = 0; +                            self.y += 1; +                        } +                    } + +                    Some(return_value) +                } + +                else { +                    None +                } +            } +        } + +        let channel_line_sizes: SmallVec<[usize; 8]> = channels.list.iter() +            .map(move |channel| block.pixel_size.0 * channel.sample_type.bytes_per_sample()) // FIXME is it fewer samples per tile or just fewer tiles for sampled images??? +            .collect(); + +        LineIter { +            layer: block.layer, +            level: block.level, +            width: block.pixel_size.0, +            x: block.pixel_position.0, +            end_y: block.pixel_position.y() + block.pixel_size.height(), +            channel_sizes: channel_line_sizes, + +            byte: 0, +            channel: 0, +            y: block.pixel_position.y() +        } +    } +} + + + +impl<'s> LineRefMut<'s> { + +    /// Writes the samples (f16, f32, u32 values) into this line value reference. +    /// Use `write_samples` if there is not slice available. 
+    #[inline] +    #[must_use] +    pub fn write_samples_from_slice<T: crate::io::Data>(self, slice: &[T]) -> UnitResult { +        debug_assert_eq!(slice.len(), self.location.sample_count, "slice size does not match the line width"); +        debug_assert_eq!(self.value.len(), self.location.sample_count * T::BYTE_SIZE, "sample type size does not match line byte size"); + +        T::write_slice(&mut Cursor::new(self.value), slice) +    } + +    /// Iterate over all samples in this line, from left to right. +    /// The supplied `get_line` function returns the sample value +    /// for a given sample index within the line, +    /// which starts at zero for each individual line. +    /// Use `write_samples_from_slice` if you already have a slice of samples. +    #[inline] +    #[must_use] +    pub fn write_samples<T: crate::io::Data>(self, mut get_sample: impl FnMut(usize) -> T) -> UnitResult { +        debug_assert_eq!(self.value.len(), self.location.sample_count * T::BYTE_SIZE, "sample type size does not match line byte size"); + +        let mut write = Cursor::new(self.value); + +        for index in 0..self.location.sample_count { +            T::write(get_sample(index), &mut write)?; +        } + +        Ok(()) +    } +} + +impl LineRef<'_> { + +    /// Read the samples (f16, f32, u32 values) from this line value reference. +    /// Use `read_samples` if there is not slice available. +    pub fn read_samples_into_slice<T: crate::io::Data>(self, slice: &mut [T]) -> UnitResult { +        debug_assert_eq!(slice.len(), self.location.sample_count, "slice size does not match the line width"); +        debug_assert_eq!(self.value.len(), self.location.sample_count * T::BYTE_SIZE, "sample type size does not match line byte size"); + +        T::read_slice(&mut Cursor::new(self.value), slice) +    } + +    /// Iterate over all samples in this line, from left to right. +    /// Use `read_sample_into_slice` if you already have a slice of samples. 
+    pub fn read_samples<T: crate::io::Data>(&self) -> impl Iterator<Item = Result<T>> + '_ { +        debug_assert_eq!(self.value.len(), self.location.sample_count * T::BYTE_SIZE, "sample type size does not match line byte size"); + +        let mut read = self.value.clone(); // FIXME deep data +        (0..self.location.sample_count).map(move |_| T::read(&mut read)) +    } +}
\ No newline at end of file diff --git a/vendor/exr/src/block/mod.rs b/vendor/exr/src/block/mod.rs new file mode 100644 index 0000000..1d20aa8 --- /dev/null +++ b/vendor/exr/src/block/mod.rs @@ -0,0 +1,257 @@ +//! This is the low-level interface for the raw blocks of an image. +//! See `exr::image` module for a high-level interface. +//! +//! Handle compressed and uncompressed pixel byte blocks. Includes compression and decompression, +//! and reading a complete image into blocks. +//! +//! Start with the `block::read(...)` +//! and `block::write(...)` functions. + + +pub mod writer; +pub mod reader; + +pub mod lines; +pub mod samples; +pub mod chunk; + + +use std::io::{Read, Seek, Write}; +use crate::error::{Result, UnitResult, Error, usize_to_i32}; +use crate::meta::{Headers, MetaData, BlockDescription}; +use crate::math::Vec2; +use crate::compression::ByteVec; +use crate::block::chunk::{CompressedBlock, CompressedTileBlock, CompressedScanLineBlock, Chunk, TileCoordinates}; +use crate::meta::header::Header; +use crate::block::lines::{LineIndex, LineRef, LineSlice, LineRefMut}; +use crate::meta::attribute::ChannelList; + + +/// Specifies where a block of pixel data should be placed in the actual image. +/// This is a globally unique identifier which +/// includes the layer, level index, and pixel location. +#[derive(Clone, Copy, Eq, Hash, PartialEq, Debug)] +pub struct BlockIndex { + +    /// Index of the layer. +    pub layer: usize, + +    /// Index of the top left pixel from the block within the data window. +    pub pixel_position: Vec2<usize>, + +    /// Number of pixels in this block, extending to the right and downwards. +    /// Stays the same across all resolution levels. +    pub pixel_size: Vec2<usize>, + +    /// Index of the mip or rip level in the image. +    pub level: Vec2<usize>, +} + +/// Contains a block of pixel data and where that data should be placed in the actual image. 
+#[derive(Clone, Eq, PartialEq, Debug)] +pub struct UncompressedBlock { + +    /// Location of the data inside the image. +    pub index: BlockIndex, + +    /// Uncompressed pixel values of the whole block. +    /// One or more scan lines may be stored together as a scan line block. +    /// This byte vector contains all pixel rows, one after another. +    /// For each line in the tile, for each channel, the row values are contiguous. +    /// Stores all samples of the first channel, then all samples of the second channel, and so on. +    pub data: ByteVec, +} + +/// Immediately reads the meta data from the file. +/// Then, returns a reader that can be used to read all pixel blocks. +/// From the reader, you can pull each compressed chunk from the file. +/// Alternatively, you can create a decompressor, and pull the uncompressed data from it. +/// The reader is assumed to be buffered. +pub fn read<R: Read + Seek>(buffered_read: R, pedantic: bool) -> Result<self::reader::Reader<R>> { +    self::reader::Reader::read_from_buffered(buffered_read, pedantic) +} + +/// Immediately writes the meta data to the file. +/// Then, calls a closure with a writer that can be used to write all pixel blocks. +/// In the closure, you can push compressed chunks directly into the writer. +/// Alternatively, you can create a compressor, wrapping the writer, and push the uncompressed data to it. +/// The writer is assumed to be buffered. +pub fn write<W: Write + Seek>( +    buffered_write: W, headers: Headers, compatibility_checks: bool, +    write_chunks: impl FnOnce(MetaData, &mut self::writer::ChunkWriter<W>) -> UnitResult +) -> UnitResult { +    self::writer::write_chunks_with(buffered_write, headers, compatibility_checks, write_chunks) +} + + + + +/// This iterator tells you the block indices of all blocks that must be in the image. +/// The order of the blocks depends on the `LineOrder` attribute +/// (unspecified line order is treated the same as increasing line order). 
+/// The blocks written to the file must be exactly in this order, +/// except for when the `LineOrder` is unspecified. +/// The index represents the block index, in increasing line order, within the header. +pub fn enumerate_ordered_header_block_indices(headers: &[Header]) -> impl '_ + Iterator<Item=(usize, BlockIndex)> { +    headers.iter().enumerate().flat_map(|(layer_index, header)|{ +        header.enumerate_ordered_blocks().map(move |(index_in_header, tile)|{ +            let data_indices = header.get_absolute_block_pixel_coordinates(tile.location).expect("tile coordinate bug"); + +            let block = BlockIndex { +                layer: layer_index, +                level: tile.location.level_index, +                pixel_position: data_indices.position.to_usize("data indices start").expect("data index bug"), +                pixel_size: data_indices.size, +            }; + +            (index_in_header, block) +        }) +    }) +} + + +impl UncompressedBlock { + +    /// Decompress the possibly compressed chunk and returns an `UncompressedBlock`. +    // for uncompressed data, the ByteVec in the chunk is moved all the way +    #[inline] +    #[must_use] +    pub fn decompress_chunk(chunk: Chunk, meta_data: &MetaData, pedantic: bool) -> Result<Self> { +        let header: &Header = meta_data.headers.get(chunk.layer_index) +            .ok_or(Error::invalid("chunk layer index"))?; + +        let tile_data_indices = header.get_block_data_indices(&chunk.compressed_block)?; +        let absolute_indices = header.get_absolute_block_pixel_coordinates(tile_data_indices)?; + +        absolute_indices.validate(Some(header.layer_size))?; + +        match chunk.compressed_block { +            CompressedBlock::Tile(CompressedTileBlock { compressed_pixels, .. }) | +            CompressedBlock::ScanLine(CompressedScanLineBlock { compressed_pixels, .. 
}) => { +                Ok(UncompressedBlock { +                    data: header.compression.decompress_image_section(header, compressed_pixels, absolute_indices, pedantic)?, +                    index: BlockIndex { +                        layer: chunk.layer_index, +                        pixel_position: absolute_indices.position.to_usize("data indices start")?, +                        level: tile_data_indices.level_index, +                        pixel_size: absolute_indices.size, +                    } +                }) +            }, + +            _ => return Err(Error::unsupported("deep data not supported yet")) +        } +    } + +    /// Consume this block by compressing it, returning a `Chunk`. +    // for uncompressed data, the ByteVec in the chunk is moved all the way +    #[inline] +    #[must_use] +    pub fn compress_to_chunk(self, headers: &[Header]) -> Result<Chunk> { +        let UncompressedBlock { data, index } = self; + +        let header: &Header = headers.get(index.layer) +            .expect("block layer index bug"); + +        let expected_byte_size = header.channels.bytes_per_pixel * self.index.pixel_size.area(); // TODO sampling?? +        if expected_byte_size != data.len() { +            panic!("get_line byte size should be {} but was {}", expected_byte_size, data.len()); +        } + +        let tile_coordinates = TileCoordinates { +            // FIXME this calculation should not be made here but elsewhere instead (in meta::header?) +            tile_index: index.pixel_position / header.max_block_pixel_size(), // TODO sampling?? 
+            level_index: index.level, +        }; + +        let absolute_indices = header.get_absolute_block_pixel_coordinates(tile_coordinates)?; +        absolute_indices.validate(Some(header.layer_size))?; + +        if !header.compression.may_loose_data() { debug_assert_eq!( +            &header.compression.decompress_image_section( +                header, +                header.compression.compress_image_section(header, data.clone(), absolute_indices)?, +                absolute_indices, +                true +            ).unwrap(), +            &data, +            "compression method not round trippin'" +        ); } + +        let compressed_data = header.compression.compress_image_section(header, data, absolute_indices)?; + +        Ok(Chunk { +            layer_index: index.layer, +            compressed_block : match header.blocks { +                BlockDescription::ScanLines => CompressedBlock::ScanLine(CompressedScanLineBlock { +                    compressed_pixels: compressed_data, + +                    // FIXME this calculation should not be made here but elsewhere instead (in meta::header?) +                    y_coordinate: usize_to_i32(index.pixel_position.y()) + header.own_attributes.layer_position.y(), // TODO sampling?? +                }), + +                BlockDescription::Tiles(_) => CompressedBlock::Tile(CompressedTileBlock { +                    compressed_pixels: compressed_data, +                    coordinates: tile_coordinates, +                }), +            } +        }) +    } + +    /// Iterate all the lines in this block. +    /// Each line contains the all samples for one of the channels. 
+    pub fn lines(&self, channels: &ChannelList) -> impl Iterator<Item=LineRef<'_>> { +        LineIndex::lines_in_block(self.index, channels) +            .map(move |(bytes, line)| LineSlice { location: line, value: &self.data[bytes] }) +    } + +    /* TODO pub fn lines_mut<'s>(&'s mut self, header: &Header) -> impl 's + Iterator<Item=LineRefMut<'s>> { +        LineIndex::lines_in_block(self.index, &header.channels) +            .map(move |(bytes, line)| LineSlice { location: line, value: &mut self.data[bytes] }) +    }*/ + +    /*// TODO make iterator +    /// Call a closure for each line of samples in this uncompressed block. +    pub fn for_lines( +        &self, header: &Header, +        mut accept_line: impl FnMut(LineRef<'_>) -> UnitResult +    ) -> UnitResult { +        for (bytes, line) in LineIndex::lines_in_block(self.index, &header.channels) { +            let line_ref = LineSlice { location: line, value: &self.data[bytes] }; +            accept_line(line_ref)?; +        } + +        Ok(()) +    }*/ + +    // TODO from iterator?? +    /// Create an uncompressed block byte vector by requesting one line of samples after another. +    pub fn collect_block_data_from_lines( +        channels: &ChannelList, block_index: BlockIndex, +        mut extract_line: impl FnMut(LineRefMut<'_>) +    ) -> Vec<u8> +    { +        let byte_count = block_index.pixel_size.area() * channels.bytes_per_pixel; +        let mut block_bytes = vec![0_u8; byte_count]; + +        for (byte_range, line_index) in LineIndex::lines_in_block(block_index, channels) { +            extract_line(LineRefMut { // TODO subsampling +                value: &mut block_bytes[byte_range], +                location: line_index, +            }); +        } + +        block_bytes +    } + +    /// Create an uncompressed block by requesting one line of samples after another. 
+    pub fn from_lines( +        channels: &ChannelList, block_index: BlockIndex, +        extract_line: impl FnMut(LineRefMut<'_>) +    ) -> Self { +        Self { +            index: block_index, +            data: Self::collect_block_data_from_lines(channels, block_index, extract_line) +        } +    } +}
\ No newline at end of file diff --git a/vendor/exr/src/block/reader.rs b/vendor/exr/src/block/reader.rs new file mode 100644 index 0000000..bb9888e --- /dev/null +++ b/vendor/exr/src/block/reader.rs @@ -0,0 +1,527 @@ +//! Composable structures to handle reading an image. + + +use std::convert::TryFrom; +use std::fmt::Debug; +use std::io::{Read, Seek}; +use rayon_core::{ThreadPool, ThreadPoolBuildError}; + +use smallvec::alloc::sync::Arc; + +use crate::block::{BlockIndex, UncompressedBlock}; +use crate::block::chunk::{Chunk, TileCoordinates}; +use crate::compression::Compression; +use crate::error::{Error, Result, u64_to_usize, UnitResult}; +use crate::io::{PeekRead, Tracking}; +use crate::meta::{MetaData, OffsetTables}; +use crate::meta::header::Header; + +/// Decode the meta data from a byte source, keeping the source ready for further reading. +/// Continue decoding the remaining bytes by calling `filtered_chunks` or `all_chunks`. +#[derive(Debug)] +pub struct Reader<R> { +    meta_data: MetaData, +    remaining_reader: PeekRead<Tracking<R>>, // TODO does R need to be Seek or is Tracking enough? +} + +impl<R: Read + Seek> Reader<R> { + +    /// Start the reading process. +    /// Immediately decodes the meta data into an internal field. +    /// Access it via`meta_data()`. +    pub fn read_from_buffered(read: R, pedantic: bool) -> Result<Self> { +        let mut remaining_reader = PeekRead::new(Tracking::new(read)); +        let meta_data = MetaData::read_validated_from_buffered_peekable(&mut remaining_reader, pedantic)?; +        Ok(Self { meta_data, remaining_reader }) +    } + +    // must not be mutable, as reading the file later on relies on the meta data +    /// The decoded exr meta data from the file. +    pub fn meta_data(&self) -> &MetaData { &self.meta_data } + +    /// The decoded exr meta data from the file. +    pub fn headers(&self) -> &[Header] { &self.meta_data.headers } + +    /// Obtain the meta data ownership. 
+    pub fn into_meta_data(self) -> MetaData { self.meta_data } + +    /// Prepare to read all the chunks from the file. +    /// Does not decode the chunks now, but returns a decoder. +    /// Reading all chunks reduces seeking the file, but some chunks might be read without being used. +    pub fn all_chunks(mut self, pedantic: bool) -> Result<AllChunksReader<R>> { +        let total_chunk_count = { +            if pedantic { +                let offset_tables = MetaData::read_offset_tables(&mut self.remaining_reader, &self.meta_data.headers)?; +                validate_offset_tables(self.meta_data.headers.as_slice(), &offset_tables, self.remaining_reader.byte_position())?; +                offset_tables.iter().map(|table| table.len()).sum() +            } +            else { +                usize::try_from(MetaData::skip_offset_tables(&mut self.remaining_reader, &self.meta_data.headers)?) +                    .expect("too large chunk count for this machine") +            } +        }; + +        Ok(AllChunksReader { +            meta_data: self.meta_data, +            remaining_chunks: 0 .. total_chunk_count, +            remaining_bytes: self.remaining_reader, +            pedantic +        }) +    } + +    /// Prepare to read some the chunks from the file. +    /// Does not decode the chunks now, but returns a decoder. +    /// Reading only some chunks may seeking the file, potentially skipping many bytes. +    // TODO tile indices add no new information to block index?? +    pub fn filter_chunks(mut self, pedantic: bool, mut filter: impl FnMut(&MetaData, TileCoordinates, BlockIndex) -> bool) -> Result<FilteredChunksReader<R>> { +        let offset_tables = MetaData::read_offset_tables(&mut self.remaining_reader, &self.meta_data.headers)?; + +        // TODO regardless of pedantic, if invalid, read all chunks instead, and filter after reading each chunk? 
+        if pedantic { +            validate_offset_tables( +                self.meta_data.headers.as_slice(), &offset_tables, +                self.remaining_reader.byte_position() +            )?; +        } + +        let mut filtered_offsets = Vec::with_capacity( +            (self.meta_data.headers.len() * 32).min(2*2048) +        ); + +        // TODO detect whether the filter actually would skip chunks, and aviod sorting etc when not filtering is applied + +        for (header_index, header) in self.meta_data.headers.iter().enumerate() { // offset tables are stored same order as headers +            for (block_index, tile) in header.blocks_increasing_y_order().enumerate() { // in increasing_y order +                let data_indices = header.get_absolute_block_pixel_coordinates(tile.location)?; + +                let block = BlockIndex { +                    layer: header_index, +                    level: tile.location.level_index, +                    pixel_position: data_indices.position.to_usize("data indices start")?, +                    pixel_size: data_indices.size, +                }; + +                if filter(&self.meta_data, tile.location, block) { +                    filtered_offsets.push(offset_tables[header_index][block_index]) // safe indexing from `enumerate()` +                } +            }; +        } + +        filtered_offsets.sort_unstable(); // enables reading continuously if possible (already sorted where line order increasing) + +        if pedantic { +            // table is sorted. if any two neighbours are equal, we have duplicates. this is invalid. 
+            if filtered_offsets.windows(2).any(|pair| pair[0] == pair[1]) { +                return Err(Error::invalid("chunk offset table")) +            } +        } + +        Ok(FilteredChunksReader { +            meta_data: self.meta_data, +            expected_filtered_chunk_count: filtered_offsets.len(), +            remaining_filtered_chunk_indices: filtered_offsets.into_iter(), +            remaining_bytes: self.remaining_reader +        }) +    } +} + + +fn validate_offset_tables(headers: &[Header], offset_tables: &OffsetTables, chunks_start_byte: usize) -> UnitResult { +    let max_pixel_bytes: usize = headers.iter() // when compressed, chunks are smaller, but never larger than max +        .map(|header| header.max_pixel_file_bytes()) +        .sum(); + +    // check that each offset is within the bounds +    let end_byte = chunks_start_byte + max_pixel_bytes; +    let is_invalid = offset_tables.iter().flatten().map(|&u64| u64_to_usize(u64)) +        .any(|chunk_start| chunk_start < chunks_start_byte || chunk_start > end_byte); + +    if is_invalid { Err(Error::invalid("offset table")) } +    else { Ok(()) } +} + + + + +/// Decode the desired chunks and skip the unimportant chunks in the file. +/// The decoded chunks can be decompressed by calling +/// `decompress_parallel`, `decompress_sequential`, or `sequential_decompressor` or `parallel_decompressor`. +/// Call `on_progress` to have a callback with each block. +/// Also contains the image meta data. +#[derive(Debug)] +pub struct FilteredChunksReader<R> { +    meta_data: MetaData, +    expected_filtered_chunk_count: usize, +    remaining_filtered_chunk_indices: std::vec::IntoIter<u64>, +    remaining_bytes: PeekRead<Tracking<R>>, +} + +/// Decode all chunks in the file without seeking. +/// The decoded chunks can be decompressed by calling +/// `decompress_parallel`, `decompress_sequential`, or `sequential_decompressor` or `parallel_decompressor`. 
+/// Call `on_progress` to have a callback with each block. +/// Also contains the image meta data. +#[derive(Debug)] +pub struct AllChunksReader<R> { +    meta_data: MetaData, +    remaining_chunks: std::ops::Range<usize>, +    remaining_bytes: PeekRead<Tracking<R>>, +    pedantic: bool, +} + +/// Decode chunks in the file without seeking. +/// Calls the supplied closure for each chunk. +/// The decoded chunks can be decompressed by calling +/// `decompress_parallel`, `decompress_sequential`, or `sequential_decompressor`. +/// Also contains the image meta data. +#[derive(Debug)] +pub struct OnProgressChunksReader<R, F> { +    chunks_reader: R, +    decoded_chunks: usize, +    callback: F, +} + +/// Decode chunks in the file. +/// The decoded chunks can be decompressed by calling +/// `decompress_parallel`, `decompress_sequential`, or `sequential_decompressor`. +/// Call `on_progress` to have a callback with each block. +/// Also contains the image meta data. +pub trait ChunksReader: Sized + Iterator<Item=Result<Chunk>> + ExactSizeIterator { + +    /// The decoded exr meta data from the file. +    fn meta_data(&self) -> &MetaData; + +    /// The decoded exr headers from the file. +    fn headers(&self) -> &[Header] { &self.meta_data().headers } + +    /// The number of chunks that this reader will return in total. +    /// Can be less than the total number of chunks in the file, if some chunks are skipped. +    fn expected_chunk_count(&self) -> usize; + +    /// Read the next compressed chunk from the file. +    /// Equivalent to `.next()`, as this also is an iterator. +    /// Returns `None` if all chunks have been read. +    fn read_next_chunk(&mut self) -> Option<Result<Chunk>> { self.next() } + +    /// Create a new reader that calls the provided progress +    /// callback for each chunk that is read from the file. +    /// If the file can be successfully decoded, +    /// the progress will always at least once include 0.0 at the start and 1.0 at the end. 
+    fn on_progress<F>(self, on_progress: F) -> OnProgressChunksReader<Self, F> where F: FnMut(f64) { +        OnProgressChunksReader { chunks_reader: self, callback: on_progress, decoded_chunks: 0 } +    } + +    /// Decompress all blocks in the file, using multiple cpu cores, and call the supplied closure for each block. +    /// The order of the blocks is not deterministic. +    /// You can also use `parallel_decompressor` to obtain an iterator instead. +    /// Will fallback to sequential processing where threads are not available, or where it would not speed up the process. +    // FIXME try async + futures instead of rayon! Maybe even allows for external async decoding? (-> impl Stream<UncompressedBlock>) +    fn decompress_parallel( +        self, pedantic: bool, +        mut insert_block: impl FnMut(&MetaData, UncompressedBlock) -> UnitResult +    ) -> UnitResult +    { +        let mut decompressor = match self.parallel_decompressor(pedantic) { +            Err(old_self) => return old_self.decompress_sequential(pedantic, insert_block), +            Ok(decompressor) => decompressor, +        }; + +        while let Some(block) = decompressor.next() { +            insert_block(decompressor.meta_data(), block?)?; +        } + +        debug_assert_eq!(decompressor.len(), 0, "compressed blocks left after decompressing all blocks"); +        Ok(()) +    } + +    /// Return an iterator that decompresses the chunks with multiple threads. +    /// The order of the blocks is not deterministic. +    /// Use `ParallelBlockDecompressor::new` if you want to use your own thread pool. +    /// By default, this uses as many threads as there are CPUs. +    /// Returns the `self` if there is no need for parallel decompression. 
+    fn parallel_decompressor(self, pedantic: bool) -> std::result::Result<ParallelBlockDecompressor<Self>, Self> { +        ParallelBlockDecompressor::new(self, pedantic) +    } + +    /// Return an iterator that decompresses the chunks in this thread. +    /// You can alternatively use `sequential_decompressor` if you prefer an external iterator. +    fn decompress_sequential( +        self, pedantic: bool, +        mut insert_block: impl FnMut(&MetaData, UncompressedBlock) -> UnitResult +    ) -> UnitResult +    { +        let mut decompressor = self.sequential_decompressor(pedantic); +        while let Some(block) = decompressor.next() { +            insert_block(decompressor.meta_data(), block?)?; +        } + +        debug_assert_eq!(decompressor.len(), 0, "compressed blocks left after decompressing all blocks"); +        Ok(()) +    } + +    /// Prepare reading the chunks sequentially, only a single thread, but with less memory overhead. +    fn sequential_decompressor(self, pedantic: bool) -> SequentialBlockDecompressor<Self> { +        SequentialBlockDecompressor { remaining_chunks_reader: self, pedantic } +    } +} + +impl<R, F> ChunksReader for OnProgressChunksReader<R, F> where R: ChunksReader, F: FnMut(f64) { +    fn meta_data(&self) -> &MetaData { self.chunks_reader.meta_data() } +    fn expected_chunk_count(&self) -> usize { self.chunks_reader.expected_chunk_count() } +} + +impl<R, F> ExactSizeIterator for OnProgressChunksReader<R, F> where R: ChunksReader, F: FnMut(f64) {} +impl<R, F> Iterator for OnProgressChunksReader<R, F> where R: ChunksReader, F: FnMut(f64) { +    type Item = Result<Chunk>; + +    fn next(&mut self) -> Option<Self::Item> { +        self.chunks_reader.next().map(|item|{ +            { +                let total_chunks = self.expected_chunk_count() as f64; +                let callback = &mut self.callback; +                callback(self.decoded_chunks as f64 / total_chunks); +            } + +            self.decoded_chunks += 
1; +            item +        }) +            .or_else(||{ +                debug_assert_eq!( +                    self.decoded_chunks, self.expected_chunk_count(), +                    "chunks reader finished but not all chunks are decompressed" +                ); + +                let callback = &mut self.callback; +                callback(1.0); +                None +            }) +    } + +    fn size_hint(&self) -> (usize, Option<usize>) { +        self.chunks_reader.size_hint() +    } +} + +impl<R: Read + Seek> ChunksReader for AllChunksReader<R> { +    fn meta_data(&self) -> &MetaData { &self.meta_data } +    fn expected_chunk_count(&self) -> usize { self.remaining_chunks.end } +} + +impl<R: Read + Seek> ExactSizeIterator for AllChunksReader<R> {} +impl<R: Read + Seek> Iterator for AllChunksReader<R> { +    type Item = Result<Chunk>; + +    fn next(&mut self) -> Option<Self::Item> { +        // read as many chunks as the file should contain (inferred from meta data) +        let next_chunk = self.remaining_chunks.next() +            .map(|_| Chunk::read(&mut self.remaining_bytes, &self.meta_data)); + +        // if no chunks are left, but some bytes remain, return error +        if self.pedantic && next_chunk.is_none() && self.remaining_bytes.peek_u8().is_ok() { +            return Some(Err(Error::invalid("end of file expected"))); +        } + +        next_chunk +    } + +    fn size_hint(&self) -> (usize, Option<usize>) { +        (self.remaining_chunks.len(), Some(self.remaining_chunks.len())) +    } +} + +impl<R: Read + Seek> ChunksReader for FilteredChunksReader<R> { +    fn meta_data(&self) -> &MetaData { &self.meta_data } +    fn expected_chunk_count(&self) -> usize { self.expected_filtered_chunk_count } +} + +impl<R: Read + Seek> ExactSizeIterator for FilteredChunksReader<R> {} +impl<R: Read + Seek> Iterator for FilteredChunksReader<R> { +    type Item = Result<Chunk>; + +    fn next(&mut self) -> Option<Self::Item> { +        // read as many 
chunks as we have desired chunk offsets +        self.remaining_filtered_chunk_indices.next().map(|next_chunk_location|{ +            self.remaining_bytes.skip_to( // no-op for seek at current position, uses skip_bytes for small amounts +                                          usize::try_from(next_chunk_location) +                                              .expect("too large chunk position for this machine") +            )?; + +            let meta_data = &self.meta_data; +            Chunk::read(&mut self.remaining_bytes, meta_data) +        }) + +        // TODO remember last chunk index and then seek to index+size and check whether bytes are left? +    } + +    fn size_hint(&self) -> (usize, Option<usize>) { +        (self.remaining_filtered_chunk_indices.len(), Some(self.remaining_filtered_chunk_indices.len())) +    } +} + +/// Read all chunks from the file, decompressing each chunk immediately. +/// Implements iterator. +#[derive(Debug)] +pub struct SequentialBlockDecompressor<R: ChunksReader> { +    remaining_chunks_reader: R, +    pedantic: bool, +} + +impl<R: ChunksReader> SequentialBlockDecompressor<R> { + +    /// The extracted meta data from the image file. +    pub fn meta_data(&self) -> &MetaData { self.remaining_chunks_reader.meta_data() } + +    /// Read and then decompress a single block of pixels from the byte source. +    pub fn decompress_next_block(&mut self) -> Option<Result<UncompressedBlock>> { +        self.remaining_chunks_reader.read_next_chunk().map(|compressed_chunk|{ +            UncompressedBlock::decompress_chunk(compressed_chunk?, &self.remaining_chunks_reader.meta_data(), self.pedantic) +        }) +    } +} + +/// Decompress the chunks in a file in parallel. +/// The first call to `next` will fill the thread pool with jobs, +/// starting to decompress the next few blocks. +/// These jobs will finish, even if you stop reading more blocks. +/// Implements iterator. 
+#[derive(Debug)] +pub struct ParallelBlockDecompressor<R: ChunksReader> { +    remaining_chunks: R, +    sender: flume::Sender<Result<UncompressedBlock>>, +    receiver: flume::Receiver<Result<UncompressedBlock>>, +    currently_decompressing_count: usize, +    max_threads: usize, + +    shared_meta_data_ref: Arc<MetaData>, +    pedantic: bool, + +    pool: ThreadPool, +} + +impl<R: ChunksReader> ParallelBlockDecompressor<R> { + +    /// Create a new decompressor. Does not immediately spawn any tasks. +    /// Decompression starts after the first call to `next`. +    /// Returns the chunks if parallel decompression should not be used. +    /// Use `new_with_thread_pool` to customize the threadpool. +    pub fn new(chunks: R, pedantic: bool) -> std::result::Result<Self, R> { +        Self::new_with_thread_pool(chunks, pedantic, ||{ +            rayon_core::ThreadPoolBuilder::new() +                .thread_name(|index| format!("OpenEXR Block Decompressor Thread #{}", index)) +                .build() +        }) +    } + +    /// Create a new decompressor. Does not immediately spawn any tasks. +    /// Decompression starts after the first call to `next`. +    /// Returns the chunks if parallel decompression should not be used. +    pub fn new_with_thread_pool<CreatePool>(chunks: R, pedantic: bool, try_create_thread_pool: CreatePool) +        -> std::result::Result<Self, R> +        where CreatePool: FnOnce() -> std::result::Result<ThreadPool, ThreadPoolBuildError> +    { +        // if no compression is used in the file, don't use a threadpool +        if chunks.meta_data().headers.iter() +            .all(|head|head.compression == Compression::Uncompressed) +        { +            return Err(chunks); +        } + +        // in case thread pool creation fails (for example on WASM currently), +        // we revert to sequential decompression +        let pool = match try_create_thread_pool() { +            Ok(pool) => pool, + +            // TODO print warning? 
+            Err(_) => return Err(chunks), +        }; + +        let max_threads = pool.current_num_threads().max(1).min(chunks.len()) + 2; // ca one block for each thread at all times + +        let (send, recv) = flume::unbounded(); // TODO bounded channel simplifies logic? + +        Ok(Self { +            shared_meta_data_ref: Arc::new(chunks.meta_data().clone()), +            currently_decompressing_count: 0, +            remaining_chunks: chunks, +            sender: send, +            receiver: recv, +            pedantic, +            max_threads, + +            pool, +        }) +    } + +    /// Fill the pool with decompression jobs. Returns the first job that finishes. +    pub fn decompress_next_block(&mut self) -> Option<Result<UncompressedBlock>> { + +        while self.currently_decompressing_count < self.max_threads { +            let block = self.remaining_chunks.next(); +            if let Some(block) = block { +                let block = match block { +                    Ok(block) => block, +                    Err(error) => return Some(Err(error)) +                }; + +                let sender = self.sender.clone(); +                let meta = self.shared_meta_data_ref.clone(); +                let pedantic = self.pedantic; + +                self.currently_decompressing_count += 1; + +                self.pool.spawn(move || { +                    let decompressed_or_err = UncompressedBlock::decompress_chunk( +                        block, &meta, pedantic +                    ); + +                    // by now, decompressing could have failed in another thread. 
+                    // the error is then already handled, so we simply +                    // don't send the decompressed block and do nothing +                    let _ = sender.send(decompressed_or_err); +                }); +            } +            else { +                // there are no chunks left to decompress +                break; +            } +        } + +        if self.currently_decompressing_count > 0 { +            let next = self.receiver.recv() +                .expect("all decompressing senders hung up but more messages were expected"); + +            self.currently_decompressing_count -= 1; +            Some(next) +        } +        else { +            debug_assert!(self.receiver.try_recv().is_err(), "uncompressed chunks left in channel after decompressing all chunks"); // TODO not reliable +            debug_assert_eq!(self.len(), 0, "compressed chunks left after decompressing all chunks"); +            None +        } +    } + +    /// The extracted meta data of the image file. 
+    pub fn meta_data(&self) -> &MetaData { self.remaining_chunks.meta_data() } +} + +impl<R: ChunksReader> ExactSizeIterator for SequentialBlockDecompressor<R> {} +impl<R: ChunksReader> Iterator for SequentialBlockDecompressor<R> { +    type Item = Result<UncompressedBlock>; +    fn next(&mut self) -> Option<Self::Item> { self.decompress_next_block() } +    fn size_hint(&self) -> (usize, Option<usize>) { self.remaining_chunks_reader.size_hint() } +} + +impl<R: ChunksReader> ExactSizeIterator for ParallelBlockDecompressor<R> {} +impl<R: ChunksReader> Iterator for ParallelBlockDecompressor<R> { +    type Item = Result<UncompressedBlock>; +    fn next(&mut self) -> Option<Self::Item> { self.decompress_next_block() } +    fn size_hint(&self) -> (usize, Option<usize>) { +        let remaining = self.remaining_chunks.len() + self.currently_decompressing_count; +        (remaining, Some(remaining)) +    } +} + + + + + diff --git a/vendor/exr/src/block/samples.rs b/vendor/exr/src/block/samples.rs new file mode 100644 index 0000000..4352b11 --- /dev/null +++ b/vendor/exr/src/block/samples.rs @@ -0,0 +1,248 @@ +//! Extract pixel samples from a block of pixel bytes. + +use crate::prelude::*; +use half::prelude::HalfFloatSliceExt; + + +/// A single red, green, blue, or alpha value. +#[derive(Copy, Clone, Debug)] +pub enum Sample { + +    /// A 16-bit float sample. +    F16(f16), + +    /// A 32-bit float sample. +    F32(f32), + +    /// An unsigned integer sample. +    U32(u32) +} + +impl Sample { + +    /// Create a sample containing a 32-bit float. +    pub fn f32(f32: f32) -> Self { Sample::F32(f32) } + +    /// Create a sample containing a 16-bit float. +    pub fn f16(f16: f16) -> Self { Sample::F16(f16) } + +    /// Create a sample containing a 32-bit integer. +    pub fn u32(u32: u32) -> Self { Sample::U32(u32) } + +    /// Convert the sample to an f16 value. This has lower precision than f32. 
+    /// Note: An f32 can only represent integers up to `1024` as precise as a u32 could. +    #[inline] +    pub fn to_f16(self) -> f16 { +        match self { +            Sample::F16(sample) => sample, +            Sample::F32(sample) => f16::from_f32(sample), +            Sample::U32(sample) => f16::from_f32(sample as f32), +        } +    } + +    /// Convert the sample to an f32 value. +    /// Note: An f32 can only represent integers up to `8388608` as precise as a u32 could. +    #[inline] +    pub fn to_f32(self) -> f32 { +        match self { +            Sample::F32(sample) => sample, +            Sample::F16(sample) => sample.to_f32(), +            Sample::U32(sample) => sample as f32, +        } +    } + +    /// Convert the sample to a u32. Rounds floats to integers the same way that `3.1 as u32` does. +    #[inline] +    pub fn to_u32(self) -> u32 { +        match self { +            Sample::F16(sample) => sample.to_f32() as u32, +            Sample::F32(sample) => sample as u32, +            Sample::U32(sample) => sample, +        } +    } + +    /// Is this value not a number? +    #[inline] +    pub fn is_nan(self) -> bool { +        match self { +            Sample::F16(value) => value.is_nan(), +            Sample::F32(value) => value.is_nan(), +            Sample::U32(_) => false, +        } +    } + +    /// Is this value zero or negative zero? 
+    #[inline] +    pub fn is_zero(&self) -> bool { +        match *self { +            Sample::F16(value) => value == f16::ZERO || value == f16::NEG_ZERO, +            Sample::F32(value) => value == 0.0, +            Sample::U32(value) => value == 0, +        } +    } +} + +impl PartialEq for Sample { +    fn eq(&self, other: &Self) -> bool { +        match *self { +            Sample::F16(num) => num == other.to_f16(), +            Sample::F32(num) => num == other.to_f32(), +            Sample::U32(num) => num == other.to_u32(), +        } +    } +} + +// this is not recommended because it may hide whether a color is transparent or opaque and might be undesired for depth channels +impl Default for Sample { +    fn default() -> Self { Sample::F32(0.0) } +} + +impl From<f16> for Sample { #[inline] fn from(f: f16) -> Self { Sample::F16(f) } } +impl From<f32> for Sample { #[inline] fn from(f: f32) -> Self { Sample::F32(f) } } +impl From<u32> for Sample { #[inline] fn from(f: u32) -> Self { Sample::U32(f) } } + +impl<T> From<Option<T>> for Sample where T: Into<Sample> + Default { +    #[inline] fn from(num: Option<T>) -> Self { num.unwrap_or_default().into() } +} + + +impl From<Sample> for f16 { #[inline] fn from(s: Sample) -> Self { s.to_f16() } } +impl From<Sample> for f32 { #[inline] fn from(s: Sample) -> Self { s.to_f32() } } +impl From<Sample> for u32 { #[inline] fn from(s: Sample) -> Self { s.to_u32() } } + + +/// Create an arbitrary sample type from one of the defined sample types. +/// Should be compiled to a no-op where the file contains the predicted sample type. +/// The slice functions should be optimized into a `memcpy` where there is no conversion needed. 
pub trait FromNativeSample: Sized + Copy + Default + 'static {

    /// Create this sample from a f16, trying to represent the same numerical value
    fn from_f16(value: f16) -> Self;

    /// Create this sample from a f32, trying to represent the same numerical value
    fn from_f32(value: f32) -> Self;

    /// Create this sample from a u32, trying to represent the same numerical value
    fn from_u32(value: u32) -> Self;

    /// Convert all values from the slice into this type.
    /// Panics if the two slices differ in length.
    /// This function exists to allow the compiler to perform a vectorization optimization.
    /// Note that this default implementation will **not** be vectorized by the compiler automatically.
    /// For maximum performance you will need to override this function and implement it via
    /// an explicit batched conversion such as [`convert_to_f32_slice`](https://docs.rs/half/2.3.1/half/slice/trait.HalfFloatSliceExt.html#tymethod.convert_to_f32_slice)
    #[inline]
    fn from_f16s(from: &[f16], to: &mut [Self]) {
        assert_eq!(from.len(), to.len(), "slices must have the same length");

        for (target, source) in to.iter_mut().zip(from.iter().copied()) {
            *target = Self::from_f16(source);
        }
    }

    /// Convert all values from the slice into this type.
    /// Panics if the two slices differ in length.
    /// This function exists to allow the compiler to perform a vectorization optimization.
    /// Note that this default implementation will be vectorized by the compiler automatically.
    #[inline]
    fn from_f32s(from: &[f32], to: &mut [Self]) {
        assert_eq!(from.len(), to.len(), "slices must have the same length");

        for (target, source) in to.iter_mut().zip(from.iter().copied()) {
            *target = Self::from_f32(source);
        }
    }

    /// Convert all values from the slice into this type.
    /// This function exists to allow the compiler to perform a vectorization optimization.
+    /// Note that this default implementation will be vectorized by the compiler automatically, +    /// provided that the CPU supports the necessary conversion instructions. +    /// For example, x86_64 lacks the instructions to convert `u32` to floats, +    /// so this will inevitably be slow on x86_64. +    #[inline] +    fn from_u32s(from: &[u32], to: &mut [Self]) { +        assert_eq!(from.len(), to.len(), "slices must have the same length"); +        for (from, to) in from.iter().zip(to.iter_mut()) { +            *to = Self::from_u32(*from); +        } +    } +} + +// TODO haven't i implemented this exact behaviour already somewhere else in this library...?? +impl FromNativeSample for f32 { +    #[inline] fn from_f16(value: f16) -> Self { value.to_f32() } +    #[inline] fn from_f32(value: f32) -> Self { value } +    #[inline] fn from_u32(value: u32) -> Self { value as f32 } + +    // f16 is a custom type +    // so the compiler can not automatically vectorize the conversion +    // that's why we need to specialize this function +    #[inline] +    fn from_f16s(from: &[f16], to: &mut [Self]) { +        from.convert_to_f32_slice(to); +    } +} + +impl FromNativeSample for u32 { +    #[inline] fn from_f16(value: f16) -> Self { value.to_f32() as u32 } +    #[inline] fn from_f32(value: f32) -> Self { value as u32 } +    #[inline] fn from_u32(value: u32) -> Self { value } +} + +impl FromNativeSample for f16 { +    #[inline] fn from_f16(value: f16) -> Self { value } +    #[inline] fn from_f32(value: f32) -> Self { f16::from_f32(value) } +    #[inline] fn from_u32(value: u32) -> Self { f16::from_f32(value as f32) } + +    // f16 is a custom type +    // so the compiler can not automatically vectorize the conversion +    // that's why we need to specialize this function +    #[inline] +    fn from_f32s(from: &[f32], to: &mut [Self]) { +        to.convert_from_f32_slice(from) +    } +} + +impl FromNativeSample for Sample { +    #[inline] fn from_f16(value: f16) -> 
Self { Self::from(value) } +    #[inline] fn from_f32(value: f32) -> Self { Self::from(value) } +    #[inline] fn from_u32(value: u32) -> Self { Self::from(value) } +} + + +/// Convert any type into one of the supported sample types. +/// Should be compiled to a no-op where the file contains the predicted sample type +pub trait IntoNativeSample: Copy + Default + Sync + 'static { + +    /// Convert this sample to an f16, trying to represent the same numerical value. +    fn to_f16(&self) -> f16; + +    /// Convert this sample to an f32, trying to represent the same numerical value. +    fn to_f32(&self) -> f32; + +    /// Convert this sample to an u16, trying to represent the same numerical value. +    fn to_u32(&self) -> u32; +} + +impl IntoNativeSample for f16 { +    fn to_f16(&self) -> f16 { f16::from_f16(*self) } +    fn to_f32(&self) -> f32 { f32::from_f16(*self) } +    fn to_u32(&self) -> u32 { u32::from_f16(*self) } +} + +impl IntoNativeSample for f32 { +    fn to_f16(&self) -> f16 { f16::from_f32(*self) } +    fn to_f32(&self) -> f32 { f32::from_f32(*self) } +    fn to_u32(&self) -> u32 { u32::from_f32(*self) } +} + +impl IntoNativeSample for u32 { +    fn to_f16(&self) -> f16 { f16::from_u32(*self) } +    fn to_f32(&self) -> f32 { f32::from_u32(*self) } +    fn to_u32(&self) -> u32 { u32::from_u32(*self) } +} + +impl IntoNativeSample for Sample { +    fn to_f16(&self) -> f16 { Sample::to_f16(*self) } +    fn to_f32(&self) -> f32 { Sample::to_f32(*self) } +    fn to_u32(&self) -> u32 { Sample::to_u32(*self) } +} + + + diff --git a/vendor/exr/src/block/writer.rs b/vendor/exr/src/block/writer.rs new file mode 100644 index 0000000..1227c69 --- /dev/null +++ b/vendor/exr/src/block/writer.rs @@ -0,0 +1,468 @@ +//! Composable structures to handle writing an image. 
+ + +use std::fmt::Debug; +use std::io::Seek; +use std::iter::Peekable; +use std::ops::Not; +use rayon_core::{ThreadPool, ThreadPoolBuildError}; + +use smallvec::alloc::collections::BTreeMap; + +use crate::block::UncompressedBlock; +use crate::block::chunk::{Chunk}; +use crate::compression::Compression; +use crate::error::{Error, Result, UnitResult, usize_to_u64}; +use crate::io::{Data, Tracking, Write}; +use crate::meta::{Headers, MetaData, OffsetTables}; +use crate::meta::attribute::LineOrder; + +/// Write an exr file by writing one chunk after another in a closure. +/// In the closure, you are provided a chunk writer, which should be used to write all the chunks. +/// Assumes the your write destination is buffered. +pub fn write_chunks_with<W: Write + Seek>( +    buffered_write: W, headers: Headers, pedantic: bool, +    write_chunks: impl FnOnce(MetaData, &mut ChunkWriter<W>) -> UnitResult +) -> UnitResult { +    // this closure approach ensures that after writing all chunks, the file is always completed and checked and flushed +    let (meta, mut writer) = ChunkWriter::new_for_buffered(buffered_write, headers, pedantic)?; +    write_chunks(meta, &mut writer)?; +    writer.complete_meta_data() +} + +/// Can consume compressed pixel chunks, writing them a file. +/// Use `sequential_blocks_compressor` or `parallel_blocks_compressor` to compress your data, +/// or use `compress_all_blocks_sequential` or `compress_all_blocks_parallel`. +/// Use `on_progress` to obtain a new writer +/// that triggers a callback for each block. +// #[must_use] +#[derive(Debug)] +#[must_use] +pub struct ChunkWriter<W> { +    header_count: usize, +    byte_writer: Tracking<W>, +    chunk_indices_byte_location: std::ops::Range<usize>, +    chunk_indices_increasing_y: OffsetTables, +    chunk_count: usize, // TODO compose? +} + +/// A new writer that triggers a callback +/// for each block written to the inner writer. 
+#[derive(Debug)] +#[must_use] +pub struct OnProgressChunkWriter<'w, W, F> { +    chunk_writer: &'w mut W, +    written_chunks: usize, +    on_progress: F, +} + +/// Write chunks to a byte destination. +/// Then write each chunk with `writer.write_chunk(chunk)`. +pub trait ChunksWriter: Sized { + +    /// The total number of chunks that the complete file will contain. +    fn total_chunks_count(&self) -> usize; + +    /// Any more calls will result in an error and have no effect. +    /// If writing results in an error, the file and the writer +    /// may remain in an invalid state and should not be used further. +    /// Errors when the chunk at this index was already written. +    fn write_chunk(&mut self, index_in_header_increasing_y: usize, chunk: Chunk) -> UnitResult; + +    /// Obtain a new writer that calls the specified closure for each block that is written to this writer. +    fn on_progress<F>(&mut self, on_progress: F) -> OnProgressChunkWriter<'_, Self, F> where F: FnMut(f64) { +        OnProgressChunkWriter { chunk_writer: self, written_chunks: 0, on_progress } +    } + +    /// Obtain a new writer that can compress blocks to chunks, which are then passed to this writer. +    fn sequential_blocks_compressor<'w>(&'w mut self, meta: &'w MetaData) -> SequentialBlocksCompressor<'w, Self> { +        SequentialBlocksCompressor::new(meta, self) +    } + +    /// Obtain a new writer that can compress blocks to chunks on multiple threads, which are then passed to this writer. +    /// Returns none if the sequential compressor should be used instead (thread pool creation failure or too large performance overhead). +    fn parallel_blocks_compressor<'w>(&'w mut self, meta: &'w MetaData) -> Option<ParallelBlocksCompressor<'w, Self>> { +        ParallelBlocksCompressor::new(meta, self) +    } + +    /// Compresses all blocks to the file. +    /// The index of the block must be in increasing line order within the header. 
+    /// Obtain iterator with `MetaData::collect_ordered_blocks(...)` or similar methods. +    fn compress_all_blocks_sequential(mut self, meta: &MetaData, blocks: impl Iterator<Item=(usize, UncompressedBlock)>) -> UnitResult { +        let mut writer = self.sequential_blocks_compressor(meta); + +        // TODO check block order if line order is not unspecified! +        for (index_in_header_increasing_y, block) in blocks { +            writer.compress_block(index_in_header_increasing_y, block)?; +        } + +        // TODO debug_assert_eq!(self.is_complete()); +        Ok(()) +    } + +    /// Compresses all blocks to the file. +    /// The index of the block must be in increasing line order within the header. +    /// Obtain iterator with `MetaData::collect_ordered_blocks(...)` or similar methods. +    /// Will fallback to sequential processing where threads are not available, or where it would not speed up the process. +    fn compress_all_blocks_parallel(mut self, meta: &MetaData, blocks: impl Iterator<Item=(usize, UncompressedBlock)>) -> UnitResult { +        let mut parallel_writer = match self.parallel_blocks_compressor(meta) { +            None => return self.compress_all_blocks_sequential(meta, blocks), +            Some(writer) => writer, +        }; + +        // TODO check block order if line order is not unspecified! +        for (index_in_header_increasing_y, block) in blocks { +            parallel_writer.add_block_to_compression_queue(index_in_header_increasing_y, block)?; +        } + +        // TODO debug_assert_eq!(self.is_complete()); +        Ok(()) +    } +} + + +impl<W> ChunksWriter for ChunkWriter<W> where W: Write + Seek { + +    /// The total number of chunks that the complete file will contain. +    fn total_chunks_count(&self) -> usize { self.chunk_count } + +    /// Any more calls will result in an error and have no effect. 
+    /// If writing results in an error, the file and the writer +    /// may remain in an invalid state and should not be used further. +    /// Errors when the chunk at this index was already written. +    fn write_chunk(&mut self, index_in_header_increasing_y: usize, chunk: Chunk) -> UnitResult { +        let header_chunk_indices = &mut self.chunk_indices_increasing_y[chunk.layer_index]; + +        if index_in_header_increasing_y >= header_chunk_indices.len() { +            return Err(Error::invalid("too large chunk index")); +        } + +        let chunk_index_slot = &mut header_chunk_indices[index_in_header_increasing_y]; +        if *chunk_index_slot != 0 { +            return Err(Error::invalid(format!("chunk at index {} is already written", index_in_header_increasing_y))); +        } + +        *chunk_index_slot = usize_to_u64(self.byte_writer.byte_position()); +        chunk.write(&mut self.byte_writer, self.header_count)?; +        Ok(()) +    } +} + +impl<W> ChunkWriter<W> where W: Write + Seek { +    // -- the following functions are private, because they must be called in a strict order -- + +    /// Writes the meta data and zeroed offset tables as a placeholder. 
+    fn new_for_buffered(buffered_byte_writer: W, headers: Headers, pedantic: bool) -> Result<(MetaData, Self)> { +        let mut write = Tracking::new(buffered_byte_writer); +        let requirements = MetaData::write_validating_to_buffered(&mut write, headers.as_slice(), pedantic)?; + +        // TODO: use increasing line order where possible, but this requires us to know whether we want to be parallel right now +        /*// if non-parallel compression, we always use increasing order anyways +        if !parallel || !has_compression { +            for header in &mut headers { +                if header.line_order == LineOrder::Unspecified { +                    header.line_order = LineOrder::Increasing; +                } +            } +        }*/ + +        let offset_table_size: usize = headers.iter().map(|header| header.chunk_count).sum(); + +        let offset_table_start_byte = write.byte_position(); +        let offset_table_end_byte = write.byte_position() + offset_table_size * u64::BYTE_SIZE; + +        // skip offset tables, filling with 0, will be updated after the last chunk has been written +        write.seek_write_to(offset_table_end_byte)?; + +        let header_count = headers.len(); +        let chunk_indices_increasing_y = headers.iter() +            .map(|header| vec![0_u64; header.chunk_count]).collect(); + +        let meta_data = MetaData { requirements, headers }; + +        Ok((meta_data, ChunkWriter { +            header_count, +            byte_writer: write, +            chunk_count: offset_table_size, +            chunk_indices_byte_location: offset_table_start_byte .. offset_table_end_byte, +            chunk_indices_increasing_y, +        })) +    } + +    /// Seek back to the meta data, write offset tables, and flush the byte writer. +    /// Leaves the writer seeked to the middle of the file. 
+    fn complete_meta_data(mut self) -> UnitResult { +        if self.chunk_indices_increasing_y.iter().flatten().any(|&index| index == 0) { +            return Err(Error::invalid("some chunks are not written yet")) +        } + +        // write all offset tables +        debug_assert_ne!(self.byte_writer.byte_position(), self.chunk_indices_byte_location.end, "offset table has already been updated"); +        self.byte_writer.seek_write_to(self.chunk_indices_byte_location.start)?; + +        for table in self.chunk_indices_increasing_y { +            u64::write_slice(&mut self.byte_writer, table.as_slice())?; +        } + +        self.byte_writer.flush()?; // make sure we catch all (possibly delayed) io errors before returning +        Ok(()) +    } + +} + + +impl<'w, W, F> ChunksWriter for OnProgressChunkWriter<'w, W, F> where W: 'w + ChunksWriter, F: FnMut(f64) { +    fn total_chunks_count(&self) -> usize { +        self.chunk_writer.total_chunks_count() +    } + +    fn write_chunk(&mut self, index_in_header_increasing_y: usize, chunk: Chunk) -> UnitResult { +        let total_chunks = self.total_chunks_count(); +        let on_progress = &mut self.on_progress; + +        // guarantee on_progress being called with 0 once +        if self.written_chunks == 0 { on_progress(0.0); } + +        self.chunk_writer.write_chunk(index_in_header_increasing_y, chunk)?; + +        self.written_chunks += 1; + +        on_progress({ +            // guarantee finishing with progress 1.0 for last block at least once, float division might slightly differ from 1.0 +            if self.written_chunks == total_chunks { 1.0 } +            else { self.written_chunks as f64 / total_chunks as f64 } +        }); + +        Ok(()) +    } +} + + +/// Write blocks that appear in any order and reorder them before writing. 
+#[derive(Debug)] +#[must_use] +pub struct SortedBlocksWriter<'w, W> { +    chunk_writer: &'w mut W, +    pending_chunks: BTreeMap<usize, (usize, Chunk)>, +    unwritten_chunk_indices: Peekable<std::ops::Range<usize>>, +    requires_sorting: bool, // using this instead of Option, because of borrowing +} + + +impl<'w, W> SortedBlocksWriter<'w, W> where W: ChunksWriter { + +    /// New sorting writer. Returns `None` if sorting is not required. +    pub fn new(meta_data: &MetaData, chunk_writer: &'w mut W) -> SortedBlocksWriter<'w, W> { +        let requires_sorting = meta_data.headers.iter() +            .any(|header| header.line_order != LineOrder::Unspecified); + +        let total_chunk_count = chunk_writer.total_chunks_count(); + +        SortedBlocksWriter { +            pending_chunks: BTreeMap::new(), +            unwritten_chunk_indices: (0 .. total_chunk_count).peekable(), +            requires_sorting, +            chunk_writer +        } +    } + +    /// Write the chunk or stash it. In the closure, write all chunks that can be written now. 
+    pub fn write_or_stash_chunk(&mut self, chunk_index_in_file: usize, chunk_y_index: usize, chunk: Chunk) -> UnitResult { +        if self.requires_sorting.not() { +            return self.chunk_writer.write_chunk(chunk_y_index, chunk); +        } + +        // write this chunk now if possible +        if self.unwritten_chunk_indices.peek() == Some(&chunk_index_in_file){ +            self.chunk_writer.write_chunk(chunk_y_index, chunk)?; +            self.unwritten_chunk_indices.next().expect("peeked chunk index is missing"); + +            // write all pending blocks that are immediate successors of this block +            while let Some((next_chunk_y_index, next_chunk)) = self +                .unwritten_chunk_indices.peek().cloned() +                .and_then(|id| self.pending_chunks.remove(&id)) +            { +                self.chunk_writer.write_chunk(next_chunk_y_index, next_chunk)?; +                self.unwritten_chunk_indices.next().expect("peeked chunk index is missing"); +            } +        } + +        else { +            // the argument block is not to be written now, +            // and all the pending blocks are not next up either, +            // so just stash this block +            self.pending_chunks.insert(chunk_index_in_file, (chunk_y_index, chunk)); +        } + +        Ok(()) +    } + +    /// Where the chunks will be written to. +    pub fn inner_chunks_writer(&self) -> &W { +        &self.chunk_writer +    } +} + + + +/// Compress blocks to a chunk writer in this thread. +#[derive(Debug)] +#[must_use] +pub struct SequentialBlocksCompressor<'w, W> { +    meta: &'w MetaData, +    chunks_writer: &'w mut W, +} + +impl<'w, W> SequentialBlocksCompressor<'w, W> where W: 'w + ChunksWriter { + +    /// New blocks writer. +    pub fn new(meta: &'w MetaData, chunks_writer: &'w mut W) -> Self { Self { meta, chunks_writer, } } + +    /// This is where the compressed blocks are written to. 
+    pub fn inner_chunks_writer(&'w self) -> &'w W { self.chunks_writer } + +    /// Compress a single block immediately. The index of the block must be in increasing line order. +    pub fn compress_block(&mut self, index_in_header_increasing_y: usize, block: UncompressedBlock) -> UnitResult { +        self.chunks_writer.write_chunk( +            index_in_header_increasing_y, +            block.compress_to_chunk(&self.meta.headers)? +        ) +    } +} + +/// Compress blocks to a chunk writer with multiple threads. +#[derive(Debug)] +#[must_use] +pub struct ParallelBlocksCompressor<'w, W> { +    meta: &'w MetaData, +    sorted_writer: SortedBlocksWriter<'w, W>, + +    sender: flume::Sender<Result<(usize, usize, Chunk)>>, +    receiver: flume::Receiver<Result<(usize, usize, Chunk)>>, +    pool: rayon_core::ThreadPool, + +    currently_compressing_count: usize, +    written_chunk_count: usize, // used to check for last chunk +    max_threads: usize, +    next_incoming_chunk_index: usize, // used to remember original chunk order +} + +impl<'w, W> ParallelBlocksCompressor<'w, W> where W: 'w + ChunksWriter { + +    /// New blocks writer. Returns none if sequential compression should be used. +    /// Use `new_with_thread_pool` to customize the threadpool. +    pub fn new(meta: &'w MetaData, chunks_writer: &'w mut W) -> Option<Self> { +        Self::new_with_thread_pool(meta, chunks_writer, ||{ +            rayon_core::ThreadPoolBuilder::new() +                .thread_name(|index| format!("OpenEXR Block Compressor Thread #{}", index)) +                .build() +        }) +    } + +    /// New blocks writer. Returns none if sequential compression should be used. 
+    pub fn new_with_thread_pool<CreatePool>( +        meta: &'w MetaData, chunks_writer: &'w mut W, try_create_thread_pool: CreatePool) +        -> Option<Self> +        where CreatePool: FnOnce() -> std::result::Result<ThreadPool, ThreadPoolBuildError> +    { +        if meta.headers.iter().all(|head|head.compression == Compression::Uncompressed) { +            return None; +        } + +        // in case thread pool creation fails (for example on WASM currently), +        // we revert to sequential compression +        let pool = match try_create_thread_pool() { +            Ok(pool) => pool, + +            // TODO print warning? +            Err(_) => return None, +        }; + +        let max_threads = pool.current_num_threads().max(1).min(chunks_writer.total_chunks_count()) + 2; // ca one block for each thread at all times +        let (send, recv) = flume::unbounded(); // TODO bounded channel simplifies logic? + +        Some(Self { +            sorted_writer: SortedBlocksWriter::new(meta, chunks_writer), +            next_incoming_chunk_index: 0, +            currently_compressing_count: 0, +            written_chunk_count: 0, +            sender: send, +            receiver: recv, +            max_threads, +            pool, +            meta, +        }) +    } + +    /// This is where the compressed blocks are written to. 
+    pub fn inner_chunks_writer(&'w self) -> &'w W { self.sorted_writer.inner_chunks_writer() } + +    // private, as may underflow counter in release mode +    fn write_next_queued_chunk(&mut self) -> UnitResult { +        debug_assert!(self.currently_compressing_count > 0, "cannot wait for chunks as there are none left"); + +        let some_compressed_chunk = self.receiver.recv() +            .expect("cannot receive compressed block"); + +        self.currently_compressing_count -= 1; +        let (chunk_file_index, chunk_y_index, chunk) = some_compressed_chunk?; +        self.sorted_writer.write_or_stash_chunk(chunk_file_index, chunk_y_index, chunk)?; + +        self.written_chunk_count += 1; +        Ok(()) +    } + +    /// Wait until all currently compressing chunks in the compressor have been written. +    pub fn write_all_queued_chunks(&mut self) -> UnitResult { +        while self.currently_compressing_count > 0 { +            self.write_next_queued_chunk()?; +        } + +        debug_assert_eq!(self.currently_compressing_count, 0, "counter does not match block count"); +        Ok(()) +    } + +    /// Add a single block to the compressor queue. The index of the block must be in increasing line order. +    /// When calling this function for the last block, this method waits until all the blocks have been written. +    /// This only works when you write as many blocks as the image expects, otherwise you can use `wait_for_all_remaining_chunks`. +    /// Waits for a block from the queue to be written, if the queue already has enough items. 
+    pub fn add_block_to_compression_queue(&mut self, index_in_header_increasing_y: usize, block: UncompressedBlock) -> UnitResult { + +        // if pipe is full, block to wait for a slot to free up +        if self.currently_compressing_count >= self.max_threads { +            self.write_next_queued_chunk()?; +        } + +        // add the argument chunk to the compression queueue +        let index_in_file = self.next_incoming_chunk_index; +        let sender = self.sender.clone(); +        let meta = self.meta.clone(); + +        self.pool.spawn(move ||{ +            let compressed_or_err = block.compress_to_chunk(&meta.headers); + +            // by now, decompressing could have failed in another thread. +            // the error is then already handled, so we simply +            // don't send the decompressed block and do nothing +            let _ = sender.send(compressed_or_err.map(move |compressed| (index_in_file, index_in_header_increasing_y, compressed))); +        }); + +        self.currently_compressing_count += 1; +        self.next_incoming_chunk_index += 1; + +        // if this is the last chunk, wait for all chunks to complete before returning +        if self.written_chunk_count + self.currently_compressing_count == self.inner_chunks_writer().total_chunks_count() { +            self.write_all_queued_chunks()?; +            debug_assert_eq!( +                self.written_chunk_count, self.inner_chunks_writer().total_chunks_count(), +                "written chunk count mismatch" +            ); +        } + + +        Ok(()) +    } +} + + + | 
