/rust/registry/src/index.crates.io-6f17d22bba15001f/exr-1.73.0/src/block/writer.rs
Line | Count | Source (jump to first uncovered line) |
1 | | //! Composable structures to handle writing an image. |
2 | | |
3 | | |
4 | | use std::fmt::Debug; |
5 | | use std::io::Seek; |
6 | | use std::iter::Peekable; |
7 | | use std::ops::Not; |
8 | | use std::sync::mpsc; |
9 | | use rayon_core::{ThreadPool, ThreadPoolBuildError}; |
10 | | |
11 | | use smallvec::alloc::collections::BTreeMap; |
12 | | |
13 | | use crate::block::UncompressedBlock; |
14 | | use crate::block::chunk::Chunk; |
15 | | use crate::compression::Compression; |
16 | | use crate::error::{Error, Result, UnitResult, usize_to_u64}; |
17 | | use crate::io::{Data, Tracking, Write}; |
18 | | use crate::meta::{Headers, MetaData, OffsetTables}; |
19 | | use crate::meta::attribute::LineOrder; |
20 | | |
21 | | /// Write an exr file by writing one chunk after another in a closure. |
22 | | /// In the closure, you are provided a chunk writer, which should be used to write all the chunks. |
23 | | /// Assumes the your write destination is buffered. |
24 | 0 | pub fn write_chunks_with<W: Write + Seek>( |
25 | 0 | buffered_write: W, headers: Headers, pedantic: bool, |
26 | 0 | write_chunks: impl FnOnce(MetaData, &mut ChunkWriter<W>) -> UnitResult |
27 | 0 | ) -> UnitResult { |
28 | | // this closure approach ensures that after writing all chunks, the file is always completed and checked and flushed |
29 | 0 | let (meta, mut writer) = ChunkWriter::new_for_buffered(buffered_write, headers, pedantic)?; |
30 | 0 | write_chunks(meta, &mut writer)?; |
31 | 0 | writer.complete_meta_data() |
32 | 0 | } Unexecuted instantiation: exr::block::writer::write_chunks_with::<_, _> Unexecuted instantiation: exr::block::writer::write_chunks_with::<&mut &mut std::io::cursor::Cursor<alloc::vec::Vec<u8>>, <exr::image::write::WriteImageWithOptions<exr::image::Layer<exr::image::SpecificChannels<image::codecs::openexr::write_buffer<&mut std::io::cursor::Cursor<alloc::vec::Vec<u8>>>::{closure#0}, (exr::meta::attribute::ChannelDescription, exr::meta::attribute::ChannelDescription, exr::meta::attribute::ChannelDescription)>>, fn(f64)>>::to_buffered<&mut &mut std::io::cursor::Cursor<alloc::vec::Vec<u8>>>::{closure#0}> Unexecuted instantiation: exr::block::writer::write_chunks_with::<&mut &mut std::io::cursor::Cursor<alloc::vec::Vec<u8>>, <exr::image::write::WriteImageWithOptions<exr::image::Layer<exr::image::SpecificChannels<image::codecs::openexr::write_buffer<&mut std::io::cursor::Cursor<alloc::vec::Vec<u8>>>::{closure#1}, (exr::meta::attribute::ChannelDescription, exr::meta::attribute::ChannelDescription, exr::meta::attribute::ChannelDescription, exr::meta::attribute::ChannelDescription)>>, fn(f64)>>::to_buffered<&mut &mut std::io::cursor::Cursor<alloc::vec::Vec<u8>>>::{closure#0}> Unexecuted instantiation: exr::block::writer::write_chunks_with::<&mut std::io::cursor::Cursor<&mut alloc::vec::Vec<u8>>, <exr::image::write::WriteImageWithOptions<exr::image::Layer<exr::image::SpecificChannels<image::codecs::openexr::write_buffer<std::io::cursor::Cursor<&mut alloc::vec::Vec<u8>>>::{closure#0}, (exr::meta::attribute::ChannelDescription, exr::meta::attribute::ChannelDescription, exr::meta::attribute::ChannelDescription)>>, fn(f64)>>::to_buffered<&mut std::io::cursor::Cursor<&mut alloc::vec::Vec<u8>>>::{closure#0}> Unexecuted instantiation: exr::block::writer::write_chunks_with::<&mut std::io::cursor::Cursor<&mut alloc::vec::Vec<u8>>, <exr::image::write::WriteImageWithOptions<exr::image::Layer<exr::image::SpecificChannels<image::codecs::openexr::write_buffer<std::io::cursor::Cursor<&mut alloc::vec::Vec<u8>>>::{closure#1}, (exr::meta::attribute::ChannelDescription, exr::meta::attribute::ChannelDescription, exr::meta::attribute::ChannelDescription, exr::meta::attribute::ChannelDescription)>>, fn(f64)>>::to_buffered<&mut std::io::cursor::Cursor<&mut alloc::vec::Vec<u8>>>::{closure#0}> |
33 | | |
34 | | /// Can consume compressed pixel chunks, writing them a file. |
35 | | /// Use `sequential_blocks_compressor` or `parallel_blocks_compressor` to compress your data, |
36 | | /// or use `compress_all_blocks_sequential` or `compress_all_blocks_parallel`. |
37 | | /// Use `on_progress` to obtain a new writer |
38 | | /// that triggers a callback for each block. |
39 | | // #[must_use] |
40 | | #[derive(Debug)] |
41 | | #[must_use] |
42 | | pub struct ChunkWriter<W> { |
43 | | header_count: usize, |
44 | | byte_writer: Tracking<W>, |
45 | | chunk_indices_byte_location: std::ops::Range<usize>, |
46 | | chunk_indices_increasing_y: OffsetTables, |
47 | | chunk_count: usize, // TODO compose? |
48 | | } |
49 | | |
50 | | /// A new writer that triggers a callback |
51 | | /// for each block written to the inner writer. |
52 | | #[derive(Debug)] |
53 | | #[must_use] |
54 | | pub struct OnProgressChunkWriter<'w, W, F> { |
55 | | chunk_writer: &'w mut W, |
56 | | written_chunks: usize, |
57 | | on_progress: F, |
58 | | } |
59 | | |
60 | | /// Write chunks to a byte destination. |
61 | | /// Then write each chunk with `writer.write_chunk(chunk)`. |
62 | | pub trait ChunksWriter: Sized { |
63 | | |
64 | | /// The total number of chunks that the complete file will contain. |
65 | | fn total_chunks_count(&self) -> usize; |
66 | | |
67 | | /// Any more calls will result in an error and have no effect. |
68 | | /// If writing results in an error, the file and the writer |
69 | | /// may remain in an invalid state and should not be used further. |
70 | | /// Errors when the chunk at this index was already written. |
71 | | fn write_chunk(&mut self, index_in_header_increasing_y: usize, chunk: Chunk) -> UnitResult; |
72 | | |
73 | | /// Obtain a new writer that calls the specified closure for each block that is written to this writer. |
74 | 0 | fn on_progress<F>(&mut self, on_progress: F) -> OnProgressChunkWriter<'_, Self, F> where F: FnMut(f64) { |
75 | 0 | OnProgressChunkWriter { chunk_writer: self, written_chunks: 0, on_progress } |
76 | 0 | } Unexecuted instantiation: <_ as exr::block::writer::ChunksWriter>::on_progress::<_> Unexecuted instantiation: <exr::block::writer::ChunkWriter<&mut &mut std::io::cursor::Cursor<alloc::vec::Vec<u8>>> as exr::block::writer::ChunksWriter>::on_progress::<fn(f64)> Unexecuted instantiation: <exr::block::writer::ChunkWriter<&mut std::io::cursor::Cursor<&mut alloc::vec::Vec<u8>>> as exr::block::writer::ChunksWriter>::on_progress::<fn(f64)> |
77 | | |
78 | | /// Obtain a new writer that can compress blocks to chunks, which are then passed to this writer. |
79 | 0 | fn sequential_blocks_compressor<'w>(&'w mut self, meta: &'w MetaData) -> SequentialBlocksCompressor<'w, Self> { |
80 | 0 | SequentialBlocksCompressor::new(meta, self) |
81 | 0 | } Unexecuted instantiation: <_ as exr::block::writer::ChunksWriter>::sequential_blocks_compressor Unexecuted instantiation: <exr::block::writer::OnProgressChunkWriter<exr::block::writer::ChunkWriter<&mut &mut std::io::cursor::Cursor<alloc::vec::Vec<u8>>>, fn(f64)> as exr::block::writer::ChunksWriter>::sequential_blocks_compressor Unexecuted instantiation: <exr::block::writer::OnProgressChunkWriter<exr::block::writer::ChunkWriter<&mut std::io::cursor::Cursor<&mut alloc::vec::Vec<u8>>>, fn(f64)> as exr::block::writer::ChunksWriter>::sequential_blocks_compressor |
82 | | |
83 | | /// Obtain a new writer that can compress blocks to chunks on multiple threads, which are then passed to this writer. |
84 | | /// Returns none if the sequential compressor should be used instead (thread pool creation failure or too large performance overhead). |
85 | 0 | fn parallel_blocks_compressor<'w>(&'w mut self, meta: &'w MetaData) -> Option<ParallelBlocksCompressor<'w, Self>> { |
86 | 0 | ParallelBlocksCompressor::new(meta, self) |
87 | 0 | } Unexecuted instantiation: <_ as exr::block::writer::ChunksWriter>::parallel_blocks_compressor Unexecuted instantiation: <exr::block::writer::OnProgressChunkWriter<exr::block::writer::ChunkWriter<&mut &mut std::io::cursor::Cursor<alloc::vec::Vec<u8>>>, fn(f64)> as exr::block::writer::ChunksWriter>::parallel_blocks_compressor Unexecuted instantiation: <exr::block::writer::OnProgressChunkWriter<exr::block::writer::ChunkWriter<&mut std::io::cursor::Cursor<&mut alloc::vec::Vec<u8>>>, fn(f64)> as exr::block::writer::ChunksWriter>::parallel_blocks_compressor |
88 | | |
89 | | /// Compresses all blocks to the file. |
90 | | /// The index of the block must be in increasing line order within the header. |
91 | | /// Obtain iterator with `MetaData::collect_ordered_blocks(...)` or similar methods. |
92 | 0 | fn compress_all_blocks_sequential(mut self, meta: &MetaData, blocks: impl Iterator<Item=(usize, UncompressedBlock)>) -> UnitResult { |
93 | 0 | let mut writer = self.sequential_blocks_compressor(meta); |
94 | | |
95 | | // TODO check block order if line order is not unspecified! |
96 | 0 | for (index_in_header_increasing_y, block) in blocks { |
97 | 0 | writer.compress_block(index_in_header_increasing_y, block)?; |
98 | | } |
99 | | |
100 | | // TODO debug_assert_eq!(self.is_complete()); |
101 | 0 | Ok(()) |
102 | 0 | } Unexecuted instantiation: <_ as exr::block::writer::ChunksWriter>::compress_all_blocks_sequential::<_> Unexecuted instantiation: <exr::block::writer::OnProgressChunkWriter<exr::block::writer::ChunkWriter<&mut &mut std::io::cursor::Cursor<alloc::vec::Vec<u8>>>, fn(f64)> as exr::block::writer::ChunksWriter>::compress_all_blocks_sequential::<core::iter::adapters::map::Map<core::iter::adapters::flatten::FlatMap<core::iter::adapters::enumerate::Enumerate<core::slice::iter::Iter<exr::meta::header::Header>>, core::iter::adapters::map::Map<alloc::boxed::Box<dyn core::iter::traits::iterator::Iterator<Item = (usize, exr::meta::TileIndices)> + core::marker::Send>, exr::block::enumerate_ordered_header_block_indices::{closure#0}::{closure#0}>, exr::block::enumerate_ordered_header_block_indices::{closure#0}>, <exr::meta::MetaData>::collect_ordered_blocks<<exr::meta::MetaData>::collect_ordered_block_data<<exr::image::write::WriteImageWithOptions<exr::image::Layer<exr::image::SpecificChannels<image::codecs::openexr::write_buffer<&mut std::io::cursor::Cursor<alloc::vec::Vec<u8>>>::{closure#0}, (exr::meta::attribute::ChannelDescription, exr::meta::attribute::ChannelDescription, exr::meta::attribute::ChannelDescription)>>, fn(f64)>>::to_buffered<&mut &mut std::io::cursor::Cursor<alloc::vec::Vec<u8>>>::{closure#0}::{closure#0}>::{closure#0}>::{closure#0}>> Unexecuted instantiation: <exr::block::writer::OnProgressChunkWriter<exr::block::writer::ChunkWriter<&mut &mut std::io::cursor::Cursor<alloc::vec::Vec<u8>>>, fn(f64)> as exr::block::writer::ChunksWriter>::compress_all_blocks_sequential::<core::iter::adapters::map::Map<core::iter::adapters::flatten::FlatMap<core::iter::adapters::enumerate::Enumerate<core::slice::iter::Iter<exr::meta::header::Header>>, core::iter::adapters::map::Map<alloc::boxed::Box<dyn core::iter::traits::iterator::Iterator<Item = (usize, exr::meta::TileIndices)> + core::marker::Send>, exr::block::enumerate_ordered_header_block_indices::{closure#0}::{closure#0}>, exr::block::enumerate_ordered_header_block_indices::{closure#0}>, <exr::meta::MetaData>::collect_ordered_blocks<<exr::meta::MetaData>::collect_ordered_block_data<<exr::image::write::WriteImageWithOptions<exr::image::Layer<exr::image::SpecificChannels<image::codecs::openexr::write_buffer<&mut std::io::cursor::Cursor<alloc::vec::Vec<u8>>>::{closure#1}, (exr::meta::attribute::ChannelDescription, exr::meta::attribute::ChannelDescription, exr::meta::attribute::ChannelDescription, exr::meta::attribute::ChannelDescription)>>, fn(f64)>>::to_buffered<&mut &mut std::io::cursor::Cursor<alloc::vec::Vec<u8>>>::{closure#0}::{closure#0}>::{closure#0}>::{closure#0}>> Unexecuted instantiation: <exr::block::writer::OnProgressChunkWriter<exr::block::writer::ChunkWriter<&mut std::io::cursor::Cursor<&mut alloc::vec::Vec<u8>>>, fn(f64)> as exr::block::writer::ChunksWriter>::compress_all_blocks_sequential::<core::iter::adapters::map::Map<core::iter::adapters::flatten::FlatMap<core::iter::adapters::enumerate::Enumerate<core::slice::iter::Iter<exr::meta::header::Header>>, core::iter::adapters::map::Map<alloc::boxed::Box<dyn core::iter::traits::iterator::Iterator<Item = (usize, exr::meta::TileIndices)> + core::marker::Send>, exr::block::enumerate_ordered_header_block_indices::{closure#0}::{closure#0}>, exr::block::enumerate_ordered_header_block_indices::{closure#0}>, <exr::meta::MetaData>::collect_ordered_blocks<<exr::meta::MetaData>::collect_ordered_block_data<<exr::image::write::WriteImageWithOptions<exr::image::Layer<exr::image::SpecificChannels<image::codecs::openexr::write_buffer<std::io::cursor::Cursor<&mut alloc::vec::Vec<u8>>>::{closure#0}, (exr::meta::attribute::ChannelDescription, exr::meta::attribute::ChannelDescription, exr::meta::attribute::ChannelDescription)>>, fn(f64)>>::to_buffered<&mut std::io::cursor::Cursor<&mut alloc::vec::Vec<u8>>>::{closure#0}::{closure#0}>::{closure#0}>::{closure#0}>> Unexecuted instantiation: <exr::block::writer::OnProgressChunkWriter<exr::block::writer::ChunkWriter<&mut std::io::cursor::Cursor<&mut alloc::vec::Vec<u8>>>, fn(f64)> as exr::block::writer::ChunksWriter>::compress_all_blocks_sequential::<core::iter::adapters::map::Map<core::iter::adapters::flatten::FlatMap<core::iter::adapters::enumerate::Enumerate<core::slice::iter::Iter<exr::meta::header::Header>>, core::iter::adapters::map::Map<alloc::boxed::Box<dyn core::iter::traits::iterator::Iterator<Item = (usize, exr::meta::TileIndices)> + core::marker::Send>, exr::block::enumerate_ordered_header_block_indices::{closure#0}::{closure#0}>, exr::block::enumerate_ordered_header_block_indices::{closure#0}>, <exr::meta::MetaData>::collect_ordered_blocks<<exr::meta::MetaData>::collect_ordered_block_data<<exr::image::write::WriteImageWithOptions<exr::image::Layer<exr::image::SpecificChannels<image::codecs::openexr::write_buffer<std::io::cursor::Cursor<&mut alloc::vec::Vec<u8>>>::{closure#1}, (exr::meta::attribute::ChannelDescription, exr::meta::attribute::ChannelDescription, exr::meta::attribute::ChannelDescription, exr::meta::attribute::ChannelDescription)>>, fn(f64)>>::to_buffered<&mut std::io::cursor::Cursor<&mut alloc::vec::Vec<u8>>>::{closure#0}::{closure#0}>::{closure#0}>::{closure#0}>> |
103 | | |
104 | | /// Compresses all blocks to the file. |
105 | | /// The index of the block must be in increasing line order within the header. |
106 | | /// Obtain iterator with `MetaData::collect_ordered_blocks(...)` or similar methods. |
107 | | /// Will fallback to sequential processing where threads are not available, or where it would not speed up the process. |
108 | 0 | fn compress_all_blocks_parallel(mut self, meta: &MetaData, blocks: impl Iterator<Item=(usize, UncompressedBlock)>) -> UnitResult { |
109 | 0 | let mut parallel_writer = match self.parallel_blocks_compressor(meta) { |
110 | 0 | None => return self.compress_all_blocks_sequential(meta, blocks), |
111 | 0 | Some(writer) => writer, |
112 | | }; |
113 | | |
114 | | // TODO check block order if line order is not unspecified! |
115 | 0 | for (index_in_header_increasing_y, block) in blocks { |
116 | 0 | parallel_writer.add_block_to_compression_queue(index_in_header_increasing_y, block)?; |
117 | | } |
118 | | |
119 | | // TODO debug_assert_eq!(self.is_complete()); |
120 | 0 | Ok(()) |
121 | 0 | } Unexecuted instantiation: <_ as exr::block::writer::ChunksWriter>::compress_all_blocks_parallel::<_> Unexecuted instantiation: <exr::block::writer::OnProgressChunkWriter<exr::block::writer::ChunkWriter<&mut &mut std::io::cursor::Cursor<alloc::vec::Vec<u8>>>, fn(f64)> as exr::block::writer::ChunksWriter>::compress_all_blocks_parallel::<core::iter::adapters::map::Map<core::iter::adapters::flatten::FlatMap<core::iter::adapters::enumerate::Enumerate<core::slice::iter::Iter<exr::meta::header::Header>>, core::iter::adapters::map::Map<alloc::boxed::Box<dyn core::iter::traits::iterator::Iterator<Item = (usize, exr::meta::TileIndices)> + core::marker::Send>, exr::block::enumerate_ordered_header_block_indices::{closure#0}::{closure#0}>, exr::block::enumerate_ordered_header_block_indices::{closure#0}>, <exr::meta::MetaData>::collect_ordered_blocks<<exr::meta::MetaData>::collect_ordered_block_data<<exr::image::write::WriteImageWithOptions<exr::image::Layer<exr::image::SpecificChannels<image::codecs::openexr::write_buffer<&mut std::io::cursor::Cursor<alloc::vec::Vec<u8>>>::{closure#0}, (exr::meta::attribute::ChannelDescription, exr::meta::attribute::ChannelDescription, exr::meta::attribute::ChannelDescription)>>, fn(f64)>>::to_buffered<&mut &mut std::io::cursor::Cursor<alloc::vec::Vec<u8>>>::{closure#0}::{closure#0}>::{closure#0}>::{closure#0}>> Unexecuted instantiation: <exr::block::writer::OnProgressChunkWriter<exr::block::writer::ChunkWriter<&mut &mut std::io::cursor::Cursor<alloc::vec::Vec<u8>>>, fn(f64)> as exr::block::writer::ChunksWriter>::compress_all_blocks_parallel::<core::iter::adapters::map::Map<core::iter::adapters::flatten::FlatMap<core::iter::adapters::enumerate::Enumerate<core::slice::iter::Iter<exr::meta::header::Header>>, core::iter::adapters::map::Map<alloc::boxed::Box<dyn core::iter::traits::iterator::Iterator<Item = (usize, exr::meta::TileIndices)> + core::marker::Send>, exr::block::enumerate_ordered_header_block_indices::{closure#0}::{closure#0}>, exr::block::enumerate_ordered_header_block_indices::{closure#0}>, <exr::meta::MetaData>::collect_ordered_blocks<<exr::meta::MetaData>::collect_ordered_block_data<<exr::image::write::WriteImageWithOptions<exr::image::Layer<exr::image::SpecificChannels<image::codecs::openexr::write_buffer<&mut std::io::cursor::Cursor<alloc::vec::Vec<u8>>>::{closure#1}, (exr::meta::attribute::ChannelDescription, exr::meta::attribute::ChannelDescription, exr::meta::attribute::ChannelDescription, exr::meta::attribute::ChannelDescription)>>, fn(f64)>>::to_buffered<&mut &mut std::io::cursor::Cursor<alloc::vec::Vec<u8>>>::{closure#0}::{closure#0}>::{closure#0}>::{closure#0}>> Unexecuted instantiation: <exr::block::writer::OnProgressChunkWriter<exr::block::writer::ChunkWriter<&mut std::io::cursor::Cursor<&mut alloc::vec::Vec<u8>>>, fn(f64)> as exr::block::writer::ChunksWriter>::compress_all_blocks_parallel::<core::iter::adapters::map::Map<core::iter::adapters::flatten::FlatMap<core::iter::adapters::enumerate::Enumerate<core::slice::iter::Iter<exr::meta::header::Header>>, core::iter::adapters::map::Map<alloc::boxed::Box<dyn core::iter::traits::iterator::Iterator<Item = (usize, exr::meta::TileIndices)> + core::marker::Send>, exr::block::enumerate_ordered_header_block_indices::{closure#0}::{closure#0}>, exr::block::enumerate_ordered_header_block_indices::{closure#0}>, <exr::meta::MetaData>::collect_ordered_blocks<<exr::meta::MetaData>::collect_ordered_block_data<<exr::image::write::WriteImageWithOptions<exr::image::Layer<exr::image::SpecificChannels<image::codecs::openexr::write_buffer<std::io::cursor::Cursor<&mut alloc::vec::Vec<u8>>>::{closure#0}, (exr::meta::attribute::ChannelDescription, exr::meta::attribute::ChannelDescription, exr::meta::attribute::ChannelDescription)>>, fn(f64)>>::to_buffered<&mut std::io::cursor::Cursor<&mut alloc::vec::Vec<u8>>>::{closure#0}::{closure#0}>::{closure#0}>::{closure#0}>> Unexecuted instantiation: <exr::block::writer::OnProgressChunkWriter<exr::block::writer::ChunkWriter<&mut std::io::cursor::Cursor<&mut alloc::vec::Vec<u8>>>, fn(f64)> as exr::block::writer::ChunksWriter>::compress_all_blocks_parallel::<core::iter::adapters::map::Map<core::iter::adapters::flatten::FlatMap<core::iter::adapters::enumerate::Enumerate<core::slice::iter::Iter<exr::meta::header::Header>>, core::iter::adapters::map::Map<alloc::boxed::Box<dyn core::iter::traits::iterator::Iterator<Item = (usize, exr::meta::TileIndices)> + core::marker::Send>, exr::block::enumerate_ordered_header_block_indices::{closure#0}::{closure#0}>, exr::block::enumerate_ordered_header_block_indices::{closure#0}>, <exr::meta::MetaData>::collect_ordered_blocks<<exr::meta::MetaData>::collect_ordered_block_data<<exr::image::write::WriteImageWithOptions<exr::image::Layer<exr::image::SpecificChannels<image::codecs::openexr::write_buffer<std::io::cursor::Cursor<&mut alloc::vec::Vec<u8>>>::{closure#1}, (exr::meta::attribute::ChannelDescription, exr::meta::attribute::ChannelDescription, exr::meta::attribute::ChannelDescription, exr::meta::attribute::ChannelDescription)>>, fn(f64)>>::to_buffered<&mut std::io::cursor::Cursor<&mut alloc::vec::Vec<u8>>>::{closure#0}::{closure#0}>::{closure#0}>::{closure#0}>> |
122 | | } |
123 | | |
124 | | |
125 | | impl<W> ChunksWriter for ChunkWriter<W> where W: Write + Seek { |
126 | | |
127 | | /// The total number of chunks that the complete file will contain. |
128 | 0 | fn total_chunks_count(&self) -> usize { self.chunk_count } Unexecuted instantiation: <exr::block::writer::ChunkWriter<_> as exr::block::writer::ChunksWriter>::total_chunks_count Unexecuted instantiation: <exr::block::writer::ChunkWriter<&mut &mut std::io::cursor::Cursor<alloc::vec::Vec<u8>>> as exr::block::writer::ChunksWriter>::total_chunks_count Unexecuted instantiation: <exr::block::writer::ChunkWriter<&mut std::io::cursor::Cursor<&mut alloc::vec::Vec<u8>>> as exr::block::writer::ChunksWriter>::total_chunks_count |
129 | | |
130 | | /// Any more calls will result in an error and have no effect. |
131 | | /// If writing results in an error, the file and the writer |
132 | | /// may remain in an invalid state and should not be used further. |
133 | | /// Errors when the chunk at this index was already written. |
134 | 0 | fn write_chunk(&mut self, index_in_header_increasing_y: usize, chunk: Chunk) -> UnitResult { |
135 | 0 | let header_chunk_indices = &mut self.chunk_indices_increasing_y[chunk.layer_index]; |
136 | 0 |
|
137 | 0 | if index_in_header_increasing_y >= header_chunk_indices.len() { |
138 | 0 | return Err(Error::invalid("too large chunk index")); |
139 | 0 | } |
140 | 0 |
|
141 | 0 | let chunk_index_slot = &mut header_chunk_indices[index_in_header_increasing_y]; |
142 | 0 | if *chunk_index_slot != 0 { |
143 | 0 | return Err(Error::invalid(format!("chunk at index {} is already written", index_in_header_increasing_y))); |
144 | 0 | } |
145 | 0 |
|
146 | 0 | *chunk_index_slot = usize_to_u64(self.byte_writer.byte_position()); |
147 | 0 | chunk.write(&mut self.byte_writer, self.header_count)?; |
148 | 0 | Ok(()) |
149 | 0 | } Unexecuted instantiation: <exr::block::writer::ChunkWriter<_> as exr::block::writer::ChunksWriter>::write_chunk Unexecuted instantiation: <exr::block::writer::ChunkWriter<&mut &mut std::io::cursor::Cursor<alloc::vec::Vec<u8>>> as exr::block::writer::ChunksWriter>::write_chunk Unexecuted instantiation: <exr::block::writer::ChunkWriter<&mut std::io::cursor::Cursor<&mut alloc::vec::Vec<u8>>> as exr::block::writer::ChunksWriter>::write_chunk |
150 | | } |
151 | | |
152 | | impl<W> ChunkWriter<W> where W: Write + Seek { |
153 | | // -- the following functions are private, because they must be called in a strict order -- |
154 | | |
155 | | /// Writes the meta data and zeroed offset tables as a placeholder. |
156 | 0 | fn new_for_buffered(buffered_byte_writer: W, headers: Headers, pedantic: bool) -> Result<(MetaData, Self)> { |
157 | 0 | let mut write = Tracking::new(buffered_byte_writer); |
158 | 0 | let requirements = MetaData::write_validating_to_buffered(&mut write, headers.as_slice(), pedantic)?; |
159 | | |
160 | | // TODO: use increasing line order where possible, but this requires us to know whether we want to be parallel right now |
161 | | /*// if non-parallel compression, we always use increasing order anyways |
162 | | if !parallel || !has_compression { |
163 | | for header in &mut headers { |
164 | | if header.line_order == LineOrder::Unspecified { |
165 | | header.line_order = LineOrder::Increasing; |
166 | | } |
167 | | } |
168 | | }*/ |
169 | | |
170 | 0 | let offset_table_size: usize = headers.iter().map(|header| header.chunk_count).sum(); Unexecuted instantiation: <exr::block::writer::ChunkWriter<_>>::new_for_buffered::{closure#0} Unexecuted instantiation: <exr::block::writer::ChunkWriter<&mut &mut std::io::cursor::Cursor<alloc::vec::Vec<u8>>>>::new_for_buffered::{closure#0} Unexecuted instantiation: <exr::block::writer::ChunkWriter<&mut std::io::cursor::Cursor<&mut alloc::vec::Vec<u8>>>>::new_for_buffered::{closure#0} |
171 | 0 |
|
172 | 0 | let offset_table_start_byte = write.byte_position(); |
173 | 0 | let offset_table_end_byte = write.byte_position() + offset_table_size * u64::BYTE_SIZE; |
174 | 0 |
|
175 | 0 | // skip offset tables, filling with 0, will be updated after the last chunk has been written |
176 | 0 | write.seek_write_to(offset_table_end_byte)?; |
177 | | |
178 | 0 | let header_count = headers.len(); |
179 | 0 | let chunk_indices_increasing_y = headers.iter() |
180 | 0 | .map(|header| vec![0_u64; header.chunk_count]).collect(); Unexecuted instantiation: <exr::block::writer::ChunkWriter<_>>::new_for_buffered::{closure#1} Unexecuted instantiation: <exr::block::writer::ChunkWriter<&mut &mut std::io::cursor::Cursor<alloc::vec::Vec<u8>>>>::new_for_buffered::{closure#1} Unexecuted instantiation: <exr::block::writer::ChunkWriter<&mut std::io::cursor::Cursor<&mut alloc::vec::Vec<u8>>>>::new_for_buffered::{closure#1} |
181 | 0 |
|
182 | 0 | let meta_data = MetaData { requirements, headers }; |
183 | 0 |
|
184 | 0 | Ok((meta_data, ChunkWriter { |
185 | 0 | header_count, |
186 | 0 | byte_writer: write, |
187 | 0 | chunk_count: offset_table_size, |
188 | 0 | chunk_indices_byte_location: offset_table_start_byte .. offset_table_end_byte, |
189 | 0 | chunk_indices_increasing_y, |
190 | 0 | })) |
191 | 0 | } Unexecuted instantiation: <exr::block::writer::ChunkWriter<_>>::new_for_buffered Unexecuted instantiation: <exr::block::writer::ChunkWriter<&mut &mut std::io::cursor::Cursor<alloc::vec::Vec<u8>>>>::new_for_buffered Unexecuted instantiation: <exr::block::writer::ChunkWriter<&mut std::io::cursor::Cursor<&mut alloc::vec::Vec<u8>>>>::new_for_buffered |
192 | | |
193 | | /// Seek back to the meta data, write offset tables, and flush the byte writer. |
194 | | /// Leaves the writer seeked to the middle of the file. |
195 | 0 | fn complete_meta_data(mut self) -> UnitResult { |
196 | 0 | if self.chunk_indices_increasing_y.iter().flatten().any(|&index| index == 0) { Unexecuted instantiation: <exr::block::writer::ChunkWriter<_>>::complete_meta_data::{closure#0} Unexecuted instantiation: <exr::block::writer::ChunkWriter<&mut &mut std::io::cursor::Cursor<alloc::vec::Vec<u8>>>>::complete_meta_data::{closure#0} Unexecuted instantiation: <exr::block::writer::ChunkWriter<&mut std::io::cursor::Cursor<&mut alloc::vec::Vec<u8>>>>::complete_meta_data::{closure#0} |
197 | 0 | return Err(Error::invalid("some chunks are not written yet")) |
198 | 0 | } |
199 | 0 |
|
200 | 0 | // write all offset tables |
201 | 0 | debug_assert_ne!(self.byte_writer.byte_position(), self.chunk_indices_byte_location.end, "offset table has already been updated"); |
202 | 0 | self.byte_writer.seek_write_to(self.chunk_indices_byte_location.start)?; |
203 | | |
204 | 0 | for table in self.chunk_indices_increasing_y { |
205 | 0 | u64::write_slice(&mut self.byte_writer, table.as_slice())?; |
206 | | } |
207 | | |
208 | 0 | self.byte_writer.flush()?; // make sure we catch all (possibly delayed) io errors before returning |
209 | 0 | Ok(()) |
210 | 0 | } Unexecuted instantiation: <exr::block::writer::ChunkWriter<_>>::complete_meta_data Unexecuted instantiation: <exr::block::writer::ChunkWriter<&mut &mut std::io::cursor::Cursor<alloc::vec::Vec<u8>>>>::complete_meta_data Unexecuted instantiation: <exr::block::writer::ChunkWriter<&mut std::io::cursor::Cursor<&mut alloc::vec::Vec<u8>>>>::complete_meta_data |
211 | | |
212 | | } |
213 | | |
214 | | |
215 | | impl<'w, W, F> ChunksWriter for OnProgressChunkWriter<'w, W, F> where W: 'w + ChunksWriter, F: FnMut(f64) { |
216 | 0 | fn total_chunks_count(&self) -> usize { |
217 | 0 | self.chunk_writer.total_chunks_count() |
218 | 0 | } Unexecuted instantiation: <exr::block::writer::OnProgressChunkWriter<_, _> as exr::block::writer::ChunksWriter>::total_chunks_count Unexecuted instantiation: <exr::block::writer::OnProgressChunkWriter<exr::block::writer::ChunkWriter<&mut &mut std::io::cursor::Cursor<alloc::vec::Vec<u8>>>, fn(f64)> as exr::block::writer::ChunksWriter>::total_chunks_count Unexecuted instantiation: <exr::block::writer::OnProgressChunkWriter<exr::block::writer::ChunkWriter<&mut std::io::cursor::Cursor<&mut alloc::vec::Vec<u8>>>, fn(f64)> as exr::block::writer::ChunksWriter>::total_chunks_count |
219 | | |
220 | 0 | fn write_chunk(&mut self, index_in_header_increasing_y: usize, chunk: Chunk) -> UnitResult { |
221 | 0 | let total_chunks = self.total_chunks_count(); |
222 | 0 | let on_progress = &mut self.on_progress; |
223 | 0 |
|
224 | 0 | // guarantee on_progress being called with 0 once |
225 | 0 | if self.written_chunks == 0 { on_progress(0.0); } |
226 | | |
227 | 0 | self.chunk_writer.write_chunk(index_in_header_increasing_y, chunk)?; |
228 | | |
229 | 0 | self.written_chunks += 1; |
230 | 0 |
|
231 | 0 | on_progress({ |
232 | 0 | // guarantee finishing with progress 1.0 for last block at least once, float division might slightly differ from 1.0 |
233 | 0 | if self.written_chunks == total_chunks { 1.0 } |
234 | 0 | else { self.written_chunks as f64 / total_chunks as f64 } |
235 | | }); |
236 | | |
237 | 0 | Ok(()) |
238 | 0 | } Unexecuted instantiation: <exr::block::writer::OnProgressChunkWriter<_, _> as exr::block::writer::ChunksWriter>::write_chunk Unexecuted instantiation: <exr::block::writer::OnProgressChunkWriter<exr::block::writer::ChunkWriter<&mut &mut std::io::cursor::Cursor<alloc::vec::Vec<u8>>>, fn(f64)> as exr::block::writer::ChunksWriter>::write_chunk Unexecuted instantiation: <exr::block::writer::OnProgressChunkWriter<exr::block::writer::ChunkWriter<&mut std::io::cursor::Cursor<&mut alloc::vec::Vec<u8>>>, fn(f64)> as exr::block::writer::ChunksWriter>::write_chunk |
239 | | } |
240 | | |
241 | | |
242 | | /// Write blocks that appear in any order and reorder them before writing. |
243 | | #[derive(Debug)] |
244 | | #[must_use] |
245 | | pub struct SortedBlocksWriter<'w, W> { |
246 | | chunk_writer: &'w mut W, |
247 | | pending_chunks: BTreeMap<usize, (usize, Chunk)>, |
248 | | unwritten_chunk_indices: Peekable<std::ops::Range<usize>>, |
249 | | requires_sorting: bool, // using this instead of Option, because of borrowing |
250 | | } |
251 | | |
252 | | |
253 | | impl<'w, W> SortedBlocksWriter<'w, W> where W: ChunksWriter { |
254 | | |
255 | | /// New sorting writer. Returns `None` if sorting is not required. |
256 | 0 | pub fn new(meta_data: &MetaData, chunk_writer: &'w mut W) -> SortedBlocksWriter<'w, W> { |
257 | 0 | let requires_sorting = meta_data.headers.iter() |
258 | 0 | .any(|header| header.line_order != LineOrder::Unspecified); Unexecuted instantiation: <exr::block::writer::SortedBlocksWriter<_>>::new::{closure#0} Unexecuted instantiation: <exr::block::writer::SortedBlocksWriter<exr::block::writer::OnProgressChunkWriter<exr::block::writer::ChunkWriter<&mut &mut std::io::cursor::Cursor<alloc::vec::Vec<u8>>>, fn(f64)>>>::new::{closure#0} Unexecuted instantiation: <exr::block::writer::SortedBlocksWriter<exr::block::writer::OnProgressChunkWriter<exr::block::writer::ChunkWriter<&mut std::io::cursor::Cursor<&mut alloc::vec::Vec<u8>>>, fn(f64)>>>::new::{closure#0} |
259 | 0 |
|
260 | 0 | let total_chunk_count = chunk_writer.total_chunks_count(); |
261 | 0 |
|
262 | 0 | SortedBlocksWriter { |
263 | 0 | pending_chunks: BTreeMap::new(), |
264 | 0 | unwritten_chunk_indices: (0 .. total_chunk_count).peekable(), |
265 | 0 | requires_sorting, |
266 | 0 | chunk_writer |
267 | 0 | } |
268 | 0 | } Unexecuted instantiation: <exr::block::writer::SortedBlocksWriter<_>>::new Unexecuted instantiation: <exr::block::writer::SortedBlocksWriter<exr::block::writer::OnProgressChunkWriter<exr::block::writer::ChunkWriter<&mut &mut std::io::cursor::Cursor<alloc::vec::Vec<u8>>>, fn(f64)>>>::new Unexecuted instantiation: <exr::block::writer::SortedBlocksWriter<exr::block::writer::OnProgressChunkWriter<exr::block::writer::ChunkWriter<&mut std::io::cursor::Cursor<&mut alloc::vec::Vec<u8>>>, fn(f64)>>>::new |
269 | | |
270 | | /// Write the chunk or stash it. In the closure, write all chunks that can be written now. |
271 | 0 | pub fn write_or_stash_chunk(&mut self, chunk_index_in_file: usize, chunk_y_index: usize, chunk: Chunk) -> UnitResult { |
272 | 0 | if self.requires_sorting.not() { |
273 | 0 | return self.chunk_writer.write_chunk(chunk_y_index, chunk); |
274 | 0 | } |
275 | 0 |
|
276 | 0 | // write this chunk now if possible |
277 | 0 | if self.unwritten_chunk_indices.peek() == Some(&chunk_index_in_file){ |
278 | 0 | self.chunk_writer.write_chunk(chunk_y_index, chunk)?; |
279 | 0 | self.unwritten_chunk_indices.next().expect("peeked chunk index is missing"); |
280 | | |
281 | | // write all pending blocks that are immediate successors of this block |
282 | 0 | while let Some((next_chunk_y_index, next_chunk)) = self |
283 | 0 | .unwritten_chunk_indices.peek().cloned() |
284 | 0 | .and_then(|id| self.pending_chunks.remove(&id)) Unexecuted instantiation: <exr::block::writer::SortedBlocksWriter<_>>::write_or_stash_chunk::{closure#0} Unexecuted instantiation: <exr::block::writer::SortedBlocksWriter<exr::block::writer::OnProgressChunkWriter<exr::block::writer::ChunkWriter<&mut &mut std::io::cursor::Cursor<alloc::vec::Vec<u8>>>, fn(f64)>>>::write_or_stash_chunk::{closure#0} Unexecuted instantiation: <exr::block::writer::SortedBlocksWriter<exr::block::writer::OnProgressChunkWriter<exr::block::writer::ChunkWriter<&mut std::io::cursor::Cursor<&mut alloc::vec::Vec<u8>>>, fn(f64)>>>::write_or_stash_chunk::{closure#0} |
285 | | { |
286 | 0 | self.chunk_writer.write_chunk(next_chunk_y_index, next_chunk)?; |
287 | 0 | self.unwritten_chunk_indices.next().expect("peeked chunk index is missing"); |
288 | | } |
289 | | } |
290 | | |
291 | 0 | else { |
292 | 0 | // the argument block is not to be written now, |
293 | 0 | // and all the pending blocks are not next up either, |
294 | 0 | // so just stash this block |
295 | 0 | self.pending_chunks.insert(chunk_index_in_file, (chunk_y_index, chunk)); |
296 | 0 | } |
297 | | |
298 | 0 | Ok(()) |
299 | 0 | } Unexecuted instantiation: <exr::block::writer::SortedBlocksWriter<_>>::write_or_stash_chunk Unexecuted instantiation: <exr::block::writer::SortedBlocksWriter<exr::block::writer::OnProgressChunkWriter<exr::block::writer::ChunkWriter<&mut &mut std::io::cursor::Cursor<alloc::vec::Vec<u8>>>, fn(f64)>>>::write_or_stash_chunk Unexecuted instantiation: <exr::block::writer::SortedBlocksWriter<exr::block::writer::OnProgressChunkWriter<exr::block::writer::ChunkWriter<&mut std::io::cursor::Cursor<&mut alloc::vec::Vec<u8>>>, fn(f64)>>>::write_or_stash_chunk |
300 | | |
301 | | /// Where the chunks will be written to. |
302 | 0 | pub fn inner_chunks_writer(&self) -> &W { |
303 | 0 | &self.chunk_writer |
304 | 0 | } Unexecuted instantiation: <exr::block::writer::SortedBlocksWriter<_>>::inner_chunks_writer Unexecuted instantiation: <exr::block::writer::SortedBlocksWriter<exr::block::writer::OnProgressChunkWriter<exr::block::writer::ChunkWriter<&mut &mut std::io::cursor::Cursor<alloc::vec::Vec<u8>>>, fn(f64)>>>::inner_chunks_writer Unexecuted instantiation: <exr::block::writer::SortedBlocksWriter<exr::block::writer::OnProgressChunkWriter<exr::block::writer::ChunkWriter<&mut std::io::cursor::Cursor<&mut alloc::vec::Vec<u8>>>, fn(f64)>>>::inner_chunks_writer |
305 | | } |
306 | | |
307 | | |
308 | | |
309 | | /// Compress blocks to a chunk writer in this thread. |
310 | | #[derive(Debug)] |
311 | | #[must_use] |
312 | | pub struct SequentialBlocksCompressor<'w, W> { |
313 | | meta: &'w MetaData, |
314 | | chunks_writer: &'w mut W, |
315 | | } |
316 | | |
317 | | impl<'w, W> SequentialBlocksCompressor<'w, W> where W: 'w + ChunksWriter { |
318 | | |
319 | | /// New blocks writer. |
320 | 0 | pub fn new(meta: &'w MetaData, chunks_writer: &'w mut W) -> Self { Self { meta, chunks_writer, } } Unexecuted instantiation: <exr::block::writer::SequentialBlocksCompressor<_>>::new Unexecuted instantiation: <exr::block::writer::SequentialBlocksCompressor<exr::block::writer::OnProgressChunkWriter<exr::block::writer::ChunkWriter<&mut &mut std::io::cursor::Cursor<alloc::vec::Vec<u8>>>, fn(f64)>>>::new Unexecuted instantiation: <exr::block::writer::SequentialBlocksCompressor<exr::block::writer::OnProgressChunkWriter<exr::block::writer::ChunkWriter<&mut std::io::cursor::Cursor<&mut alloc::vec::Vec<u8>>>, fn(f64)>>>::new |
321 | | |
322 | | /// This is where the compressed blocks are written to. |
323 | 0 | pub fn inner_chunks_writer(&'w self) -> &'w W { self.chunks_writer } |
324 | | |
325 | | /// Compress a single block immediately. The index of the block must be in increasing line order. |
326 | 0 | pub fn compress_block(&mut self, index_in_header_increasing_y: usize, block: UncompressedBlock) -> UnitResult { |
327 | 0 | self.chunks_writer.write_chunk( |
328 | 0 | index_in_header_increasing_y, |
329 | 0 | block.compress_to_chunk(&self.meta.headers)? |
330 | | ) |
331 | 0 | } Unexecuted instantiation: <exr::block::writer::SequentialBlocksCompressor<_>>::compress_block Unexecuted instantiation: <exr::block::writer::SequentialBlocksCompressor<exr::block::writer::OnProgressChunkWriter<exr::block::writer::ChunkWriter<&mut &mut std::io::cursor::Cursor<alloc::vec::Vec<u8>>>, fn(f64)>>>::compress_block Unexecuted instantiation: <exr::block::writer::SequentialBlocksCompressor<exr::block::writer::OnProgressChunkWriter<exr::block::writer::ChunkWriter<&mut std::io::cursor::Cursor<&mut alloc::vec::Vec<u8>>>, fn(f64)>>>::compress_block |
332 | | } |
333 | | |
334 | | /// Compress blocks to a chunk writer with multiple threads. |
335 | | #[derive(Debug)] |
336 | | #[must_use] |
337 | | pub struct ParallelBlocksCompressor<'w, W> { |
338 | | meta: &'w MetaData, |
339 | | sorted_writer: SortedBlocksWriter<'w, W>, |
340 | | |
341 | | sender: mpsc::Sender<Result<(usize, usize, Chunk)>>, |
342 | | receiver: mpsc::Receiver<Result<(usize, usize, Chunk)>>, |
343 | | pool: rayon_core::ThreadPool, |
344 | | |
345 | | currently_compressing_count: usize, |
346 | | written_chunk_count: usize, // used to check for last chunk |
347 | | max_threads: usize, |
348 | | next_incoming_chunk_index: usize, // used to remember original chunk order |
349 | | } |
350 | | |
351 | | impl<'w, W> ParallelBlocksCompressor<'w, W> where W: 'w + ChunksWriter { |
352 | | |
353 | | /// New blocks writer. Returns none if sequential compression should be used. |
354 | | /// Use `new_with_thread_pool` to customize the threadpool. |
355 | 0 | pub fn new(meta: &'w MetaData, chunks_writer: &'w mut W) -> Option<Self> { |
356 | 0 | Self::new_with_thread_pool(meta, chunks_writer, ||{ |
357 | 0 | rayon_core::ThreadPoolBuilder::new() |
358 | 0 | .thread_name(|index| format!("OpenEXR Block Compressor Thread #{}", index)) Unexecuted instantiation: <exr::block::writer::ParallelBlocksCompressor<_>>::new::{closure#0}::{closure#0} Unexecuted instantiation: <exr::block::writer::ParallelBlocksCompressor<exr::block::writer::OnProgressChunkWriter<exr::block::writer::ChunkWriter<&mut &mut std::io::cursor::Cursor<alloc::vec::Vec<u8>>>, fn(f64)>>>::new::{closure#0}::{closure#0} Unexecuted instantiation: <exr::block::writer::ParallelBlocksCompressor<exr::block::writer::OnProgressChunkWriter<exr::block::writer::ChunkWriter<&mut std::io::cursor::Cursor<&mut alloc::vec::Vec<u8>>>, fn(f64)>>>::new::{closure#0}::{closure#0} |
359 | 0 | .build() |
360 | 0 | }) Unexecuted instantiation: <exr::block::writer::ParallelBlocksCompressor<_>>::new::{closure#0} Unexecuted instantiation: <exr::block::writer::ParallelBlocksCompressor<exr::block::writer::OnProgressChunkWriter<exr::block::writer::ChunkWriter<&mut &mut std::io::cursor::Cursor<alloc::vec::Vec<u8>>>, fn(f64)>>>::new::{closure#0} Unexecuted instantiation: <exr::block::writer::ParallelBlocksCompressor<exr::block::writer::OnProgressChunkWriter<exr::block::writer::ChunkWriter<&mut std::io::cursor::Cursor<&mut alloc::vec::Vec<u8>>>, fn(f64)>>>::new::{closure#0} |
361 | 0 | } Unexecuted instantiation: <exr::block::writer::ParallelBlocksCompressor<_>>::new Unexecuted instantiation: <exr::block::writer::ParallelBlocksCompressor<exr::block::writer::OnProgressChunkWriter<exr::block::writer::ChunkWriter<&mut &mut std::io::cursor::Cursor<alloc::vec::Vec<u8>>>, fn(f64)>>>::new Unexecuted instantiation: <exr::block::writer::ParallelBlocksCompressor<exr::block::writer::OnProgressChunkWriter<exr::block::writer::ChunkWriter<&mut std::io::cursor::Cursor<&mut alloc::vec::Vec<u8>>>, fn(f64)>>>::new |
362 | | |
363 | | /// New blocks writer. Returns none if sequential compression should be used. |
364 | 0 | pub fn new_with_thread_pool<CreatePool>( |
365 | 0 | meta: &'w MetaData, chunks_writer: &'w mut W, try_create_thread_pool: CreatePool) |
366 | 0 | -> Option<Self> |
367 | 0 | where CreatePool: FnOnce() -> std::result::Result<ThreadPool, ThreadPoolBuildError> |
368 | 0 | { |
369 | 0 | if meta.headers.iter().all(|head|head.compression == Compression::Uncompressed) { Unexecuted instantiation: <exr::block::writer::ParallelBlocksCompressor<_>>::new_with_thread_pool::<_>::{closure#0} Unexecuted instantiation: <exr::block::writer::ParallelBlocksCompressor<exr::block::writer::OnProgressChunkWriter<exr::block::writer::ChunkWriter<&mut &mut std::io::cursor::Cursor<alloc::vec::Vec<u8>>>, fn(f64)>>>::new_with_thread_pool::<<exr::block::writer::ParallelBlocksCompressor<exr::block::writer::OnProgressChunkWriter<exr::block::writer::ChunkWriter<&mut &mut std::io::cursor::Cursor<alloc::vec::Vec<u8>>>, fn(f64)>>>::new::{closure#0}>::{closure#0} Unexecuted instantiation: <exr::block::writer::ParallelBlocksCompressor<exr::block::writer::OnProgressChunkWriter<exr::block::writer::ChunkWriter<&mut std::io::cursor::Cursor<&mut alloc::vec::Vec<u8>>>, fn(f64)>>>::new_with_thread_pool::<<exr::block::writer::ParallelBlocksCompressor<exr::block::writer::OnProgressChunkWriter<exr::block::writer::ChunkWriter<&mut std::io::cursor::Cursor<&mut alloc::vec::Vec<u8>>>, fn(f64)>>>::new::{closure#0}>::{closure#0} |
370 | 0 | return None; |
371 | 0 | } |
372 | | |
373 | | // in case thread pool creation fails (for example on WASM currently), |
374 | | // we revert to sequential compression |
375 | 0 | let pool = match try_create_thread_pool() { |
376 | 0 | Ok(pool) => pool, |
377 | | |
378 | | // TODO print warning? |
379 | 0 | Err(_) => return None, |
380 | | }; |
381 | | |
382 | 0 | let max_threads = pool.current_num_threads().max(1).min(chunks_writer.total_chunks_count()) + 2; // ca one block for each thread at all times |
383 | 0 | let (send, recv) = mpsc::channel(); // TODO bounded channel simplifies logic? |
384 | 0 |
|
385 | 0 | Some(Self { |
386 | 0 | sorted_writer: SortedBlocksWriter::new(meta, chunks_writer), |
387 | 0 | next_incoming_chunk_index: 0, |
388 | 0 | currently_compressing_count: 0, |
389 | 0 | written_chunk_count: 0, |
390 | 0 | sender: send, |
391 | 0 | receiver: recv, |
392 | 0 | max_threads, |
393 | 0 | pool, |
394 | 0 | meta, |
395 | 0 | }) |
396 | 0 | } Unexecuted instantiation: <exr::block::writer::ParallelBlocksCompressor<_>>::new_with_thread_pool::<_> Unexecuted instantiation: <exr::block::writer::ParallelBlocksCompressor<exr::block::writer::OnProgressChunkWriter<exr::block::writer::ChunkWriter<&mut &mut std::io::cursor::Cursor<alloc::vec::Vec<u8>>>, fn(f64)>>>::new_with_thread_pool::<<exr::block::writer::ParallelBlocksCompressor<exr::block::writer::OnProgressChunkWriter<exr::block::writer::ChunkWriter<&mut &mut std::io::cursor::Cursor<alloc::vec::Vec<u8>>>, fn(f64)>>>::new::{closure#0}> Unexecuted instantiation: <exr::block::writer::ParallelBlocksCompressor<exr::block::writer::OnProgressChunkWriter<exr::block::writer::ChunkWriter<&mut std::io::cursor::Cursor<&mut alloc::vec::Vec<u8>>>, fn(f64)>>>::new_with_thread_pool::<<exr::block::writer::ParallelBlocksCompressor<exr::block::writer::OnProgressChunkWriter<exr::block::writer::ChunkWriter<&mut std::io::cursor::Cursor<&mut alloc::vec::Vec<u8>>>, fn(f64)>>>::new::{closure#0}> |
397 | | |
398 | | /// This is where the compressed blocks are written to. |
399 | 0 | pub fn inner_chunks_writer(&'w self) -> &'w W { self.sorted_writer.inner_chunks_writer() } Unexecuted instantiation: <exr::block::writer::ParallelBlocksCompressor<_>>::inner_chunks_writer Unexecuted instantiation: <exr::block::writer::ParallelBlocksCompressor<exr::block::writer::OnProgressChunkWriter<exr::block::writer::ChunkWriter<&mut &mut std::io::cursor::Cursor<alloc::vec::Vec<u8>>>, fn(f64)>>>::inner_chunks_writer Unexecuted instantiation: <exr::block::writer::ParallelBlocksCompressor<exr::block::writer::OnProgressChunkWriter<exr::block::writer::ChunkWriter<&mut std::io::cursor::Cursor<&mut alloc::vec::Vec<u8>>>, fn(f64)>>>::inner_chunks_writer |
400 | | |
401 | | // private, as may underflow counter in release mode |
402 | 0 | fn write_next_queued_chunk(&mut self) -> UnitResult { |
403 | 0 | debug_assert!(self.currently_compressing_count > 0, "cannot wait for chunks as there are none left"); |
404 | | |
405 | 0 | let some_compressed_chunk = self.receiver.recv() |
406 | 0 | .expect("cannot receive compressed block"); |
407 | 0 |
|
408 | 0 | self.currently_compressing_count -= 1; |
409 | 0 | let (chunk_file_index, chunk_y_index, chunk) = some_compressed_chunk?; |
410 | 0 | self.sorted_writer.write_or_stash_chunk(chunk_file_index, chunk_y_index, chunk)?; |
411 | | |
412 | 0 | self.written_chunk_count += 1; |
413 | 0 | Ok(()) |
414 | 0 | } Unexecuted instantiation: <exr::block::writer::ParallelBlocksCompressor<_>>::write_next_queued_chunk Unexecuted instantiation: <exr::block::writer::ParallelBlocksCompressor<exr::block::writer::OnProgressChunkWriter<exr::block::writer::ChunkWriter<&mut &mut std::io::cursor::Cursor<alloc::vec::Vec<u8>>>, fn(f64)>>>::write_next_queued_chunk Unexecuted instantiation: <exr::block::writer::ParallelBlocksCompressor<exr::block::writer::OnProgressChunkWriter<exr::block::writer::ChunkWriter<&mut std::io::cursor::Cursor<&mut alloc::vec::Vec<u8>>>, fn(f64)>>>::write_next_queued_chunk |
415 | | |
416 | | /// Wait until all currently compressing chunks in the compressor have been written. |
417 | 0 | pub fn write_all_queued_chunks(&mut self) -> UnitResult { |
418 | 0 | while self.currently_compressing_count > 0 { |
419 | 0 | self.write_next_queued_chunk()?; |
420 | | } |
421 | | |
422 | 0 | debug_assert_eq!(self.currently_compressing_count, 0, "counter does not match block count"); |
423 | 0 | Ok(()) |
424 | 0 | } Unexecuted instantiation: <exr::block::writer::ParallelBlocksCompressor<_>>::write_all_queued_chunks Unexecuted instantiation: <exr::block::writer::ParallelBlocksCompressor<exr::block::writer::OnProgressChunkWriter<exr::block::writer::ChunkWriter<&mut &mut std::io::cursor::Cursor<alloc::vec::Vec<u8>>>, fn(f64)>>>::write_all_queued_chunks Unexecuted instantiation: <exr::block::writer::ParallelBlocksCompressor<exr::block::writer::OnProgressChunkWriter<exr::block::writer::ChunkWriter<&mut std::io::cursor::Cursor<&mut alloc::vec::Vec<u8>>>, fn(f64)>>>::write_all_queued_chunks |
425 | | |
426 | | /// Add a single block to the compressor queue. The index of the block must be in increasing line order. |
427 | | /// When calling this function for the last block, this method waits until all the blocks have been written. |
428 | | /// This only works when you write as many blocks as the image expects, otherwise you can use `wait_for_all_remaining_chunks`. |
429 | | /// Waits for a block from the queue to be written, if the queue already has enough items. |
430 | 0 | pub fn add_block_to_compression_queue(&mut self, index_in_header_increasing_y: usize, block: UncompressedBlock) -> UnitResult { |
431 | 0 |
|
432 | 0 | // if pipe is full, block to wait for a slot to free up |
433 | 0 | if self.currently_compressing_count >= self.max_threads { |
434 | 0 | self.write_next_queued_chunk()?; |
435 | 0 | } |
436 | | |
437 | | // add the argument chunk to the compression queueue |
438 | 0 | let index_in_file = self.next_incoming_chunk_index; |
439 | 0 | let sender = self.sender.clone(); |
440 | 0 | let meta = self.meta.clone(); |
441 | 0 |
|
442 | 0 | self.pool.spawn(move ||{ |
443 | 0 | let compressed_or_err = block.compress_to_chunk(&meta.headers); |
444 | 0 |
|
445 | 0 | // by now, decompressing could have failed in another thread. |
446 | 0 | // the error is then already handled, so we simply |
447 | 0 | // don't send the decompressed block and do nothing |
448 | 0 | let _ = sender.send(compressed_or_err.map(move |compressed| (index_in_file, index_in_header_increasing_y, compressed))); Unexecuted instantiation: <exr::block::writer::ParallelBlocksCompressor<_>>::add_block_to_compression_queue::{closure#0}::{closure#0} Unexecuted instantiation: <exr::block::writer::ParallelBlocksCompressor<exr::block::writer::OnProgressChunkWriter<exr::block::writer::ChunkWriter<&mut &mut std::io::cursor::Cursor<alloc::vec::Vec<u8>>>, fn(f64)>>>::add_block_to_compression_queue::{closure#0}::{closure#0} Unexecuted instantiation: <exr::block::writer::ParallelBlocksCompressor<exr::block::writer::OnProgressChunkWriter<exr::block::writer::ChunkWriter<&mut std::io::cursor::Cursor<&mut alloc::vec::Vec<u8>>>, fn(f64)>>>::add_block_to_compression_queue::{closure#0}::{closure#0} |
449 | 0 | }); Unexecuted instantiation: <exr::block::writer::ParallelBlocksCompressor<_>>::add_block_to_compression_queue::{closure#0} Unexecuted instantiation: <exr::block::writer::ParallelBlocksCompressor<exr::block::writer::OnProgressChunkWriter<exr::block::writer::ChunkWriter<&mut &mut std::io::cursor::Cursor<alloc::vec::Vec<u8>>>, fn(f64)>>>::add_block_to_compression_queue::{closure#0} Unexecuted instantiation: <exr::block::writer::ParallelBlocksCompressor<exr::block::writer::OnProgressChunkWriter<exr::block::writer::ChunkWriter<&mut std::io::cursor::Cursor<&mut alloc::vec::Vec<u8>>>, fn(f64)>>>::add_block_to_compression_queue::{closure#0} |
450 | 0 |
|
451 | 0 | self.currently_compressing_count += 1; |
452 | 0 | self.next_incoming_chunk_index += 1; |
453 | 0 |
|
454 | 0 | // if this is the last chunk, wait for all chunks to complete before returning |
455 | 0 | if self.written_chunk_count + self.currently_compressing_count == self.inner_chunks_writer().total_chunks_count() { |
456 | 0 | self.write_all_queued_chunks()?; |
457 | 0 | debug_assert_eq!( |
458 | 0 | self.written_chunk_count, self.inner_chunks_writer().total_chunks_count(), |
459 | 0 | "written chunk count mismatch" |
460 | | ); |
461 | 0 | } |
462 | | |
463 | | |
464 | 0 | Ok(()) |
465 | 0 | } Unexecuted instantiation: <exr::block::writer::ParallelBlocksCompressor<_>>::add_block_to_compression_queue Unexecuted instantiation: <exr::block::writer::ParallelBlocksCompressor<exr::block::writer::OnProgressChunkWriter<exr::block::writer::ChunkWriter<&mut &mut std::io::cursor::Cursor<alloc::vec::Vec<u8>>>, fn(f64)>>>::add_block_to_compression_queue Unexecuted instantiation: <exr::block::writer::ParallelBlocksCompressor<exr::block::writer::OnProgressChunkWriter<exr::block::writer::ChunkWriter<&mut std::io::cursor::Cursor<&mut alloc::vec::Vec<u8>>>, fn(f64)>>>::add_block_to_compression_queue |
466 | | } |
467 | | |
468 | | |
469 | | |