Coverage Report

Created: 2025-07-11 07:25

/rust/registry/src/index.crates.io-6f17d22bba15001f/exr-1.73.0/src/block/writer.rs
Line
Count
Source (jump to first uncovered line)
1
//! Composable structures to handle writing an image.
2
3
4
use std::fmt::Debug;
5
use std::io::Seek;
6
use std::iter::Peekable;
7
use std::ops::Not;
8
use std::sync::mpsc;
9
use rayon_core::{ThreadPool, ThreadPoolBuildError};
10
11
use smallvec::alloc::collections::BTreeMap;
12
13
use crate::block::UncompressedBlock;
14
use crate::block::chunk::Chunk;
15
use crate::compression::Compression;
16
use crate::error::{Error, Result, UnitResult, usize_to_u64};
17
use crate::io::{Data, Tracking, Write};
18
use crate::meta::{Headers, MetaData, OffsetTables};
19
use crate::meta::attribute::LineOrder;
20
21
/// Write an exr file by writing one chunk after another in a closure.
22
/// In the closure, you are provided a chunk writer, which should be used to write all the chunks.
23
/// Assumes that your write destination is buffered.
24
0
pub fn write_chunks_with<W: Write + Seek>(
25
0
    buffered_write: W, headers: Headers, pedantic: bool,
26
0
    write_chunks: impl FnOnce(MetaData, &mut ChunkWriter<W>) -> UnitResult
27
0
) -> UnitResult {
28
    // this closure approach ensures that after writing all chunks, the file is always completed and checked and flushed
29
0
    let (meta, mut writer) = ChunkWriter::new_for_buffered(buffered_write, headers, pedantic)?;
30
0
    write_chunks(meta, &mut writer)?;
31
0
    writer.complete_meta_data()
32
0
}
Unexecuted instantiation: exr::block::writer::write_chunks_with::<_, _>
Unexecuted instantiation: exr::block::writer::write_chunks_with::<&mut &mut std::io::cursor::Cursor<alloc::vec::Vec<u8>>, <exr::image::write::WriteImageWithOptions<exr::image::Layer<exr::image::SpecificChannels<image::codecs::openexr::write_buffer<&mut std::io::cursor::Cursor<alloc::vec::Vec<u8>>>::{closure#0}, (exr::meta::attribute::ChannelDescription, exr::meta::attribute::ChannelDescription, exr::meta::attribute::ChannelDescription)>>, fn(f64)>>::to_buffered<&mut &mut std::io::cursor::Cursor<alloc::vec::Vec<u8>>>::{closure#0}>
Unexecuted instantiation: exr::block::writer::write_chunks_with::<&mut &mut std::io::cursor::Cursor<alloc::vec::Vec<u8>>, <exr::image::write::WriteImageWithOptions<exr::image::Layer<exr::image::SpecificChannels<image::codecs::openexr::write_buffer<&mut std::io::cursor::Cursor<alloc::vec::Vec<u8>>>::{closure#1}, (exr::meta::attribute::ChannelDescription, exr::meta::attribute::ChannelDescription, exr::meta::attribute::ChannelDescription, exr::meta::attribute::ChannelDescription)>>, fn(f64)>>::to_buffered<&mut &mut std::io::cursor::Cursor<alloc::vec::Vec<u8>>>::{closure#0}>
Unexecuted instantiation: exr::block::writer::write_chunks_with::<&mut std::io::cursor::Cursor<&mut alloc::vec::Vec<u8>>, <exr::image::write::WriteImageWithOptions<exr::image::Layer<exr::image::SpecificChannels<image::codecs::openexr::write_buffer<std::io::cursor::Cursor<&mut alloc::vec::Vec<u8>>>::{closure#0}, (exr::meta::attribute::ChannelDescription, exr::meta::attribute::ChannelDescription, exr::meta::attribute::ChannelDescription)>>, fn(f64)>>::to_buffered<&mut std::io::cursor::Cursor<&mut alloc::vec::Vec<u8>>>::{closure#0}>
Unexecuted instantiation: exr::block::writer::write_chunks_with::<&mut std::io::cursor::Cursor<&mut alloc::vec::Vec<u8>>, <exr::image::write::WriteImageWithOptions<exr::image::Layer<exr::image::SpecificChannels<image::codecs::openexr::write_buffer<std::io::cursor::Cursor<&mut alloc::vec::Vec<u8>>>::{closure#1}, (exr::meta::attribute::ChannelDescription, exr::meta::attribute::ChannelDescription, exr::meta::attribute::ChannelDescription, exr::meta::attribute::ChannelDescription)>>, fn(f64)>>::to_buffered<&mut std::io::cursor::Cursor<&mut alloc::vec::Vec<u8>>>::{closure#0}>
33
34
/// Can consume compressed pixel chunks, writing them to a file.
35
/// Use `sequential_blocks_compressor` or `parallel_blocks_compressor` to compress your data,
36
/// or use `compress_all_blocks_sequential` or `compress_all_blocks_parallel`.
37
/// Use `on_progress` to obtain a new writer
38
/// that triggers a callback for each block.
39
// #[must_use]
40
#[derive(Debug)]
41
#[must_use]
42
pub struct ChunkWriter<W> {
43
    header_count: usize,
44
    byte_writer: Tracking<W>,
45
    chunk_indices_byte_location: std::ops::Range<usize>,
46
    chunk_indices_increasing_y: OffsetTables,
47
    chunk_count: usize, // TODO compose?
48
}
49
50
/// A new writer that triggers a callback
51
/// for each block written to the inner writer.
52
#[derive(Debug)]
53
#[must_use]
54
pub struct OnProgressChunkWriter<'w, W, F> {
55
    chunk_writer: &'w mut W,
56
    written_chunks: usize,
57
    on_progress: F,
58
}
59
60
/// Write chunks to a byte destination.
61
/// Then write each chunk with `writer.write_chunk(index_in_header_increasing_y, chunk)`.
62
pub trait ChunksWriter: Sized {
63
64
    /// The total number of chunks that the complete file will contain.
65
    fn total_chunks_count(&self) -> usize;
66
67
    /// Any more calls will result in an error and have no effect.
68
    /// If writing results in an error, the file and the writer
69
    /// may remain in an invalid state and should not be used further.
70
    /// Errors when the chunk at this index was already written.
71
    fn write_chunk(&mut self, index_in_header_increasing_y: usize, chunk: Chunk) -> UnitResult;
72
73
    /// Obtain a new writer that calls the specified closure for each block that is written to this writer.
74
0
    fn on_progress<F>(&mut self, on_progress: F) -> OnProgressChunkWriter<'_, Self, F> where F: FnMut(f64) {
75
0
        OnProgressChunkWriter { chunk_writer: self, written_chunks: 0, on_progress }
76
0
    }
Unexecuted instantiation: <_ as exr::block::writer::ChunksWriter>::on_progress::<_>
Unexecuted instantiation: <exr::block::writer::ChunkWriter<&mut &mut std::io::cursor::Cursor<alloc::vec::Vec<u8>>> as exr::block::writer::ChunksWriter>::on_progress::<fn(f64)>
Unexecuted instantiation: <exr::block::writer::ChunkWriter<&mut std::io::cursor::Cursor<&mut alloc::vec::Vec<u8>>> as exr::block::writer::ChunksWriter>::on_progress::<fn(f64)>
77
78
    /// Obtain a new writer that can compress blocks to chunks, which are then passed to this writer.
79
0
    fn sequential_blocks_compressor<'w>(&'w mut self, meta: &'w MetaData) -> SequentialBlocksCompressor<'w, Self> {
80
0
        SequentialBlocksCompressor::new(meta, self)
81
0
    }
Unexecuted instantiation: <_ as exr::block::writer::ChunksWriter>::sequential_blocks_compressor
Unexecuted instantiation: <exr::block::writer::OnProgressChunkWriter<exr::block::writer::ChunkWriter<&mut &mut std::io::cursor::Cursor<alloc::vec::Vec<u8>>>, fn(f64)> as exr::block::writer::ChunksWriter>::sequential_blocks_compressor
Unexecuted instantiation: <exr::block::writer::OnProgressChunkWriter<exr::block::writer::ChunkWriter<&mut std::io::cursor::Cursor<&mut alloc::vec::Vec<u8>>>, fn(f64)> as exr::block::writer::ChunksWriter>::sequential_blocks_compressor
82
83
    /// Obtain a new writer that can compress blocks to chunks on multiple threads, which are then passed to this writer.
84
    /// Returns `None` if the sequential compressor should be used instead (thread pool creation failure or too large performance overhead).
85
0
    fn parallel_blocks_compressor<'w>(&'w mut self, meta: &'w MetaData) -> Option<ParallelBlocksCompressor<'w, Self>> {
86
0
        ParallelBlocksCompressor::new(meta, self)
87
0
    }
Unexecuted instantiation: <_ as exr::block::writer::ChunksWriter>::parallel_blocks_compressor
Unexecuted instantiation: <exr::block::writer::OnProgressChunkWriter<exr::block::writer::ChunkWriter<&mut &mut std::io::cursor::Cursor<alloc::vec::Vec<u8>>>, fn(f64)> as exr::block::writer::ChunksWriter>::parallel_blocks_compressor
Unexecuted instantiation: <exr::block::writer::OnProgressChunkWriter<exr::block::writer::ChunkWriter<&mut std::io::cursor::Cursor<&mut alloc::vec::Vec<u8>>>, fn(f64)> as exr::block::writer::ChunksWriter>::parallel_blocks_compressor
88
89
    /// Compresses all blocks to the file.
90
    /// The index of the block must be in increasing line order within the header.
91
    /// Obtain iterator with `MetaData::collect_ordered_blocks(...)` or similar methods.
92
0
    fn compress_all_blocks_sequential(mut self, meta: &MetaData, blocks: impl Iterator<Item=(usize, UncompressedBlock)>) -> UnitResult {
93
0
        let mut writer = self.sequential_blocks_compressor(meta);
94
95
        // TODO check block order if line order is not unspecified!
96
0
        for (index_in_header_increasing_y, block) in blocks {
97
0
            writer.compress_block(index_in_header_increasing_y, block)?;
98
        }
99
100
        // TODO debug_assert_eq!(self.is_complete());
101
0
        Ok(())
102
0
    }
Unexecuted instantiation: <_ as exr::block::writer::ChunksWriter>::compress_all_blocks_sequential::<_>
Unexecuted instantiation: <exr::block::writer::OnProgressChunkWriter<exr::block::writer::ChunkWriter<&mut &mut std::io::cursor::Cursor<alloc::vec::Vec<u8>>>, fn(f64)> as exr::block::writer::ChunksWriter>::compress_all_blocks_sequential::<core::iter::adapters::map::Map<core::iter::adapters::flatten::FlatMap<core::iter::adapters::enumerate::Enumerate<core::slice::iter::Iter<exr::meta::header::Header>>, core::iter::adapters::map::Map<alloc::boxed::Box<dyn core::iter::traits::iterator::Iterator<Item = (usize, exr::meta::TileIndices)> + core::marker::Send>, exr::block::enumerate_ordered_header_block_indices::{closure#0}::{closure#0}>, exr::block::enumerate_ordered_header_block_indices::{closure#0}>, <exr::meta::MetaData>::collect_ordered_blocks<<exr::meta::MetaData>::collect_ordered_block_data<<exr::image::write::WriteImageWithOptions<exr::image::Layer<exr::image::SpecificChannels<image::codecs::openexr::write_buffer<&mut std::io::cursor::Cursor<alloc::vec::Vec<u8>>>::{closure#0}, (exr::meta::attribute::ChannelDescription, exr::meta::attribute::ChannelDescription, exr::meta::attribute::ChannelDescription)>>, fn(f64)>>::to_buffered<&mut &mut std::io::cursor::Cursor<alloc::vec::Vec<u8>>>::{closure#0}::{closure#0}>::{closure#0}>::{closure#0}>>
Unexecuted instantiation: <exr::block::writer::OnProgressChunkWriter<exr::block::writer::ChunkWriter<&mut &mut std::io::cursor::Cursor<alloc::vec::Vec<u8>>>, fn(f64)> as exr::block::writer::ChunksWriter>::compress_all_blocks_sequential::<core::iter::adapters::map::Map<core::iter::adapters::flatten::FlatMap<core::iter::adapters::enumerate::Enumerate<core::slice::iter::Iter<exr::meta::header::Header>>, core::iter::adapters::map::Map<alloc::boxed::Box<dyn core::iter::traits::iterator::Iterator<Item = (usize, exr::meta::TileIndices)> + core::marker::Send>, exr::block::enumerate_ordered_header_block_indices::{closure#0}::{closure#0}>, exr::block::enumerate_ordered_header_block_indices::{closure#0}>, <exr::meta::MetaData>::collect_ordered_blocks<<exr::meta::MetaData>::collect_ordered_block_data<<exr::image::write::WriteImageWithOptions<exr::image::Layer<exr::image::SpecificChannels<image::codecs::openexr::write_buffer<&mut std::io::cursor::Cursor<alloc::vec::Vec<u8>>>::{closure#1}, (exr::meta::attribute::ChannelDescription, exr::meta::attribute::ChannelDescription, exr::meta::attribute::ChannelDescription, exr::meta::attribute::ChannelDescription)>>, fn(f64)>>::to_buffered<&mut &mut std::io::cursor::Cursor<alloc::vec::Vec<u8>>>::{closure#0}::{closure#0}>::{closure#0}>::{closure#0}>>
Unexecuted instantiation: <exr::block::writer::OnProgressChunkWriter<exr::block::writer::ChunkWriter<&mut std::io::cursor::Cursor<&mut alloc::vec::Vec<u8>>>, fn(f64)> as exr::block::writer::ChunksWriter>::compress_all_blocks_sequential::<core::iter::adapters::map::Map<core::iter::adapters::flatten::FlatMap<core::iter::adapters::enumerate::Enumerate<core::slice::iter::Iter<exr::meta::header::Header>>, core::iter::adapters::map::Map<alloc::boxed::Box<dyn core::iter::traits::iterator::Iterator<Item = (usize, exr::meta::TileIndices)> + core::marker::Send>, exr::block::enumerate_ordered_header_block_indices::{closure#0}::{closure#0}>, exr::block::enumerate_ordered_header_block_indices::{closure#0}>, <exr::meta::MetaData>::collect_ordered_blocks<<exr::meta::MetaData>::collect_ordered_block_data<<exr::image::write::WriteImageWithOptions<exr::image::Layer<exr::image::SpecificChannels<image::codecs::openexr::write_buffer<std::io::cursor::Cursor<&mut alloc::vec::Vec<u8>>>::{closure#0}, (exr::meta::attribute::ChannelDescription, exr::meta::attribute::ChannelDescription, exr::meta::attribute::ChannelDescription)>>, fn(f64)>>::to_buffered<&mut std::io::cursor::Cursor<&mut alloc::vec::Vec<u8>>>::{closure#0}::{closure#0}>::{closure#0}>::{closure#0}>>
Unexecuted instantiation: <exr::block::writer::OnProgressChunkWriter<exr::block::writer::ChunkWriter<&mut std::io::cursor::Cursor<&mut alloc::vec::Vec<u8>>>, fn(f64)> as exr::block::writer::ChunksWriter>::compress_all_blocks_sequential::<core::iter::adapters::map::Map<core::iter::adapters::flatten::FlatMap<core::iter::adapters::enumerate::Enumerate<core::slice::iter::Iter<exr::meta::header::Header>>, core::iter::adapters::map::Map<alloc::boxed::Box<dyn core::iter::traits::iterator::Iterator<Item = (usize, exr::meta::TileIndices)> + core::marker::Send>, exr::block::enumerate_ordered_header_block_indices::{closure#0}::{closure#0}>, exr::block::enumerate_ordered_header_block_indices::{closure#0}>, <exr::meta::MetaData>::collect_ordered_blocks<<exr::meta::MetaData>::collect_ordered_block_data<<exr::image::write::WriteImageWithOptions<exr::image::Layer<exr::image::SpecificChannels<image::codecs::openexr::write_buffer<std::io::cursor::Cursor<&mut alloc::vec::Vec<u8>>>::{closure#1}, (exr::meta::attribute::ChannelDescription, exr::meta::attribute::ChannelDescription, exr::meta::attribute::ChannelDescription, exr::meta::attribute::ChannelDescription)>>, fn(f64)>>::to_buffered<&mut std::io::cursor::Cursor<&mut alloc::vec::Vec<u8>>>::{closure#0}::{closure#0}>::{closure#0}>::{closure#0}>>
103
104
    /// Compresses all blocks to the file.
105
    /// The index of the block must be in increasing line order within the header.
106
    /// Obtain iterator with `MetaData::collect_ordered_blocks(...)` or similar methods.
107
    /// Will fall back to sequential processing where threads are not available, or where it would not speed up the process.
108
0
    fn compress_all_blocks_parallel(mut self, meta: &MetaData, blocks: impl Iterator<Item=(usize, UncompressedBlock)>) -> UnitResult {
109
0
        let mut parallel_writer = match self.parallel_blocks_compressor(meta) {
110
0
            None => return self.compress_all_blocks_sequential(meta, blocks),
111
0
            Some(writer) => writer,
112
        };
113
114
        // TODO check block order if line order is not unspecified!
115
0
        for (index_in_header_increasing_y, block) in blocks {
116
0
            parallel_writer.add_block_to_compression_queue(index_in_header_increasing_y, block)?;
117
        }
118
119
        // TODO debug_assert_eq!(self.is_complete());
120
0
        Ok(())
121
0
    }
Unexecuted instantiation: <_ as exr::block::writer::ChunksWriter>::compress_all_blocks_parallel::<_>
Unexecuted instantiation: <exr::block::writer::OnProgressChunkWriter<exr::block::writer::ChunkWriter<&mut &mut std::io::cursor::Cursor<alloc::vec::Vec<u8>>>, fn(f64)> as exr::block::writer::ChunksWriter>::compress_all_blocks_parallel::<core::iter::adapters::map::Map<core::iter::adapters::flatten::FlatMap<core::iter::adapters::enumerate::Enumerate<core::slice::iter::Iter<exr::meta::header::Header>>, core::iter::adapters::map::Map<alloc::boxed::Box<dyn core::iter::traits::iterator::Iterator<Item = (usize, exr::meta::TileIndices)> + core::marker::Send>, exr::block::enumerate_ordered_header_block_indices::{closure#0}::{closure#0}>, exr::block::enumerate_ordered_header_block_indices::{closure#0}>, <exr::meta::MetaData>::collect_ordered_blocks<<exr::meta::MetaData>::collect_ordered_block_data<<exr::image::write::WriteImageWithOptions<exr::image::Layer<exr::image::SpecificChannels<image::codecs::openexr::write_buffer<&mut std::io::cursor::Cursor<alloc::vec::Vec<u8>>>::{closure#0}, (exr::meta::attribute::ChannelDescription, exr::meta::attribute::ChannelDescription, exr::meta::attribute::ChannelDescription)>>, fn(f64)>>::to_buffered<&mut &mut std::io::cursor::Cursor<alloc::vec::Vec<u8>>>::{closure#0}::{closure#0}>::{closure#0}>::{closure#0}>>
Unexecuted instantiation: <exr::block::writer::OnProgressChunkWriter<exr::block::writer::ChunkWriter<&mut &mut std::io::cursor::Cursor<alloc::vec::Vec<u8>>>, fn(f64)> as exr::block::writer::ChunksWriter>::compress_all_blocks_parallel::<core::iter::adapters::map::Map<core::iter::adapters::flatten::FlatMap<core::iter::adapters::enumerate::Enumerate<core::slice::iter::Iter<exr::meta::header::Header>>, core::iter::adapters::map::Map<alloc::boxed::Box<dyn core::iter::traits::iterator::Iterator<Item = (usize, exr::meta::TileIndices)> + core::marker::Send>, exr::block::enumerate_ordered_header_block_indices::{closure#0}::{closure#0}>, exr::block::enumerate_ordered_header_block_indices::{closure#0}>, <exr::meta::MetaData>::collect_ordered_blocks<<exr::meta::MetaData>::collect_ordered_block_data<<exr::image::write::WriteImageWithOptions<exr::image::Layer<exr::image::SpecificChannels<image::codecs::openexr::write_buffer<&mut std::io::cursor::Cursor<alloc::vec::Vec<u8>>>::{closure#1}, (exr::meta::attribute::ChannelDescription, exr::meta::attribute::ChannelDescription, exr::meta::attribute::ChannelDescription, exr::meta::attribute::ChannelDescription)>>, fn(f64)>>::to_buffered<&mut &mut std::io::cursor::Cursor<alloc::vec::Vec<u8>>>::{closure#0}::{closure#0}>::{closure#0}>::{closure#0}>>
Unexecuted instantiation: <exr::block::writer::OnProgressChunkWriter<exr::block::writer::ChunkWriter<&mut std::io::cursor::Cursor<&mut alloc::vec::Vec<u8>>>, fn(f64)> as exr::block::writer::ChunksWriter>::compress_all_blocks_parallel::<core::iter::adapters::map::Map<core::iter::adapters::flatten::FlatMap<core::iter::adapters::enumerate::Enumerate<core::slice::iter::Iter<exr::meta::header::Header>>, core::iter::adapters::map::Map<alloc::boxed::Box<dyn core::iter::traits::iterator::Iterator<Item = (usize, exr::meta::TileIndices)> + core::marker::Send>, exr::block::enumerate_ordered_header_block_indices::{closure#0}::{closure#0}>, exr::block::enumerate_ordered_header_block_indices::{closure#0}>, <exr::meta::MetaData>::collect_ordered_blocks<<exr::meta::MetaData>::collect_ordered_block_data<<exr::image::write::WriteImageWithOptions<exr::image::Layer<exr::image::SpecificChannels<image::codecs::openexr::write_buffer<std::io::cursor::Cursor<&mut alloc::vec::Vec<u8>>>::{closure#0}, (exr::meta::attribute::ChannelDescription, exr::meta::attribute::ChannelDescription, exr::meta::attribute::ChannelDescription)>>, fn(f64)>>::to_buffered<&mut std::io::cursor::Cursor<&mut alloc::vec::Vec<u8>>>::{closure#0}::{closure#0}>::{closure#0}>::{closure#0}>>
Unexecuted instantiation: <exr::block::writer::OnProgressChunkWriter<exr::block::writer::ChunkWriter<&mut std::io::cursor::Cursor<&mut alloc::vec::Vec<u8>>>, fn(f64)> as exr::block::writer::ChunksWriter>::compress_all_blocks_parallel::<core::iter::adapters::map::Map<core::iter::adapters::flatten::FlatMap<core::iter::adapters::enumerate::Enumerate<core::slice::iter::Iter<exr::meta::header::Header>>, core::iter::adapters::map::Map<alloc::boxed::Box<dyn core::iter::traits::iterator::Iterator<Item = (usize, exr::meta::TileIndices)> + core::marker::Send>, exr::block::enumerate_ordered_header_block_indices::{closure#0}::{closure#0}>, exr::block::enumerate_ordered_header_block_indices::{closure#0}>, <exr::meta::MetaData>::collect_ordered_blocks<<exr::meta::MetaData>::collect_ordered_block_data<<exr::image::write::WriteImageWithOptions<exr::image::Layer<exr::image::SpecificChannels<image::codecs::openexr::write_buffer<std::io::cursor::Cursor<&mut alloc::vec::Vec<u8>>>::{closure#1}, (exr::meta::attribute::ChannelDescription, exr::meta::attribute::ChannelDescription, exr::meta::attribute::ChannelDescription, exr::meta::attribute::ChannelDescription)>>, fn(f64)>>::to_buffered<&mut std::io::cursor::Cursor<&mut alloc::vec::Vec<u8>>>::{closure#0}::{closure#0}>::{closure#0}>::{closure#0}>>
122
}
123
124
125
impl<W> ChunksWriter for ChunkWriter<W> where W: Write + Seek {
126
127
    /// The total number of chunks that the complete file will contain.
128
0
    fn total_chunks_count(&self) -> usize { self.chunk_count }
Unexecuted instantiation: <exr::block::writer::ChunkWriter<_> as exr::block::writer::ChunksWriter>::total_chunks_count
Unexecuted instantiation: <exr::block::writer::ChunkWriter<&mut &mut std::io::cursor::Cursor<alloc::vec::Vec<u8>>> as exr::block::writer::ChunksWriter>::total_chunks_count
Unexecuted instantiation: <exr::block::writer::ChunkWriter<&mut std::io::cursor::Cursor<&mut alloc::vec::Vec<u8>>> as exr::block::writer::ChunksWriter>::total_chunks_count
129
130
    /// Any more calls will result in an error and have no effect.
131
    /// If writing results in an error, the file and the writer
132
    /// may remain in an invalid state and should not be used further.
133
    /// Errors when the chunk at this index was already written.
134
0
    fn write_chunk(&mut self, index_in_header_increasing_y: usize, chunk: Chunk) -> UnitResult {
135
0
        let header_chunk_indices = &mut self.chunk_indices_increasing_y[chunk.layer_index];
136
0
137
0
        if index_in_header_increasing_y >= header_chunk_indices.len() {
138
0
            return Err(Error::invalid("too large chunk index"));
139
0
        }
140
0
141
0
        let chunk_index_slot = &mut header_chunk_indices[index_in_header_increasing_y];
142
0
        if *chunk_index_slot != 0 {
143
0
            return Err(Error::invalid(format!("chunk at index {} is already written", index_in_header_increasing_y)));
144
0
        }
145
0
146
0
        *chunk_index_slot = usize_to_u64(self.byte_writer.byte_position());
147
0
        chunk.write(&mut self.byte_writer, self.header_count)?;
148
0
        Ok(())
149
0
    }
Unexecuted instantiation: <exr::block::writer::ChunkWriter<_> as exr::block::writer::ChunksWriter>::write_chunk
Unexecuted instantiation: <exr::block::writer::ChunkWriter<&mut &mut std::io::cursor::Cursor<alloc::vec::Vec<u8>>> as exr::block::writer::ChunksWriter>::write_chunk
Unexecuted instantiation: <exr::block::writer::ChunkWriter<&mut std::io::cursor::Cursor<&mut alloc::vec::Vec<u8>>> as exr::block::writer::ChunksWriter>::write_chunk
150
}
151
152
impl<W> ChunkWriter<W> where W: Write + Seek {
153
    // -- the following functions are private, because they must be called in a strict order --
154
155
    /// Writes the meta data and zeroed offset tables as a placeholder.
156
0
    fn new_for_buffered(buffered_byte_writer: W, headers: Headers, pedantic: bool) -> Result<(MetaData, Self)> {
157
0
        let mut write = Tracking::new(buffered_byte_writer);
158
0
        let requirements = MetaData::write_validating_to_buffered(&mut write, headers.as_slice(), pedantic)?;
159
160
        // TODO: use increasing line order where possible, but this requires us to know whether we want to be parallel right now
161
        /*// if non-parallel compression, we always use increasing order anyways
162
        if !parallel || !has_compression {
163
            for header in &mut headers {
164
                if header.line_order == LineOrder::Unspecified {
165
                    header.line_order = LineOrder::Increasing;
166
                }
167
            }
168
        }*/
169
170
0
        let offset_table_size: usize = headers.iter().map(|header| header.chunk_count).sum();
Unexecuted instantiation: <exr::block::writer::ChunkWriter<_>>::new_for_buffered::{closure#0}
Unexecuted instantiation: <exr::block::writer::ChunkWriter<&mut &mut std::io::cursor::Cursor<alloc::vec::Vec<u8>>>>::new_for_buffered::{closure#0}
Unexecuted instantiation: <exr::block::writer::ChunkWriter<&mut std::io::cursor::Cursor<&mut alloc::vec::Vec<u8>>>>::new_for_buffered::{closure#0}
171
0
172
0
        let offset_table_start_byte = write.byte_position();
173
0
        let offset_table_end_byte = write.byte_position() + offset_table_size * u64::BYTE_SIZE;
174
0
175
0
        // skip offset tables, filling with 0, will be updated after the last chunk has been written
176
0
        write.seek_write_to(offset_table_end_byte)?;
177
178
0
        let header_count = headers.len();
179
0
        let chunk_indices_increasing_y = headers.iter()
180
0
            .map(|header| vec![0_u64; header.chunk_count]).collect();
Unexecuted instantiation: <exr::block::writer::ChunkWriter<_>>::new_for_buffered::{closure#1}
Unexecuted instantiation: <exr::block::writer::ChunkWriter<&mut &mut std::io::cursor::Cursor<alloc::vec::Vec<u8>>>>::new_for_buffered::{closure#1}
Unexecuted instantiation: <exr::block::writer::ChunkWriter<&mut std::io::cursor::Cursor<&mut alloc::vec::Vec<u8>>>>::new_for_buffered::{closure#1}
181
0
182
0
        let meta_data = MetaData { requirements, headers };
183
0
184
0
        Ok((meta_data, ChunkWriter {
185
0
            header_count,
186
0
            byte_writer: write,
187
0
            chunk_count: offset_table_size,
188
0
            chunk_indices_byte_location: offset_table_start_byte .. offset_table_end_byte,
189
0
            chunk_indices_increasing_y,
190
0
        }))
191
0
    }
Unexecuted instantiation: <exr::block::writer::ChunkWriter<_>>::new_for_buffered
Unexecuted instantiation: <exr::block::writer::ChunkWriter<&mut &mut std::io::cursor::Cursor<alloc::vec::Vec<u8>>>>::new_for_buffered
Unexecuted instantiation: <exr::block::writer::ChunkWriter<&mut std::io::cursor::Cursor<&mut alloc::vec::Vec<u8>>>>::new_for_buffered
192
193
    /// Seek back to the meta data, write offset tables, and flush the byte writer.
194
    /// Leaves the writer seeked to the middle of the file.
195
0
    fn complete_meta_data(mut self) -> UnitResult {
196
0
        if self.chunk_indices_increasing_y.iter().flatten().any(|&index| index == 0) {
Unexecuted instantiation: <exr::block::writer::ChunkWriter<_>>::complete_meta_data::{closure#0}
Unexecuted instantiation: <exr::block::writer::ChunkWriter<&mut &mut std::io::cursor::Cursor<alloc::vec::Vec<u8>>>>::complete_meta_data::{closure#0}
Unexecuted instantiation: <exr::block::writer::ChunkWriter<&mut std::io::cursor::Cursor<&mut alloc::vec::Vec<u8>>>>::complete_meta_data::{closure#0}
197
0
            return Err(Error::invalid("some chunks are not written yet"))
198
0
        }
199
0
200
0
        // write all offset tables
201
0
        debug_assert_ne!(self.byte_writer.byte_position(), self.chunk_indices_byte_location.end, "offset table has already been updated");
202
0
        self.byte_writer.seek_write_to(self.chunk_indices_byte_location.start)?;
203
204
0
        for table in self.chunk_indices_increasing_y {
205
0
            u64::write_slice(&mut self.byte_writer, table.as_slice())?;
206
        }
207
208
0
        self.byte_writer.flush()?; // make sure we catch all (possibly delayed) io errors before returning
209
0
        Ok(())
210
0
    }
Unexecuted instantiation: <exr::block::writer::ChunkWriter<_>>::complete_meta_data
Unexecuted instantiation: <exr::block::writer::ChunkWriter<&mut &mut std::io::cursor::Cursor<alloc::vec::Vec<u8>>>>::complete_meta_data
Unexecuted instantiation: <exr::block::writer::ChunkWriter<&mut std::io::cursor::Cursor<&mut alloc::vec::Vec<u8>>>>::complete_meta_data
211
212
}
213
214
215
impl<'w, W, F> ChunksWriter for OnProgressChunkWriter<'w, W, F> where W: 'w + ChunksWriter, F: FnMut(f64) {
216
0
    fn total_chunks_count(&self) -> usize {
217
0
        self.chunk_writer.total_chunks_count()
218
0
    }
Unexecuted instantiation: <exr::block::writer::OnProgressChunkWriter<_, _> as exr::block::writer::ChunksWriter>::total_chunks_count
Unexecuted instantiation: <exr::block::writer::OnProgressChunkWriter<exr::block::writer::ChunkWriter<&mut &mut std::io::cursor::Cursor<alloc::vec::Vec<u8>>>, fn(f64)> as exr::block::writer::ChunksWriter>::total_chunks_count
Unexecuted instantiation: <exr::block::writer::OnProgressChunkWriter<exr::block::writer::ChunkWriter<&mut std::io::cursor::Cursor<&mut alloc::vec::Vec<u8>>>, fn(f64)> as exr::block::writer::ChunksWriter>::total_chunks_count
219
220
0
    fn write_chunk(&mut self, index_in_header_increasing_y: usize, chunk: Chunk) -> UnitResult {
221
0
        let total_chunks = self.total_chunks_count();
222
0
        let on_progress = &mut self.on_progress;
223
0
224
0
        // guarantee on_progress being called with 0 once
225
0
        if self.written_chunks == 0 { on_progress(0.0); }
226
227
0
        self.chunk_writer.write_chunk(index_in_header_increasing_y, chunk)?;
228
229
0
        self.written_chunks += 1;
230
0
231
0
        on_progress({
232
0
            // guarantee finishing with progress 1.0 for last block at least once, float division might slightly differ from 1.0
233
0
            if self.written_chunks == total_chunks { 1.0 }
234
0
            else { self.written_chunks as f64 / total_chunks as f64 }
235
        });
236
237
0
        Ok(())
238
0
    }
Unexecuted instantiation: <exr::block::writer::OnProgressChunkWriter<_, _> as exr::block::writer::ChunksWriter>::write_chunk
Unexecuted instantiation: <exr::block::writer::OnProgressChunkWriter<exr::block::writer::ChunkWriter<&mut &mut std::io::cursor::Cursor<alloc::vec::Vec<u8>>>, fn(f64)> as exr::block::writer::ChunksWriter>::write_chunk
Unexecuted instantiation: <exr::block::writer::OnProgressChunkWriter<exr::block::writer::ChunkWriter<&mut std::io::cursor::Cursor<&mut alloc::vec::Vec<u8>>>, fn(f64)> as exr::block::writer::ChunksWriter>::write_chunk
239
}
240
241
242
/// Write blocks that appear in any order and reorder them before writing.
243
#[derive(Debug)]
244
#[must_use]
245
pub struct SortedBlocksWriter<'w, W> {
246
    chunk_writer: &'w mut W,
247
    pending_chunks: BTreeMap<usize, (usize, Chunk)>,
248
    unwritten_chunk_indices: Peekable<std::ops::Range<usize>>,
249
    requires_sorting: bool, // using this instead of Option, because of borrowing
250
}
251
252
253
impl<'w, W> SortedBlocksWriter<'w, W> where W: ChunksWriter {
254
255
    /// New sorting writer. If no header requires a specific line order, chunks are passed through without sorting.
256
0
    pub fn new(meta_data: &MetaData, chunk_writer: &'w mut W) -> SortedBlocksWriter<'w, W> {
257
0
        let requires_sorting = meta_data.headers.iter()
258
0
            .any(|header| header.line_order != LineOrder::Unspecified);
Unexecuted instantiation: <exr::block::writer::SortedBlocksWriter<_>>::new::{closure#0}
Unexecuted instantiation: <exr::block::writer::SortedBlocksWriter<exr::block::writer::OnProgressChunkWriter<exr::block::writer::ChunkWriter<&mut &mut std::io::cursor::Cursor<alloc::vec::Vec<u8>>>, fn(f64)>>>::new::{closure#0}
Unexecuted instantiation: <exr::block::writer::SortedBlocksWriter<exr::block::writer::OnProgressChunkWriter<exr::block::writer::ChunkWriter<&mut std::io::cursor::Cursor<&mut alloc::vec::Vec<u8>>>, fn(f64)>>>::new::{closure#0}
259
0
260
0
        let total_chunk_count = chunk_writer.total_chunks_count();
261
0
262
0
        SortedBlocksWriter {
263
0
            pending_chunks: BTreeMap::new(),
264
0
            unwritten_chunk_indices: (0 .. total_chunk_count).peekable(),
265
0
            requires_sorting,
266
0
            chunk_writer
267
0
        }
268
0
    }
Unexecuted instantiation: <exr::block::writer::SortedBlocksWriter<_>>::new
Unexecuted instantiation: <exr::block::writer::SortedBlocksWriter<exr::block::writer::OnProgressChunkWriter<exr::block::writer::ChunkWriter<&mut &mut std::io::cursor::Cursor<alloc::vec::Vec<u8>>>, fn(f64)>>>::new
Unexecuted instantiation: <exr::block::writer::SortedBlocksWriter<exr::block::writer::OnProgressChunkWriter<exr::block::writer::ChunkWriter<&mut std::io::cursor::Cursor<&mut alloc::vec::Vec<u8>>>, fn(f64)>>>::new
269
270
    /// Write the chunk or stash it. After writing a chunk, also write all stashed chunks that can be written now.
271
0
    pub fn write_or_stash_chunk(&mut self, chunk_index_in_file: usize, chunk_y_index: usize, chunk: Chunk) -> UnitResult {
272
0
        if self.requires_sorting.not() {
273
0
            return self.chunk_writer.write_chunk(chunk_y_index, chunk);
274
0
        }
275
0
276
0
        // write this chunk now if possible
277
0
        if self.unwritten_chunk_indices.peek() == Some(&chunk_index_in_file){
278
0
            self.chunk_writer.write_chunk(chunk_y_index, chunk)?;
279
0
            self.unwritten_chunk_indices.next().expect("peeked chunk index is missing");
280
281
            // write all pending blocks that are immediate successors of this block
282
0
            while let Some((next_chunk_y_index, next_chunk)) = self
283
0
                .unwritten_chunk_indices.peek().cloned()
284
0
                .and_then(|id| self.pending_chunks.remove(&id))
Unexecuted instantiation: <exr::block::writer::SortedBlocksWriter<_>>::write_or_stash_chunk::{closure#0}
Unexecuted instantiation: <exr::block::writer::SortedBlocksWriter<exr::block::writer::OnProgressChunkWriter<exr::block::writer::ChunkWriter<&mut &mut std::io::cursor::Cursor<alloc::vec::Vec<u8>>>, fn(f64)>>>::write_or_stash_chunk::{closure#0}
Unexecuted instantiation: <exr::block::writer::SortedBlocksWriter<exr::block::writer::OnProgressChunkWriter<exr::block::writer::ChunkWriter<&mut std::io::cursor::Cursor<&mut alloc::vec::Vec<u8>>>, fn(f64)>>>::write_or_stash_chunk::{closure#0}
285
            {
286
0
                self.chunk_writer.write_chunk(next_chunk_y_index, next_chunk)?;
287
0
                self.unwritten_chunk_indices.next().expect("peeked chunk index is missing");
288
            }
289
        }
290
291
0
        else {
292
0
            // the argument block is not to be written now,
293
0
            // and all the pending blocks are not next up either,
294
0
            // so just stash this block
295
0
            self.pending_chunks.insert(chunk_index_in_file, (chunk_y_index, chunk));
296
0
        }
297
298
0
        Ok(())
299
0
    }
Unexecuted instantiation: <exr::block::writer::SortedBlocksWriter<_>>::write_or_stash_chunk
Unexecuted instantiation: <exr::block::writer::SortedBlocksWriter<exr::block::writer::OnProgressChunkWriter<exr::block::writer::ChunkWriter<&mut &mut std::io::cursor::Cursor<alloc::vec::Vec<u8>>>, fn(f64)>>>::write_or_stash_chunk
Unexecuted instantiation: <exr::block::writer::SortedBlocksWriter<exr::block::writer::OnProgressChunkWriter<exr::block::writer::ChunkWriter<&mut std::io::cursor::Cursor<&mut alloc::vec::Vec<u8>>>, fn(f64)>>>::write_or_stash_chunk
300
301
    /// Where the chunks will be written to.
302
0
    pub fn inner_chunks_writer(&self) -> &W {
303
0
        &self.chunk_writer
304
0
    }
Unexecuted instantiation: <exr::block::writer::SortedBlocksWriter<_>>::inner_chunks_writer
Unexecuted instantiation: <exr::block::writer::SortedBlocksWriter<exr::block::writer::OnProgressChunkWriter<exr::block::writer::ChunkWriter<&mut &mut std::io::cursor::Cursor<alloc::vec::Vec<u8>>>, fn(f64)>>>::inner_chunks_writer
Unexecuted instantiation: <exr::block::writer::SortedBlocksWriter<exr::block::writer::OnProgressChunkWriter<exr::block::writer::ChunkWriter<&mut std::io::cursor::Cursor<&mut alloc::vec::Vec<u8>>>, fn(f64)>>>::inner_chunks_writer
305
}
306
307
308
309
/// Compress blocks to a chunk writer in this thread.
310
#[derive(Debug)]
311
#[must_use]
312
pub struct SequentialBlocksCompressor<'w, W> {
313
    meta: &'w MetaData,
314
    chunks_writer: &'w mut W,
315
}
316
317
impl<'w, W> SequentialBlocksCompressor<'w, W> where W: 'w + ChunksWriter {
318
319
    /// New blocks writer.
320
0
    pub fn new(meta: &'w MetaData, chunks_writer: &'w mut W) -> Self { Self { meta, chunks_writer, } }
Unexecuted instantiation: <exr::block::writer::SequentialBlocksCompressor<_>>::new
Unexecuted instantiation: <exr::block::writer::SequentialBlocksCompressor<exr::block::writer::OnProgressChunkWriter<exr::block::writer::ChunkWriter<&mut &mut std::io::cursor::Cursor<alloc::vec::Vec<u8>>>, fn(f64)>>>::new
Unexecuted instantiation: <exr::block::writer::SequentialBlocksCompressor<exr::block::writer::OnProgressChunkWriter<exr::block::writer::ChunkWriter<&mut std::io::cursor::Cursor<&mut alloc::vec::Vec<u8>>>, fn(f64)>>>::new
321
322
    /// This is where the compressed blocks are written to.
323
0
    pub fn inner_chunks_writer(&'w self) -> &'w W { self.chunks_writer }
324
325
    /// Compress a single block immediately. The index of the block must be in increasing line order.
326
0
    pub fn compress_block(&mut self, index_in_header_increasing_y: usize, block: UncompressedBlock) -> UnitResult {
327
0
        self.chunks_writer.write_chunk(
328
0
            index_in_header_increasing_y,
329
0
            block.compress_to_chunk(&self.meta.headers)?
330
        )
331
0
    }
Unexecuted instantiation: <exr::block::writer::SequentialBlocksCompressor<_>>::compress_block
Unexecuted instantiation: <exr::block::writer::SequentialBlocksCompressor<exr::block::writer::OnProgressChunkWriter<exr::block::writer::ChunkWriter<&mut &mut std::io::cursor::Cursor<alloc::vec::Vec<u8>>>, fn(f64)>>>::compress_block
Unexecuted instantiation: <exr::block::writer::SequentialBlocksCompressor<exr::block::writer::OnProgressChunkWriter<exr::block::writer::ChunkWriter<&mut std::io::cursor::Cursor<&mut alloc::vec::Vec<u8>>>, fn(f64)>>>::compress_block
332
}
333
334
/// Compress blocks to a chunk writer with multiple threads.
335
#[derive(Debug)]
336
#[must_use]
337
pub struct ParallelBlocksCompressor<'w, W> {
338
    meta: &'w MetaData,
339
    sorted_writer: SortedBlocksWriter<'w, W>,
340
341
    sender: mpsc::Sender<Result<(usize, usize, Chunk)>>,
342
    receiver: mpsc::Receiver<Result<(usize, usize, Chunk)>>,
343
    pool: rayon_core::ThreadPool,
344
345
    currently_compressing_count: usize,
346
    written_chunk_count: usize, // used to check for last chunk
347
    max_threads: usize,
348
    next_incoming_chunk_index: usize, // used to remember original chunk order
349
}
350
351
impl<'w, W> ParallelBlocksCompressor<'w, W> where W: 'w + ChunksWriter {
352
353
    /// New blocks writer. Returns none if sequential compression should be used.
354
    /// Use `new_with_thread_pool` to customize the threadpool.
355
0
    pub fn new(meta: &'w MetaData, chunks_writer: &'w mut W) -> Option<Self> {
356
0
        Self::new_with_thread_pool(meta, chunks_writer, ||{
357
0
            rayon_core::ThreadPoolBuilder::new()
358
0
                .thread_name(|index| format!("OpenEXR Block Compressor Thread #{}", index))
Unexecuted instantiation: <exr::block::writer::ParallelBlocksCompressor<_>>::new::{closure#0}::{closure#0}
Unexecuted instantiation: <exr::block::writer::ParallelBlocksCompressor<exr::block::writer::OnProgressChunkWriter<exr::block::writer::ChunkWriter<&mut &mut std::io::cursor::Cursor<alloc::vec::Vec<u8>>>, fn(f64)>>>::new::{closure#0}::{closure#0}
Unexecuted instantiation: <exr::block::writer::ParallelBlocksCompressor<exr::block::writer::OnProgressChunkWriter<exr::block::writer::ChunkWriter<&mut std::io::cursor::Cursor<&mut alloc::vec::Vec<u8>>>, fn(f64)>>>::new::{closure#0}::{closure#0}
359
0
                .build()
360
0
        })
Unexecuted instantiation: <exr::block::writer::ParallelBlocksCompressor<_>>::new::{closure#0}
Unexecuted instantiation: <exr::block::writer::ParallelBlocksCompressor<exr::block::writer::OnProgressChunkWriter<exr::block::writer::ChunkWriter<&mut &mut std::io::cursor::Cursor<alloc::vec::Vec<u8>>>, fn(f64)>>>::new::{closure#0}
Unexecuted instantiation: <exr::block::writer::ParallelBlocksCompressor<exr::block::writer::OnProgressChunkWriter<exr::block::writer::ChunkWriter<&mut std::io::cursor::Cursor<&mut alloc::vec::Vec<u8>>>, fn(f64)>>>::new::{closure#0}
361
0
    }
Unexecuted instantiation: <exr::block::writer::ParallelBlocksCompressor<_>>::new
Unexecuted instantiation: <exr::block::writer::ParallelBlocksCompressor<exr::block::writer::OnProgressChunkWriter<exr::block::writer::ChunkWriter<&mut &mut std::io::cursor::Cursor<alloc::vec::Vec<u8>>>, fn(f64)>>>::new
Unexecuted instantiation: <exr::block::writer::ParallelBlocksCompressor<exr::block::writer::OnProgressChunkWriter<exr::block::writer::ChunkWriter<&mut std::io::cursor::Cursor<&mut alloc::vec::Vec<u8>>>, fn(f64)>>>::new
362
363
    /// New blocks writer. Returns none if sequential compression should be used.
364
0
    pub fn new_with_thread_pool<CreatePool>(
365
0
        meta: &'w MetaData, chunks_writer: &'w mut W, try_create_thread_pool: CreatePool)
366
0
        -> Option<Self>
367
0
        where CreatePool: FnOnce() -> std::result::Result<ThreadPool, ThreadPoolBuildError>
368
0
    {
369
0
        if meta.headers.iter().all(|head|head.compression == Compression::Uncompressed) {
Unexecuted instantiation: <exr::block::writer::ParallelBlocksCompressor<_>>::new_with_thread_pool::<_>::{closure#0}
Unexecuted instantiation: <exr::block::writer::ParallelBlocksCompressor<exr::block::writer::OnProgressChunkWriter<exr::block::writer::ChunkWriter<&mut &mut std::io::cursor::Cursor<alloc::vec::Vec<u8>>>, fn(f64)>>>::new_with_thread_pool::<<exr::block::writer::ParallelBlocksCompressor<exr::block::writer::OnProgressChunkWriter<exr::block::writer::ChunkWriter<&mut &mut std::io::cursor::Cursor<alloc::vec::Vec<u8>>>, fn(f64)>>>::new::{closure#0}>::{closure#0}
Unexecuted instantiation: <exr::block::writer::ParallelBlocksCompressor<exr::block::writer::OnProgressChunkWriter<exr::block::writer::ChunkWriter<&mut std::io::cursor::Cursor<&mut alloc::vec::Vec<u8>>>, fn(f64)>>>::new_with_thread_pool::<<exr::block::writer::ParallelBlocksCompressor<exr::block::writer::OnProgressChunkWriter<exr::block::writer::ChunkWriter<&mut std::io::cursor::Cursor<&mut alloc::vec::Vec<u8>>>, fn(f64)>>>::new::{closure#0}>::{closure#0}
370
0
            return None;
371
0
        }
372
373
        // in case thread pool creation fails (for example on WASM currently),
374
        // we revert to sequential compression
375
0
        let pool = match try_create_thread_pool() {
376
0
            Ok(pool) => pool,
377
378
            // TODO print warning?
379
0
            Err(_) => return None,
380
        };
381
382
0
        let max_threads = pool.current_num_threads().max(1).min(chunks_writer.total_chunks_count()) + 2; // ca one block for each thread at all times
383
0
        let (send, recv) = mpsc::channel(); // TODO bounded channel simplifies logic?
384
0
385
0
        Some(Self {
386
0
            sorted_writer: SortedBlocksWriter::new(meta, chunks_writer),
387
0
            next_incoming_chunk_index: 0,
388
0
            currently_compressing_count: 0,
389
0
            written_chunk_count: 0,
390
0
            sender: send,
391
0
            receiver: recv,
392
0
            max_threads,
393
0
            pool,
394
0
            meta,
395
0
        })
396
0
    }
Unexecuted instantiation: <exr::block::writer::ParallelBlocksCompressor<_>>::new_with_thread_pool::<_>
Unexecuted instantiation: <exr::block::writer::ParallelBlocksCompressor<exr::block::writer::OnProgressChunkWriter<exr::block::writer::ChunkWriter<&mut &mut std::io::cursor::Cursor<alloc::vec::Vec<u8>>>, fn(f64)>>>::new_with_thread_pool::<<exr::block::writer::ParallelBlocksCompressor<exr::block::writer::OnProgressChunkWriter<exr::block::writer::ChunkWriter<&mut &mut std::io::cursor::Cursor<alloc::vec::Vec<u8>>>, fn(f64)>>>::new::{closure#0}>
Unexecuted instantiation: <exr::block::writer::ParallelBlocksCompressor<exr::block::writer::OnProgressChunkWriter<exr::block::writer::ChunkWriter<&mut std::io::cursor::Cursor<&mut alloc::vec::Vec<u8>>>, fn(f64)>>>::new_with_thread_pool::<<exr::block::writer::ParallelBlocksCompressor<exr::block::writer::OnProgressChunkWriter<exr::block::writer::ChunkWriter<&mut std::io::cursor::Cursor<&mut alloc::vec::Vec<u8>>>, fn(f64)>>>::new::{closure#0}>
397
398
    /// This is where the compressed blocks are written to.
399
0
    pub fn inner_chunks_writer(&'w self) -> &'w W { self.sorted_writer.inner_chunks_writer() }
Unexecuted instantiation: <exr::block::writer::ParallelBlocksCompressor<_>>::inner_chunks_writer
Unexecuted instantiation: <exr::block::writer::ParallelBlocksCompressor<exr::block::writer::OnProgressChunkWriter<exr::block::writer::ChunkWriter<&mut &mut std::io::cursor::Cursor<alloc::vec::Vec<u8>>>, fn(f64)>>>::inner_chunks_writer
Unexecuted instantiation: <exr::block::writer::ParallelBlocksCompressor<exr::block::writer::OnProgressChunkWriter<exr::block::writer::ChunkWriter<&mut std::io::cursor::Cursor<&mut alloc::vec::Vec<u8>>>, fn(f64)>>>::inner_chunks_writer
400
401
    // private, as may underflow counter in release mode
402
0
    fn write_next_queued_chunk(&mut self) -> UnitResult {
403
0
        debug_assert!(self.currently_compressing_count > 0, "cannot wait for chunks as there are none left");
404
405
0
        let some_compressed_chunk = self.receiver.recv()
406
0
            .expect("cannot receive compressed block");
407
0
408
0
        self.currently_compressing_count -= 1;
409
0
        let (chunk_file_index, chunk_y_index, chunk) = some_compressed_chunk?;
410
0
        self.sorted_writer.write_or_stash_chunk(chunk_file_index, chunk_y_index, chunk)?;
411
412
0
        self.written_chunk_count += 1;
413
0
        Ok(())
414
0
    }
Unexecuted instantiation: <exr::block::writer::ParallelBlocksCompressor<_>>::write_next_queued_chunk
Unexecuted instantiation: <exr::block::writer::ParallelBlocksCompressor<exr::block::writer::OnProgressChunkWriter<exr::block::writer::ChunkWriter<&mut &mut std::io::cursor::Cursor<alloc::vec::Vec<u8>>>, fn(f64)>>>::write_next_queued_chunk
Unexecuted instantiation: <exr::block::writer::ParallelBlocksCompressor<exr::block::writer::OnProgressChunkWriter<exr::block::writer::ChunkWriter<&mut std::io::cursor::Cursor<&mut alloc::vec::Vec<u8>>>, fn(f64)>>>::write_next_queued_chunk
415
416
    /// Wait until all currently compressing chunks in the compressor have been written.
417
0
    pub fn write_all_queued_chunks(&mut self) -> UnitResult {
418
0
        while self.currently_compressing_count > 0 {
419
0
            self.write_next_queued_chunk()?;
420
        }
421
422
0
        debug_assert_eq!(self.currently_compressing_count, 0, "counter does not match block count");
423
0
        Ok(())
424
0
    }
Unexecuted instantiation: <exr::block::writer::ParallelBlocksCompressor<_>>::write_all_queued_chunks
Unexecuted instantiation: <exr::block::writer::ParallelBlocksCompressor<exr::block::writer::OnProgressChunkWriter<exr::block::writer::ChunkWriter<&mut &mut std::io::cursor::Cursor<alloc::vec::Vec<u8>>>, fn(f64)>>>::write_all_queued_chunks
Unexecuted instantiation: <exr::block::writer::ParallelBlocksCompressor<exr::block::writer::OnProgressChunkWriter<exr::block::writer::ChunkWriter<&mut std::io::cursor::Cursor<&mut alloc::vec::Vec<u8>>>, fn(f64)>>>::write_all_queued_chunks
425
426
    /// Add a single block to the compressor queue. The index of the block must be in increasing line order.
427
    /// When calling this function for the last block, this method waits until all the blocks have been written.
428
    /// This only works when you write as many blocks as the image expects, otherwise you can use `write_all_queued_chunks`.
429
    /// Waits for a block from the queue to be written, if the queue already has enough items.
430
0
    pub fn add_block_to_compression_queue(&mut self, index_in_header_increasing_y: usize, block: UncompressedBlock) -> UnitResult {
431
0
432
0
        // if pipe is full, block to wait for a slot to free up
433
0
        if self.currently_compressing_count >= self.max_threads {
434
0
            self.write_next_queued_chunk()?;
435
0
        }
436
437
        // add the argument chunk to the compression queue
438
0
        let index_in_file = self.next_incoming_chunk_index;
439
0
        let sender = self.sender.clone();
440
0
        let meta = self.meta.clone();
441
0
442
0
        self.pool.spawn(move ||{
443
0
            let compressed_or_err = block.compress_to_chunk(&meta.headers);
444
0
445
0
            // by now, compressing could have failed in another thread.
446
0
            // the error is then already handled, so we simply
447
0
            // ignore a failed send of the compressed block and do nothing
448
0
            let _ = sender.send(compressed_or_err.map(move |compressed| (index_in_file, index_in_header_increasing_y, compressed)));
Unexecuted instantiation: <exr::block::writer::ParallelBlocksCompressor<_>>::add_block_to_compression_queue::{closure#0}::{closure#0}
Unexecuted instantiation: <exr::block::writer::ParallelBlocksCompressor<exr::block::writer::OnProgressChunkWriter<exr::block::writer::ChunkWriter<&mut &mut std::io::cursor::Cursor<alloc::vec::Vec<u8>>>, fn(f64)>>>::add_block_to_compression_queue::{closure#0}::{closure#0}
Unexecuted instantiation: <exr::block::writer::ParallelBlocksCompressor<exr::block::writer::OnProgressChunkWriter<exr::block::writer::ChunkWriter<&mut std::io::cursor::Cursor<&mut alloc::vec::Vec<u8>>>, fn(f64)>>>::add_block_to_compression_queue::{closure#0}::{closure#0}
449
0
        });
Unexecuted instantiation: <exr::block::writer::ParallelBlocksCompressor<_>>::add_block_to_compression_queue::{closure#0}
Unexecuted instantiation: <exr::block::writer::ParallelBlocksCompressor<exr::block::writer::OnProgressChunkWriter<exr::block::writer::ChunkWriter<&mut &mut std::io::cursor::Cursor<alloc::vec::Vec<u8>>>, fn(f64)>>>::add_block_to_compression_queue::{closure#0}
Unexecuted instantiation: <exr::block::writer::ParallelBlocksCompressor<exr::block::writer::OnProgressChunkWriter<exr::block::writer::ChunkWriter<&mut std::io::cursor::Cursor<&mut alloc::vec::Vec<u8>>>, fn(f64)>>>::add_block_to_compression_queue::{closure#0}
450
0
451
0
        self.currently_compressing_count += 1;
452
0
        self.next_incoming_chunk_index += 1;
453
0
454
0
        // if this is the last chunk, wait for all chunks to complete before returning
455
0
        if self.written_chunk_count + self.currently_compressing_count == self.inner_chunks_writer().total_chunks_count() {
456
0
            self.write_all_queued_chunks()?;
457
0
            debug_assert_eq!(
458
0
                self.written_chunk_count, self.inner_chunks_writer().total_chunks_count(),
459
0
                "written chunk count mismatch"
460
            );
461
0
        }
462
463
464
0
        Ok(())
465
0
    }
Unexecuted instantiation: <exr::block::writer::ParallelBlocksCompressor<_>>::add_block_to_compression_queue
Unexecuted instantiation: <exr::block::writer::ParallelBlocksCompressor<exr::block::writer::OnProgressChunkWriter<exr::block::writer::ChunkWriter<&mut &mut std::io::cursor::Cursor<alloc::vec::Vec<u8>>>, fn(f64)>>>::add_block_to_compression_queue
Unexecuted instantiation: <exr::block::writer::ParallelBlocksCompressor<exr::block::writer::OnProgressChunkWriter<exr::block::writer::ChunkWriter<&mut std::io::cursor::Cursor<&mut alloc::vec::Vec<u8>>>, fn(f64)>>>::add_block_to_compression_queue
466
}
467
468
469