/rust/registry/src/index.crates.io-1949cf8c6b5b557f/exr-1.74.0/src/block/reader.rs
Line | Count | Source |
1 | | //! Composable structures to handle reading an image. |
2 | | |
3 | | |
4 | | use std::convert::TryFrom; |
5 | | use std::fmt::Debug; |
6 | | use std::io::{Read, Seek}; |
7 | | use crate::block::{BlockIndex, UncompressedBlock}; |
8 | | use crate::block::chunk::{Chunk, TileCoordinates}; |
9 | | use crate::error::{Error, Result, u64_to_usize, UnitResult}; |
10 | | use crate::io::{PeekRead, Tracking}; |
11 | | use crate::meta::{MetaData, OffsetTables}; |
12 | | use crate::meta::header::Header; |
13 | | |
14 | | /// Decode the meta data from a byte source, keeping the source ready for further reading. |
15 | | /// Continue decoding the remaining bytes by calling `filter_chunks` or `all_chunks`. |
16 | | #[derive(Debug)] |
17 | | pub struct Reader<R> { |
18 | | meta_data: MetaData, |
19 | | remaining_reader: PeekRead<Tracking<R>>, // TODO does R need to be Seek or is Tracking enough? |
20 | | } |
21 | | |
22 | | impl<R: Read + Seek> Reader<R> { |
23 | | |
24 | | /// Start the reading process. |
25 | | /// Immediately decodes the meta data into an internal field. |
26 | | /// Access it via `meta_data()`. |
27 | 0 | pub fn read_from_buffered(read: R, pedantic: bool) -> Result<Self> { |
28 | 0 | let mut remaining_reader = PeekRead::new(Tracking::new(read)); |
29 | 0 | let meta_data = MetaData::read_validated_from_buffered_peekable(&mut remaining_reader, pedantic)?; |
30 | 0 | Ok(Self { meta_data, remaining_reader }) |
31 | 0 | } Unexecuted instantiation: <exr::block::reader::Reader<std::io::cursor::Cursor<&[u8]>>>::read_from_buffered Unexecuted instantiation: <exr::block::reader::Reader<_>>::read_from_buffered Unexecuted instantiation: <exr::block::reader::Reader<std::io::cursor::Cursor<alloc::vec::Vec<u8>>>>::read_from_buffered |
32 | | |
33 | | // must not be mutable, as reading the file later on relies on the meta data |
34 | | /// The decoded exr meta data from the file. |
35 | 0 | pub fn meta_data(&self) -> &MetaData { &self.meta_data }Unexecuted instantiation: <exr::block::reader::Reader<std::io::cursor::Cursor<&[u8]>>>::meta_data Unexecuted instantiation: <exr::block::reader::Reader<_>>::meta_data Unexecuted instantiation: <exr::block::reader::Reader<std::io::cursor::Cursor<alloc::vec::Vec<u8>>>>::meta_data |
36 | | |
37 | | /// The decoded exr headers from the file. |
38 | 0 | pub fn headers(&self) -> &[Header] { &self.meta_data.headers }Unexecuted instantiation: <exr::block::reader::Reader<std::io::cursor::Cursor<&[u8]>>>::headers Unexecuted instantiation: <exr::block::reader::Reader<_>>::headers Unexecuted instantiation: <exr::block::reader::Reader<std::io::cursor::Cursor<alloc::vec::Vec<u8>>>>::headers |
39 | | |
40 | | /// Obtain the meta data ownership. |
41 | 0 | pub fn into_meta_data(self) -> MetaData { self.meta_data } |
42 | | |
43 | | /// Prepare to read all the chunks from the file. |
44 | | /// Does not decode the chunks now, but returns a decoder. |
45 | | /// Reading all chunks reduces seeking the file, but some chunks might be read without being used. |
46 | 0 | pub fn all_chunks(mut self, pedantic: bool) -> Result<AllChunksReader<R>> { |
47 | 0 | let total_chunk_count = { |
48 | 0 | if pedantic { |
49 | 0 | let offset_tables = MetaData::read_offset_tables(&mut self.remaining_reader, &self.meta_data.headers)?; |
50 | 0 | validate_offset_tables(self.meta_data.headers.as_slice(), &offset_tables, self.remaining_reader.byte_position())?; |
51 | 0 | offset_tables.iter().map(|table| table.len()).sum() |
52 | | } |
53 | | else { |
54 | 0 | MetaData::skip_offset_tables(&mut self.remaining_reader, &self.meta_data.headers)? |
55 | | } |
56 | | }; |
57 | | |
58 | 0 | Ok(AllChunksReader { |
59 | 0 | meta_data: self.meta_data, |
60 | 0 | remaining_chunks: 0 .. total_chunk_count, |
61 | 0 | remaining_bytes: self.remaining_reader, |
62 | 0 | pedantic |
63 | 0 | }) |
64 | 0 | } |
65 | | |
66 | | /// Prepare to read some of the chunks from the file. |
67 | | /// Does not decode the chunks now, but returns a decoder. |
68 | | /// Reading only some chunks may require seeking the file, potentially skipping many bytes. |
69 | | // TODO tile indices add no new information to block index?? |
70 | 0 | pub fn filter_chunks(mut self, pedantic: bool, mut filter: impl FnMut(&MetaData, TileCoordinates, BlockIndex) -> bool) -> Result<FilteredChunksReader<R>> { |
71 | 0 | let offset_tables = MetaData::read_offset_tables(&mut self.remaining_reader, &self.meta_data.headers)?; |
72 | | |
73 | | // TODO regardless of pedantic, if invalid, read all chunks instead, and filter after reading each chunk? |
74 | 0 | if pedantic { |
75 | 0 | validate_offset_tables( |
76 | 0 | self.meta_data.headers.as_slice(), &offset_tables, |
77 | 0 | self.remaining_reader.byte_position() |
78 | 0 | )?; |
79 | 0 | } |
80 | | |
81 | 0 | let mut filtered_offsets = Vec::with_capacity( |
82 | 0 | (self.meta_data.headers.len() * 32).min(2*2048) |
83 | | ); |
84 | | |
85 | | // TODO detect whether the filter actually would skip chunks, and avoid sorting etc. when no filtering is applied |
86 | | |
87 | 0 | for (header_index, header) in self.meta_data.headers.iter().enumerate() { // offset tables are stored same order as headers |
88 | 0 | for (block_index, tile) in header.blocks_increasing_y_order().enumerate() { // in increasing_y order |
89 | 0 | let data_indices = header.get_absolute_block_pixel_coordinates(tile.location)?; |
90 | | |
91 | 0 | let block = BlockIndex { |
92 | 0 | layer: header_index, |
93 | 0 | level: tile.location.level_index, |
94 | 0 | pixel_position: data_indices.position.to_usize("data indices start")?, |
95 | 0 | pixel_size: data_indices.size, |
96 | | }; |
97 | | |
98 | 0 | if filter(&self.meta_data, tile.location, block) { |
99 | 0 | filtered_offsets.push(offset_tables[header_index][block_index]) // safe indexing from `enumerate()` |
100 | 0 | } |
101 | | }; |
102 | | } |
103 | | |
104 | 0 | filtered_offsets.sort_unstable(); // enables reading continuously if possible (already sorted where line order increasing) |
105 | | |
106 | 0 | if pedantic { |
107 | | // table is sorted. if any two neighbours are equal, we have duplicates. this is invalid. |
108 | 0 | if filtered_offsets.windows(2).any(|pair| pair[0] == pair[1]) {Unexecuted instantiation: <exr::block::reader::Reader<std::io::cursor::Cursor<&[u8]>>>::filter_chunks::<<exr::image::read::image::ReadImage<fn(f64), exr::image::read::layers::ReadFirstValidLayer<exr::image::read::specific_channels::CollectPixels<exr::image::read::specific_channels::ReadOptionalChannel<exr::image::read::specific_channels::ReadRequiredChannel<exr::image::read::specific_channels::ReadRequiredChannel<exr::image::read::specific_channels::ReadRequiredChannel<exr::image::recursive::NoneMore, f32>, f32>, f32>, f32>, (f32, f32, f32, f32), alloc::vec::Vec<f32>, <image::codecs::openexr::OpenExrDecoder<std::io::cursor::Cursor<&[u8]>> as image::io::decoder::ImageDecoder>::read_image::{closure#2}, <image::codecs::openexr::OpenExrDecoder<std::io::cursor::Cursor<&[u8]>> as image::io::decoder::ImageDecoder>::read_image::{closure#3}>>>>::from_chunks<exr::image::Layer<exr::image::SpecificChannels<alloc::vec::Vec<f32>, (exr::meta::attribute::ChannelDescription, exr::meta::attribute::ChannelDescription, exr::meta::attribute::ChannelDescription, core::option::Option<exr::meta::attribute::ChannelDescription>)>>, std::io::cursor::Cursor<&[u8]>>::{closure#0}>::{closure#0}Unexecuted instantiation: <exr::block::reader::Reader<_>>::filter_chunks::<_>::{closure#0}Unexecuted instantiation: <exr::block::reader::Reader<std::io::cursor::Cursor<alloc::vec::Vec<u8>>>>::filter_chunks::<<exr::image::read::image::ReadImage<fn(f64), exr::image::read::layers::ReadFirstValidLayer<exr::image::read::specific_channels::CollectPixels<exr::image::read::specific_channels::ReadOptionalChannel<exr::image::read::specific_channels::ReadRequiredChannel<exr::image::read::specific_channels::ReadRequiredChannel<exr::image::read::specific_channels::ReadRequiredChannel<exr::image::recursive::NoneMore, f32>, f32>, f32>, f32>, (f32, f32, f32, f32), alloc::vec::Vec<f32>, 
<image::codecs::openexr::OpenExrDecoder<std::io::cursor::Cursor<alloc::vec::Vec<u8>>> as image::io::decoder::ImageDecoder>::read_image::{closure#2}, <image::codecs::openexr::OpenExrDecoder<std::io::cursor::Cursor<alloc::vec::Vec<u8>>> as image::io::decoder::ImageDecoder>::read_image::{closure#3}>>>>::from_chunks<exr::image::Layer<exr::image::SpecificChannels<alloc::vec::Vec<f32>, (exr::meta::attribute::ChannelDescription, exr::meta::attribute::ChannelDescription, exr::meta::attribute::ChannelDescription, core::option::Option<exr::meta::attribute::ChannelDescription>)>>, std::io::cursor::Cursor<alloc::vec::Vec<u8>>>::{closure#0}>::{closure#0} |
109 | 0 | return Err(Error::invalid("chunk offset table")) |
110 | 0 | } |
111 | 0 | } |
112 | | |
113 | 0 | Ok(FilteredChunksReader { |
114 | 0 | meta_data: self.meta_data, |
115 | 0 | expected_filtered_chunk_count: filtered_offsets.len(), |
116 | 0 | remaining_filtered_chunk_indices: filtered_offsets.into_iter(), |
117 | 0 | remaining_bytes: self.remaining_reader |
118 | 0 | }) |
119 | 0 | } Unexecuted instantiation: <exr::block::reader::Reader<std::io::cursor::Cursor<&[u8]>>>::filter_chunks::<<exr::image::read::image::ReadImage<fn(f64), exr::image::read::layers::ReadFirstValidLayer<exr::image::read::specific_channels::CollectPixels<exr::image::read::specific_channels::ReadOptionalChannel<exr::image::read::specific_channels::ReadRequiredChannel<exr::image::read::specific_channels::ReadRequiredChannel<exr::image::read::specific_channels::ReadRequiredChannel<exr::image::recursive::NoneMore, f32>, f32>, f32>, f32>, (f32, f32, f32, f32), alloc::vec::Vec<f32>, <image::codecs::openexr::OpenExrDecoder<std::io::cursor::Cursor<&[u8]>> as image::io::decoder::ImageDecoder>::read_image::{closure#2}, <image::codecs::openexr::OpenExrDecoder<std::io::cursor::Cursor<&[u8]>> as image::io::decoder::ImageDecoder>::read_image::{closure#3}>>>>::from_chunks<exr::image::Layer<exr::image::SpecificChannels<alloc::vec::Vec<f32>, (exr::meta::attribute::ChannelDescription, exr::meta::attribute::ChannelDescription, exr::meta::attribute::ChannelDescription, core::option::Option<exr::meta::attribute::ChannelDescription>)>>, std::io::cursor::Cursor<&[u8]>>::{closure#0}>Unexecuted instantiation: <exr::block::reader::Reader<_>>::filter_chunks::<_> Unexecuted instantiation: <exr::block::reader::Reader<std::io::cursor::Cursor<alloc::vec::Vec<u8>>>>::filter_chunks::<<exr::image::read::image::ReadImage<fn(f64), exr::image::read::layers::ReadFirstValidLayer<exr::image::read::specific_channels::CollectPixels<exr::image::read::specific_channels::ReadOptionalChannel<exr::image::read::specific_channels::ReadRequiredChannel<exr::image::read::specific_channels::ReadRequiredChannel<exr::image::read::specific_channels::ReadRequiredChannel<exr::image::recursive::NoneMore, f32>, f32>, f32>, f32>, (f32, f32, f32, f32), alloc::vec::Vec<f32>, <image::codecs::openexr::OpenExrDecoder<std::io::cursor::Cursor<alloc::vec::Vec<u8>>> as image::io::decoder::ImageDecoder>::read_image::{closure#2}, 
<image::codecs::openexr::OpenExrDecoder<std::io::cursor::Cursor<alloc::vec::Vec<u8>>> as image::io::decoder::ImageDecoder>::read_image::{closure#3}>>>>::from_chunks<exr::image::Layer<exr::image::SpecificChannels<alloc::vec::Vec<f32>, (exr::meta::attribute::ChannelDescription, exr::meta::attribute::ChannelDescription, exr::meta::attribute::ChannelDescription, core::option::Option<exr::meta::attribute::ChannelDescription>)>>, std::io::cursor::Cursor<alloc::vec::Vec<u8>>>::{closure#0}> |
120 | | } |
121 | | |
122 | | |
123 | 0 | fn validate_offset_tables(headers: &[Header], offset_tables: &OffsetTables, chunks_start_byte: usize) -> UnitResult { |
124 | 0 | let max_pixel_bytes: usize = headers.iter() // when compressed, chunks are smaller, but never larger than max |
125 | 0 | .map(|header| header.max_pixel_file_bytes()) |
126 | 0 | .sum(); |
127 | | |
128 | | // check that each offset is within the bounds |
129 | 0 | let end_byte = chunks_start_byte + max_pixel_bytes; |
130 | 0 | let is_invalid = offset_tables.iter().flatten().map(|&u64| u64_to_usize(u64, "chunk start")) |
131 | 0 | .any(|maybe_chunk_start| match maybe_chunk_start { |
132 | 0 | Ok(chunk_start) => chunk_start < chunks_start_byte || chunk_start > end_byte, |
133 | 0 | Err(_) => true |
134 | 0 | }); |
135 | | |
136 | 0 | if is_invalid { Err(Error::invalid("offset table")) } |
137 | 0 | else { Ok(()) } |
138 | 0 | } |
139 | | |
140 | | |
141 | | |
142 | | |
143 | | /// Decode the desired chunks and skip the unimportant chunks in the file. |
144 | | /// The decoded chunks can be decompressed by calling |
145 | | /// `decompress_parallel`, `decompress_sequential`, `sequential_decompressor`, or `parallel_decompressor`. |
146 | | /// Call `on_progress` to have a callback with each block. |
147 | | /// Also contains the image meta data. |
148 | | #[derive(Debug)] |
149 | | pub struct FilteredChunksReader<R> { |
150 | | meta_data: MetaData, |
151 | | expected_filtered_chunk_count: usize, |
152 | | remaining_filtered_chunk_indices: std::vec::IntoIter<u64>, |
153 | | remaining_bytes: PeekRead<Tracking<R>>, |
154 | | } |
155 | | |
156 | | /// Decode all chunks in the file without seeking. |
157 | | /// The decoded chunks can be decompressed by calling |
158 | | /// `decompress_parallel`, `decompress_sequential`, `sequential_decompressor`, or `parallel_decompressor`. |
159 | | /// Call `on_progress` to have a callback with each block. |
160 | | /// Also contains the image meta data. |
161 | | #[derive(Debug)] |
162 | | pub struct AllChunksReader<R> { |
163 | | meta_data: MetaData, |
164 | | remaining_chunks: std::ops::Range<usize>, |
165 | | remaining_bytes: PeekRead<Tracking<R>>, |
166 | | pedantic: bool, |
167 | | } |
168 | | |
169 | | /// Decode chunks in the file without seeking. |
170 | | /// Calls the supplied closure for each chunk. |
171 | | /// The decoded chunks can be decompressed by calling |
172 | | /// `decompress_parallel`, `decompress_sequential`, or `sequential_decompressor`. |
173 | | /// Also contains the image meta data. |
174 | | #[derive(Debug)] |
175 | | pub struct OnProgressChunksReader<R, F> { |
176 | | chunks_reader: R, |
177 | | decoded_chunks: usize, |
178 | | callback: F, |
179 | | } |
180 | | |
181 | | /// Decode chunks in the file. |
182 | | /// The decoded chunks can be decompressed by calling |
183 | | /// `decompress_parallel`, `decompress_sequential`, or `sequential_decompressor`. |
184 | | /// Call `on_progress` to have a callback with each block. |
185 | | /// Also contains the image meta data. |
186 | | pub trait ChunksReader: Sized + Iterator<Item=Result<Chunk>> + ExactSizeIterator { |
187 | | |
188 | | /// The decoded exr meta data from the file. |
189 | | fn meta_data(&self) -> &MetaData; |
190 | | |
191 | | /// The decoded exr headers from the file. |
192 | 0 | fn headers(&self) -> &[Header] { &self.meta_data().headers } |
193 | | |
194 | | /// The number of chunks that this reader will return in total. |
195 | | /// Can be less than the total number of chunks in the file, if some chunks are skipped. |
196 | | fn expected_chunk_count(&self) -> usize; |
197 | | |
198 | | /// Read the next compressed chunk from the file. |
199 | | /// Equivalent to `.next()`, as this also is an iterator. |
200 | | /// Returns `None` if all chunks have been read. |
201 | 0 | fn read_next_chunk(&mut self) -> Option<Result<Chunk>> { self.next() }Unexecuted instantiation: <exr::block::reader::OnProgressChunksReader<exr::block::reader::FilteredChunksReader<std::io::cursor::Cursor<&[u8]>>, &mut fn(f64)> as exr::block::reader::ChunksReader>::read_next_chunk Unexecuted instantiation: <_ as exr::block::reader::ChunksReader>::read_next_chunk Unexecuted instantiation: <exr::block::reader::OnProgressChunksReader<exr::block::reader::FilteredChunksReader<std::io::cursor::Cursor<alloc::vec::Vec<u8>>>, &mut fn(f64)> as exr::block::reader::ChunksReader>::read_next_chunk |
202 | | |
203 | | /// Create a new reader that calls the provided progress |
204 | | /// callback for each chunk that is read from the file. |
205 | | /// If the file can be successfully decoded, |
206 | | /// the progress will always at least once include 0.0 at the start and 1.0 at the end. |
207 | 0 | fn on_progress<F>(self, on_progress: F) -> OnProgressChunksReader<Self, F> where F: FnMut(f64) { |
208 | 0 | OnProgressChunksReader { chunks_reader: self, callback: on_progress, decoded_chunks: 0 } |
209 | 0 | } Unexecuted instantiation: <exr::block::reader::FilteredChunksReader<std::io::cursor::Cursor<&[u8]>> as exr::block::reader::ChunksReader>::on_progress::<&mut fn(f64)> Unexecuted instantiation: <_ as exr::block::reader::ChunksReader>::on_progress::<_> Unexecuted instantiation: <exr::block::reader::FilteredChunksReader<std::io::cursor::Cursor<alloc::vec::Vec<u8>>> as exr::block::reader::ChunksReader>::on_progress::<&mut fn(f64)> |
210 | | |
211 | | #[cfg(feature = "rayon")] |
212 | | /// Decompress all blocks in the file, using multiple cpu cores, and call the supplied closure for each block. |
213 | | /// The order of the blocks is not deterministic. |
214 | | /// You can also use `parallel_decompressor` to obtain an iterator instead. |
215 | | /// Will fall back to sequential processing where threads are not available, or where it would not speed up the process. |
216 | | // FIXME try async + futures instead of rayon! Maybe even allows for external async decoding? (-> impl Stream<UncompressedBlock>) |
217 | 0 | fn decompress_parallel( |
218 | 0 | self, pedantic: bool, |
219 | 0 | mut insert_block: impl FnMut(&MetaData, UncompressedBlock) -> UnitResult |
220 | 0 | ) -> UnitResult |
221 | | { |
222 | 0 | let mut decompressor = match self.parallel_decompressor(pedantic) { |
223 | 0 | Err(old_self) => return old_self.decompress_sequential(pedantic, insert_block), |
224 | 0 | Ok(decompressor) => decompressor, |
225 | | }; |
226 | | |
227 | 0 | while let Some(block) = decompressor.next() { |
228 | 0 | insert_block(decompressor.meta_data(), block?)?; |
229 | | } |
230 | | |
231 | 0 | debug_assert_eq!(decompressor.len(), 0, "compressed blocks left after decompressing all blocks"); |
232 | 0 | Ok(()) |
233 | 0 | } Unexecuted instantiation: <exr::block::reader::OnProgressChunksReader<exr::block::reader::FilteredChunksReader<std::io::cursor::Cursor<&[u8]>>, &mut fn(f64)> as exr::block::reader::ChunksReader>::decompress_parallel::<<exr::image::read::image::ReadImage<fn(f64), exr::image::read::layers::ReadFirstValidLayer<exr::image::read::specific_channels::CollectPixels<exr::image::read::specific_channels::ReadOptionalChannel<exr::image::read::specific_channels::ReadRequiredChannel<exr::image::read::specific_channels::ReadRequiredChannel<exr::image::read::specific_channels::ReadRequiredChannel<exr::image::recursive::NoneMore, f32>, f32>, f32>, f32>, (f32, f32, f32, f32), alloc::vec::Vec<f32>, <image::codecs::openexr::OpenExrDecoder<std::io::cursor::Cursor<&[u8]>> as image::io::decoder::ImageDecoder>::read_image::{closure#2}, <image::codecs::openexr::OpenExrDecoder<std::io::cursor::Cursor<&[u8]>> as image::io::decoder::ImageDecoder>::read_image::{closure#3}>>>>::from_chunks<exr::image::Layer<exr::image::SpecificChannels<alloc::vec::Vec<f32>, (exr::meta::attribute::ChannelDescription, exr::meta::attribute::ChannelDescription, exr::meta::attribute::ChannelDescription, core::option::Option<exr::meta::attribute::ChannelDescription>)>>, std::io::cursor::Cursor<&[u8]>>::{closure#1}>Unexecuted instantiation: <_ as exr::block::reader::ChunksReader>::decompress_parallel::<_> Unexecuted instantiation: <exr::block::reader::OnProgressChunksReader<exr::block::reader::FilteredChunksReader<std::io::cursor::Cursor<alloc::vec::Vec<u8>>>, &mut fn(f64)> as exr::block::reader::ChunksReader>::decompress_parallel::<<exr::image::read::image::ReadImage<fn(f64), 
exr::image::read::layers::ReadFirstValidLayer<exr::image::read::specific_channels::CollectPixels<exr::image::read::specific_channels::ReadOptionalChannel<exr::image::read::specific_channels::ReadRequiredChannel<exr::image::read::specific_channels::ReadRequiredChannel<exr::image::read::specific_channels::ReadRequiredChannel<exr::image::recursive::NoneMore, f32>, f32>, f32>, f32>, (f32, f32, f32, f32), alloc::vec::Vec<f32>, <image::codecs::openexr::OpenExrDecoder<std::io::cursor::Cursor<alloc::vec::Vec<u8>>> as image::io::decoder::ImageDecoder>::read_image::{closure#2}, <image::codecs::openexr::OpenExrDecoder<std::io::cursor::Cursor<alloc::vec::Vec<u8>>> as image::io::decoder::ImageDecoder>::read_image::{closure#3}>>>>::from_chunks<exr::image::Layer<exr::image::SpecificChannels<alloc::vec::Vec<f32>, (exr::meta::attribute::ChannelDescription, exr::meta::attribute::ChannelDescription, exr::meta::attribute::ChannelDescription, core::option::Option<exr::meta::attribute::ChannelDescription>)>>, std::io::cursor::Cursor<alloc::vec::Vec<u8>>>::{closure#1}> |
234 | | |
235 | | #[cfg(feature = "rayon")] |
236 | | /// Return an iterator that decompresses the chunks with multiple threads. |
237 | | /// The order of the blocks is not deterministic. |
238 | | /// Use `ParallelBlockDecompressor::new` if you want to use your own thread pool. |
239 | | /// By default, this uses as many threads as there are CPUs. |
240 | | /// Returns `self` in the `Err` variant if there is no need for parallel decompression. |
241 | 0 | fn parallel_decompressor(self, pedantic: bool) -> std::result::Result<ParallelBlockDecompressor<Self>, Self> { |
242 | 0 | ParallelBlockDecompressor::new(self, pedantic) |
243 | 0 | } Unexecuted instantiation: <exr::block::reader::OnProgressChunksReader<exr::block::reader::FilteredChunksReader<std::io::cursor::Cursor<&[u8]>>, &mut fn(f64)> as exr::block::reader::ChunksReader>::parallel_decompressor Unexecuted instantiation: <_ as exr::block::reader::ChunksReader>::parallel_decompressor Unexecuted instantiation: <exr::block::reader::OnProgressChunksReader<exr::block::reader::FilteredChunksReader<std::io::cursor::Cursor<alloc::vec::Vec<u8>>>, &mut fn(f64)> as exr::block::reader::ChunksReader>::parallel_decompressor |
244 | | |
245 | | /// Decompress all blocks in this thread, and call the supplied closure for each block. |
246 | | /// You can alternatively use `sequential_decompressor` if you prefer an external iterator. |
247 | 0 | fn decompress_sequential( |
248 | 0 | self, pedantic: bool, |
249 | 0 | mut insert_block: impl FnMut(&MetaData, UncompressedBlock) -> UnitResult |
250 | 0 | ) -> UnitResult |
251 | | { |
252 | 0 | let mut decompressor = self.sequential_decompressor(pedantic); |
253 | 0 | while let Some(block) = decompressor.next() { |
254 | 0 | insert_block(decompressor.meta_data(), block?)?; |
255 | | } |
256 | | |
257 | 0 | debug_assert_eq!(decompressor.len(), 0, "compressed blocks left after decompressing all blocks"); |
258 | 0 | Ok(()) |
259 | 0 | } Unexecuted instantiation: <exr::block::reader::OnProgressChunksReader<exr::block::reader::FilteredChunksReader<std::io::cursor::Cursor<&[u8]>>, &mut fn(f64)> as exr::block::reader::ChunksReader>::decompress_sequential::<<exr::image::read::image::ReadImage<fn(f64), exr::image::read::layers::ReadFirstValidLayer<exr::image::read::specific_channels::CollectPixels<exr::image::read::specific_channels::ReadOptionalChannel<exr::image::read::specific_channels::ReadRequiredChannel<exr::image::read::specific_channels::ReadRequiredChannel<exr::image::read::specific_channels::ReadRequiredChannel<exr::image::recursive::NoneMore, f32>, f32>, f32>, f32>, (f32, f32, f32, f32), alloc::vec::Vec<f32>, <image::codecs::openexr::OpenExrDecoder<std::io::cursor::Cursor<&[u8]>> as image::io::decoder::ImageDecoder>::read_image::{closure#2}, <image::codecs::openexr::OpenExrDecoder<std::io::cursor::Cursor<&[u8]>> as image::io::decoder::ImageDecoder>::read_image::{closure#3}>>>>::from_chunks<exr::image::Layer<exr::image::SpecificChannels<alloc::vec::Vec<f32>, (exr::meta::attribute::ChannelDescription, exr::meta::attribute::ChannelDescription, exr::meta::attribute::ChannelDescription, core::option::Option<exr::meta::attribute::ChannelDescription>)>>, std::io::cursor::Cursor<&[u8]>>::{closure#2}>Unexecuted instantiation: <exr::block::reader::OnProgressChunksReader<exr::block::reader::FilteredChunksReader<std::io::cursor::Cursor<&[u8]>>, &mut fn(f64)> as exr::block::reader::ChunksReader>::decompress_sequential::<<exr::image::read::image::ReadImage<fn(f64), exr::image::read::layers::ReadFirstValidLayer<exr::image::read::specific_channels::CollectPixels<exr::image::read::specific_channels::ReadOptionalChannel<exr::image::read::specific_channels::ReadRequiredChannel<exr::image::read::specific_channels::ReadRequiredChannel<exr::image::read::specific_channels::ReadRequiredChannel<exr::image::recursive::NoneMore, f32>, f32>, f32>, f32>, (f32, f32, f32, f32), alloc::vec::Vec<f32>, 
<image::codecs::openexr::OpenExrDecoder<std::io::cursor::Cursor<&[u8]>> as image::io::decoder::ImageDecoder>::read_image::{closure#2}, <image::codecs::openexr::OpenExrDecoder<std::io::cursor::Cursor<&[u8]>> as image::io::decoder::ImageDecoder>::read_image::{closure#3}>>>>::from_chunks<exr::image::Layer<exr::image::SpecificChannels<alloc::vec::Vec<f32>, (exr::meta::attribute::ChannelDescription, exr::meta::attribute::ChannelDescription, exr::meta::attribute::ChannelDescription, core::option::Option<exr::meta::attribute::ChannelDescription>)>>, std::io::cursor::Cursor<&[u8]>>::{closure#1}>Unexecuted instantiation: <_ as exr::block::reader::ChunksReader>::decompress_sequential::<_> Unexecuted instantiation: <exr::block::reader::OnProgressChunksReader<exr::block::reader::FilteredChunksReader<std::io::cursor::Cursor<alloc::vec::Vec<u8>>>, &mut fn(f64)> as exr::block::reader::ChunksReader>::decompress_sequential::<<exr::image::read::image::ReadImage<fn(f64), exr::image::read::layers::ReadFirstValidLayer<exr::image::read::specific_channels::CollectPixels<exr::image::read::specific_channels::ReadOptionalChannel<exr::image::read::specific_channels::ReadRequiredChannel<exr::image::read::specific_channels::ReadRequiredChannel<exr::image::read::specific_channels::ReadRequiredChannel<exr::image::recursive::NoneMore, f32>, f32>, f32>, f32>, (f32, f32, f32, f32), alloc::vec::Vec<f32>, <image::codecs::openexr::OpenExrDecoder<std::io::cursor::Cursor<alloc::vec::Vec<u8>>> as image::io::decoder::ImageDecoder>::read_image::{closure#2}, <image::codecs::openexr::OpenExrDecoder<std::io::cursor::Cursor<alloc::vec::Vec<u8>>> as image::io::decoder::ImageDecoder>::read_image::{closure#3}>>>>::from_chunks<exr::image::Layer<exr::image::SpecificChannels<alloc::vec::Vec<f32>, (exr::meta::attribute::ChannelDescription, exr::meta::attribute::ChannelDescription, exr::meta::attribute::ChannelDescription, core::option::Option<exr::meta::attribute::ChannelDescription>)>>, 
std::io::cursor::Cursor<alloc::vec::Vec<u8>>>::{closure#2}>Unexecuted instantiation: <exr::block::reader::OnProgressChunksReader<exr::block::reader::FilteredChunksReader<std::io::cursor::Cursor<alloc::vec::Vec<u8>>>, &mut fn(f64)> as exr::block::reader::ChunksReader>::decompress_sequential::<<exr::image::read::image::ReadImage<fn(f64), exr::image::read::layers::ReadFirstValidLayer<exr::image::read::specific_channels::CollectPixels<exr::image::read::specific_channels::ReadOptionalChannel<exr::image::read::specific_channels::ReadRequiredChannel<exr::image::read::specific_channels::ReadRequiredChannel<exr::image::read::specific_channels::ReadRequiredChannel<exr::image::recursive::NoneMore, f32>, f32>, f32>, f32>, (f32, f32, f32, f32), alloc::vec::Vec<f32>, <image::codecs::openexr::OpenExrDecoder<std::io::cursor::Cursor<alloc::vec::Vec<u8>>> as image::io::decoder::ImageDecoder>::read_image::{closure#2}, <image::codecs::openexr::OpenExrDecoder<std::io::cursor::Cursor<alloc::vec::Vec<u8>>> as image::io::decoder::ImageDecoder>::read_image::{closure#3}>>>>::from_chunks<exr::image::Layer<exr::image::SpecificChannels<alloc::vec::Vec<f32>, (exr::meta::attribute::ChannelDescription, exr::meta::attribute::ChannelDescription, exr::meta::attribute::ChannelDescription, core::option::Option<exr::meta::attribute::ChannelDescription>)>>, std::io::cursor::Cursor<alloc::vec::Vec<u8>>>::{closure#1}> |
260 | | |
261 | | /// Prepare reading the chunks sequentially, using only a single thread, but with less memory overhead. |
262 | 0 | fn sequential_decompressor(self, pedantic: bool) -> SequentialBlockDecompressor<Self> { |
263 | 0 | SequentialBlockDecompressor { remaining_chunks_reader: self, pedantic } |
264 | 0 | } Unexecuted instantiation: <exr::block::reader::OnProgressChunksReader<exr::block::reader::FilteredChunksReader<std::io::cursor::Cursor<&[u8]>>, &mut fn(f64)> as exr::block::reader::ChunksReader>::sequential_decompressor Unexecuted instantiation: <_ as exr::block::reader::ChunksReader>::sequential_decompressor Unexecuted instantiation: <exr::block::reader::OnProgressChunksReader<exr::block::reader::FilteredChunksReader<std::io::cursor::Cursor<alloc::vec::Vec<u8>>>, &mut fn(f64)> as exr::block::reader::ChunksReader>::sequential_decompressor |
265 | | } |
266 | | |
267 | | impl<R, F> ChunksReader for OnProgressChunksReader<R, F> where R: ChunksReader, F: FnMut(f64) { |
268 | 0 | fn meta_data(&self) -> &MetaData { self.chunks_reader.meta_data() }Unexecuted instantiation: <exr::block::reader::OnProgressChunksReader<exr::block::reader::FilteredChunksReader<std::io::cursor::Cursor<&[u8]>>, &mut fn(f64)> as exr::block::reader::ChunksReader>::meta_data Unexecuted instantiation: <exr::block::reader::OnProgressChunksReader<_, _> as exr::block::reader::ChunksReader>::meta_data Unexecuted instantiation: <exr::block::reader::OnProgressChunksReader<exr::block::reader::FilteredChunksReader<std::io::cursor::Cursor<alloc::vec::Vec<u8>>>, &mut fn(f64)> as exr::block::reader::ChunksReader>::meta_data |
269 | 0 | fn expected_chunk_count(&self) -> usize { self.chunks_reader.expected_chunk_count() }Unexecuted instantiation: <exr::block::reader::OnProgressChunksReader<exr::block::reader::FilteredChunksReader<std::io::cursor::Cursor<&[u8]>>, &mut fn(f64)> as exr::block::reader::ChunksReader>::expected_chunk_count Unexecuted instantiation: <exr::block::reader::OnProgressChunksReader<_, _> as exr::block::reader::ChunksReader>::expected_chunk_count Unexecuted instantiation: <exr::block::reader::OnProgressChunksReader<exr::block::reader::FilteredChunksReader<std::io::cursor::Cursor<alloc::vec::Vec<u8>>>, &mut fn(f64)> as exr::block::reader::ChunksReader>::expected_chunk_count |
270 | | } |
271 | | |
272 | | impl<R, F> ExactSizeIterator for OnProgressChunksReader<R, F> where R: ChunksReader, F: FnMut(f64) {} |
273 | | impl<R, F> Iterator for OnProgressChunksReader<R, F> where R: ChunksReader, F: FnMut(f64) { |
274 | | type Item = Result<Chunk>; |
275 | | |
276 | 0 | fn next(&mut self) -> Option<Self::Item> { |
277 | 0 | self.chunks_reader.next().map(|item|{ |
278 | 0 | { |
279 | 0 | let total_chunks = self.expected_chunk_count() as f64; |
280 | 0 | let callback = &mut self.callback; |
281 | 0 | callback(self.decoded_chunks as f64 / total_chunks); |
282 | 0 | } |
283 | | |
284 | 0 | self.decoded_chunks += 1; |
285 | 0 | item |
286 | 0 | }) Unexecuted instantiation: <exr::block::reader::OnProgressChunksReader<exr::block::reader::FilteredChunksReader<std::io::cursor::Cursor<&[u8]>>, &mut fn(f64)> as core::iter::traits::iterator::Iterator>::next::{closure#0}Unexecuted instantiation: <exr::block::reader::OnProgressChunksReader<_, _> as core::iter::traits::iterator::Iterator>::next::{closure#0}Unexecuted instantiation: <exr::block::reader::OnProgressChunksReader<exr::block::reader::FilteredChunksReader<std::io::cursor::Cursor<alloc::vec::Vec<u8>>>, &mut fn(f64)> as core::iter::traits::iterator::Iterator>::next::{closure#0} |
287 | 0 | .or_else(||{ |
288 | 0 | debug_assert_eq!( |
289 | 0 | self.decoded_chunks, self.expected_chunk_count(), |
290 | 0 | "chunks reader finished but not all chunks are decompressed" |
291 | | ); |
292 | | |
293 | 0 | let callback = &mut self.callback; |
294 | 0 | callback(1.0); |
295 | 0 | None |
296 | 0 | }) Unexecuted instantiation: <exr::block::reader::OnProgressChunksReader<exr::block::reader::FilteredChunksReader<std::io::cursor::Cursor<&[u8]>>, &mut fn(f64)> as core::iter::traits::iterator::Iterator>::next::{closure#1}Unexecuted instantiation: <exr::block::reader::OnProgressChunksReader<_, _> as core::iter::traits::iterator::Iterator>::next::{closure#1}Unexecuted instantiation: <exr::block::reader::OnProgressChunksReader<exr::block::reader::FilteredChunksReader<std::io::cursor::Cursor<alloc::vec::Vec<u8>>>, &mut fn(f64)> as core::iter::traits::iterator::Iterator>::next::{closure#1} |
297 | 0 | } Unexecuted instantiation: <exr::block::reader::OnProgressChunksReader<exr::block::reader::FilteredChunksReader<std::io::cursor::Cursor<&[u8]>>, &mut fn(f64)> as core::iter::traits::iterator::Iterator>::next Unexecuted instantiation: <exr::block::reader::OnProgressChunksReader<_, _> as core::iter::traits::iterator::Iterator>::next Unexecuted instantiation: <exr::block::reader::OnProgressChunksReader<exr::block::reader::FilteredChunksReader<std::io::cursor::Cursor<alloc::vec::Vec<u8>>>, &mut fn(f64)> as core::iter::traits::iterator::Iterator>::next |
298 | | |
299 | 0 | fn size_hint(&self) -> (usize, Option<usize>) { |
300 | 0 | self.chunks_reader.size_hint() |
301 | 0 | } Unexecuted instantiation: <exr::block::reader::OnProgressChunksReader<exr::block::reader::FilteredChunksReader<std::io::cursor::Cursor<&[u8]>>, &mut fn(f64)> as core::iter::traits::iterator::Iterator>::size_hint Unexecuted instantiation: <exr::block::reader::OnProgressChunksReader<_, _> as core::iter::traits::iterator::Iterator>::size_hint Unexecuted instantiation: <exr::block::reader::OnProgressChunksReader<exr::block::reader::FilteredChunksReader<std::io::cursor::Cursor<alloc::vec::Vec<u8>>>, &mut fn(f64)> as core::iter::traits::iterator::Iterator>::size_hint |
302 | | } |
303 | | |
304 | | impl<R: Read + Seek> ChunksReader for AllChunksReader<R> { |
305 | 0 | fn meta_data(&self) -> &MetaData { &self.meta_data } |
306 | 0 | fn expected_chunk_count(&self) -> usize { self.remaining_chunks.end } |
307 | | } |
308 | | |
309 | | impl<R: Read + Seek> ExactSizeIterator for AllChunksReader<R> {} |
310 | | impl<R: Read + Seek> Iterator for AllChunksReader<R> { |
311 | | type Item = Result<Chunk>; |
312 | | |
313 | 0 | fn next(&mut self) -> Option<Self::Item> { |
314 | | // read as many chunks as the file should contain (inferred from meta data) |
315 | 0 | let next_chunk = self.remaining_chunks.next() |
316 | 0 | .map(|_| Chunk::read(&mut self.remaining_bytes, &self.meta_data)); |
317 | | |
318 | | // if no chunks are left, but some bytes remain, return error |
319 | 0 | if self.pedantic && next_chunk.is_none() && self.remaining_bytes.peek_u8().is_ok() { |
320 | 0 | return Some(Err(Error::invalid("end of file expected"))); |
321 | 0 | } |
322 | | |
323 | 0 | next_chunk |
324 | 0 | } |
325 | | |
326 | 0 | fn size_hint(&self) -> (usize, Option<usize>) { |
327 | 0 | (self.remaining_chunks.len(), Some(self.remaining_chunks.len())) |
328 | 0 | } |
329 | | } |
330 | | |
331 | | impl<R: Read + Seek> ChunksReader for FilteredChunksReader<R> { |
332 | 0 | fn meta_data(&self) -> &MetaData { &self.meta_data }Unexecuted instantiation: <exr::block::reader::FilteredChunksReader<std::io::cursor::Cursor<&[u8]>> as exr::block::reader::ChunksReader>::meta_data Unexecuted instantiation: <exr::block::reader::FilteredChunksReader<_> as exr::block::reader::ChunksReader>::meta_data Unexecuted instantiation: <exr::block::reader::FilteredChunksReader<std::io::cursor::Cursor<alloc::vec::Vec<u8>>> as exr::block::reader::ChunksReader>::meta_data |
333 | 0 | fn expected_chunk_count(&self) -> usize { self.expected_filtered_chunk_count }Unexecuted instantiation: <exr::block::reader::FilteredChunksReader<std::io::cursor::Cursor<&[u8]>> as exr::block::reader::ChunksReader>::expected_chunk_count Unexecuted instantiation: <exr::block::reader::FilteredChunksReader<_> as exr::block::reader::ChunksReader>::expected_chunk_count Unexecuted instantiation: <exr::block::reader::FilteredChunksReader<std::io::cursor::Cursor<alloc::vec::Vec<u8>>> as exr::block::reader::ChunksReader>::expected_chunk_count |
334 | | } |
335 | | |
336 | | impl<R: Read + Seek> ExactSizeIterator for FilteredChunksReader<R> {} |
337 | | impl<R: Read + Seek> Iterator for FilteredChunksReader<R> { |
338 | | type Item = Result<Chunk>; |
339 | | |
340 | 0 | fn next(&mut self) -> Option<Self::Item> { |
341 | | // read as many chunks as we have desired chunk offsets |
342 | 0 | self.remaining_filtered_chunk_indices.next().map(|next_chunk_location|{ |
343 | 0 | self.remaining_bytes.skip_to( // no-op for seek at current position, uses skip_bytes for small amounts |
344 | 0 | usize::try_from(next_chunk_location)? |
345 | 0 | )?; |
346 | | |
347 | 0 | let meta_data = &self.meta_data; |
348 | 0 | Chunk::read(&mut self.remaining_bytes, meta_data) |
349 | 0 | }) Unexecuted instantiation: <exr::block::reader::FilteredChunksReader<std::io::cursor::Cursor<&[u8]>> as core::iter::traits::iterator::Iterator>::next::{closure#0}Unexecuted instantiation: <exr::block::reader::FilteredChunksReader<_> as core::iter::traits::iterator::Iterator>::next::{closure#0}Unexecuted instantiation: <exr::block::reader::FilteredChunksReader<std::io::cursor::Cursor<alloc::vec::Vec<u8>>> as core::iter::traits::iterator::Iterator>::next::{closure#0} |
350 | | |
351 | | // TODO remember last chunk index and then seek to index+size and check whether bytes are left? |
352 | 0 | } Unexecuted instantiation: <exr::block::reader::FilteredChunksReader<std::io::cursor::Cursor<&[u8]>> as core::iter::traits::iterator::Iterator>::next Unexecuted instantiation: <exr::block::reader::FilteredChunksReader<_> as core::iter::traits::iterator::Iterator>::next Unexecuted instantiation: <exr::block::reader::FilteredChunksReader<std::io::cursor::Cursor<alloc::vec::Vec<u8>>> as core::iter::traits::iterator::Iterator>::next |
353 | | |
354 | 0 | fn size_hint(&self) -> (usize, Option<usize>) { |
355 | 0 | (self.remaining_filtered_chunk_indices.len(), Some(self.remaining_filtered_chunk_indices.len())) |
356 | 0 | } Unexecuted instantiation: <exr::block::reader::FilteredChunksReader<std::io::cursor::Cursor<&[u8]>> as core::iter::traits::iterator::Iterator>::size_hint Unexecuted instantiation: <exr::block::reader::FilteredChunksReader<_> as core::iter::traits::iterator::Iterator>::size_hint Unexecuted instantiation: <exr::block::reader::FilteredChunksReader<std::io::cursor::Cursor<alloc::vec::Vec<u8>>> as core::iter::traits::iterator::Iterator>::size_hint |
357 | | } |
358 | | |
359 | | /// Read all chunks from the file, decompressing each chunk immediately. |
360 | | /// Implements iterator. |
361 | | #[derive(Debug)] |
362 | | pub struct SequentialBlockDecompressor<R: ChunksReader> { |
363 | | remaining_chunks_reader: R, |
364 | | pedantic: bool, |
365 | | } |
366 | | |
367 | | impl<R: ChunksReader> SequentialBlockDecompressor<R> { |
368 | | |
369 | | /// The extracted meta data from the image file. |
370 | 0 | pub fn meta_data(&self) -> &MetaData { self.remaining_chunks_reader.meta_data() }Unexecuted instantiation: <exr::block::reader::SequentialBlockDecompressor<exr::block::reader::OnProgressChunksReader<exr::block::reader::FilteredChunksReader<std::io::cursor::Cursor<&[u8]>>, &mut fn(f64)>>>::meta_data Unexecuted instantiation: <exr::block::reader::SequentialBlockDecompressor<_>>::meta_data Unexecuted instantiation: <exr::block::reader::SequentialBlockDecompressor<exr::block::reader::OnProgressChunksReader<exr::block::reader::FilteredChunksReader<std::io::cursor::Cursor<alloc::vec::Vec<u8>>>, &mut fn(f64)>>>::meta_data |
371 | | |
372 | | /// Read and then decompress a single block of pixels from the byte source. |
373 | 0 | pub fn decompress_next_block(&mut self) -> Option<Result<UncompressedBlock>> { |
374 | 0 | self.remaining_chunks_reader.read_next_chunk().map(|compressed_chunk|{ |
375 | 0 | UncompressedBlock::decompress_chunk(compressed_chunk?, &self.remaining_chunks_reader.meta_data(), self.pedantic) |
376 | 0 | }) Unexecuted instantiation: <exr::block::reader::SequentialBlockDecompressor<exr::block::reader::OnProgressChunksReader<exr::block::reader::FilteredChunksReader<std::io::cursor::Cursor<&[u8]>>, &mut fn(f64)>>>::decompress_next_block::{closure#0}Unexecuted instantiation: <exr::block::reader::SequentialBlockDecompressor<_>>::decompress_next_block::{closure#0}Unexecuted instantiation: <exr::block::reader::SequentialBlockDecompressor<exr::block::reader::OnProgressChunksReader<exr::block::reader::FilteredChunksReader<std::io::cursor::Cursor<alloc::vec::Vec<u8>>>, &mut fn(f64)>>>::decompress_next_block::{closure#0} |
377 | 0 | } Unexecuted instantiation: <exr::block::reader::SequentialBlockDecompressor<exr::block::reader::OnProgressChunksReader<exr::block::reader::FilteredChunksReader<std::io::cursor::Cursor<&[u8]>>, &mut fn(f64)>>>::decompress_next_block Unexecuted instantiation: <exr::block::reader::SequentialBlockDecompressor<_>>::decompress_next_block Unexecuted instantiation: <exr::block::reader::SequentialBlockDecompressor<exr::block::reader::OnProgressChunksReader<exr::block::reader::FilteredChunksReader<std::io::cursor::Cursor<alloc::vec::Vec<u8>>>, &mut fn(f64)>>>::decompress_next_block |
378 | | } |
379 | | |
/// Decompress the chunks in a file in parallel.
/// The first call to `next` will fill the thread pool with jobs,
/// starting to decompress the next few blocks.
/// These jobs will finish, even if you stop reading more blocks.
/// Implements iterator.
#[cfg(feature = "rayon")]
#[derive(Debug)]
pub struct ParallelBlockDecompressor<R: ChunksReader> {
    // source of the compressed chunks that have not been handed to the pool yet
    remaining_chunks: R,

    // each decompression job gets a clone of the sender,
    // the results are collected from the receiver
    sender: std::sync::mpsc::Sender<Result<UncompressedBlock>>,
    receiver: std::sync::mpsc::Receiver<Result<UncompressedBlock>>,

    // number of spawned jobs whose result has not been received yet
    currently_decompressing_count: usize,

    // upper limit for `currently_decompressing_count`
    max_threads: usize,

    // a copy of the meta data that is shared with the decompression jobs
    shared_meta_data_ref: std::sync::Arc<MetaData>,
    pedantic: bool,

    pool: rayon_core::ThreadPool,
}
399 | | |
#[cfg(feature = "rayon")]
impl<R: ChunksReader> ParallelBlockDecompressor<R> {

    /// Create a new decompressor. Does not immediately spawn any tasks.
    /// Decompression starts after the first call to `next`.
    /// Returns the chunks if parallel decompression should not be used.
    /// Use `new_with_thread_pool` to customize the threadpool.
    pub fn new(chunks: R, pedantic: bool) -> std::result::Result<Self, R> {
        // the default pool only customizes the names of its threads
        let build_default_pool = || {
            rayon_core::ThreadPoolBuilder::new()
                .thread_name(|thread_index| format!("OpenEXR Block Decompressor Thread #{}", thread_index))
                .build()
        };

        Self::new_with_thread_pool(chunks, pedantic, build_default_pool)
    }

    /// Create a new decompressor. Does not immediately spawn any tasks.
    /// Decompression starts after the first call to `next`.
    /// Returns the chunks if parallel decompression should not be used,
    /// which is the case if every header is uncompressed
    /// or if `try_create_thread_pool` fails.
    pub fn new_with_thread_pool<CreatePool>(chunks: R, pedantic: bool, try_create_thread_pool: CreatePool)
        -> std::result::Result<Self, R>
        where CreatePool: FnOnce() -> std::result::Result<rayon_core::ThreadPool, rayon_core::ThreadPoolBuildError>
    {
        use crate::compression::Compression;

        let contains_compressed_header = chunks.meta_data().headers.iter()
            .any(|header| header.compression != Compression::Uncompressed);

        // if no compression is used in the file, don't use a threadpool
        if !contains_compressed_header {
            return Err(chunks);
        }

        // in case thread pool creation fails (for example on WASM currently),
        // we revert to sequential decompression
        let pool = match try_create_thread_pool() {
            Ok(created_pool) => created_pool,

            // TODO print warning?
            Err(_) => return Err(chunks),
        };

        // ca one block for each thread at all times
        let pool_thread_count = pool.current_num_threads().max(1);
        let max_threads = pool_thread_count.min(chunks.len()) + 2;

        let (sender, receiver) = std::sync::mpsc::channel(); // TODO bounded channel simplifies logic?

        // copy the meta data before the chunks reader is moved into the new object
        let shared_meta_data_ref = std::sync::Arc::new(chunks.meta_data().clone());

        Ok(Self {
            remaining_chunks: chunks,
            sender,
            receiver,
            currently_decompressing_count: 0,
            max_threads,

            shared_meta_data_ref,
            pedantic,

            pool,
        })
    }

    /// Fill the pool with decompression jobs. Returns the first job that finishes.
    /// An error from reading a chunk is returned immediately.
    /// Returns `None` when no job is pending and no chunk is left to read.
    pub fn decompress_next_block(&mut self) -> Option<Result<UncompressedBlock>> {

        // spawn jobs until the limit is reached or the chunks reader is drained
        while self.currently_decompressing_count < self.max_threads {
            let compressed_chunk = match self.remaining_chunks.next() {
                Some(Ok(chunk)) => chunk,
                Some(Err(error)) => return Some(Err(error)),

                // there are no chunks left to decompress
                None => break,
            };

            let job_sender = self.sender.clone();
            let job_meta_data = std::sync::Arc::clone(&self.shared_meta_data_ref);
            let pedantic = self.pedantic;

            self.currently_decompressing_count += 1;

            self.pool.spawn(move || {
                let decompressed_or_err = UncompressedBlock::decompress_chunk(
                    compressed_chunk, &job_meta_data, pedantic
                );

                // by now, decompressing could have failed in another thread.
                // the error is then already handled, so we simply
                // don't send the decompressed block and do nothing
                let _ = job_sender.send(decompressed_or_err);
            });
        }

        // nothing is pending, so there is no result to wait for
        if self.currently_decompressing_count == 0 {
            debug_assert!(self.receiver.try_recv().is_err(), "uncompressed chunks left in channel after decompressing all chunks"); // TODO not reliable
            debug_assert_eq!(self.len(), 0, "compressed chunks left after decompressing all chunks");
            return None;
        }

        let finished_block = self.receiver.recv()
            .expect("all decompressing senders hung up but more messages were expected");

        self.currently_decompressing_count -= 1;
        Some(finished_block)
    }

    /// The meta data of the image file, as provided by the chunks reader.
    pub fn meta_data(&self) -> &MetaData {
        self.remaining_chunks.meta_data()
    }
}
509 | | |
510 | | impl<R: ChunksReader> ExactSizeIterator for SequentialBlockDecompressor<R> {} |
511 | | impl<R: ChunksReader> Iterator for SequentialBlockDecompressor<R> { |
512 | | type Item = Result<UncompressedBlock>; |
513 | 0 | fn next(&mut self) -> Option<Self::Item> { self.decompress_next_block() }Unexecuted instantiation: <exr::block::reader::SequentialBlockDecompressor<exr::block::reader::OnProgressChunksReader<exr::block::reader::FilteredChunksReader<std::io::cursor::Cursor<&[u8]>>, &mut fn(f64)>> as core::iter::traits::iterator::Iterator>::next Unexecuted instantiation: <exr::block::reader::SequentialBlockDecompressor<_> as core::iter::traits::iterator::Iterator>::next Unexecuted instantiation: <exr::block::reader::SequentialBlockDecompressor<exr::block::reader::OnProgressChunksReader<exr::block::reader::FilteredChunksReader<std::io::cursor::Cursor<alloc::vec::Vec<u8>>>, &mut fn(f64)>> as core::iter::traits::iterator::Iterator>::next |
514 | 0 | fn size_hint(&self) -> (usize, Option<usize>) { self.remaining_chunks_reader.size_hint() } |
515 | | } |
516 | | |
#[cfg(feature = "rayon")]
impl<R: ChunksReader> ExactSizeIterator for ParallelBlockDecompressor<R> {}

/// Each iteration step returns one decompressed block.
#[cfg(feature = "rayon")]
impl<R: ChunksReader> Iterator for ParallelBlockDecompressor<R> {
    type Item = Result<UncompressedBlock>;

    /// Same as calling `decompress_next_block`.
    fn next(&mut self) -> Option<Self::Item> {
        self.decompress_next_block()
    }

    /// Counts the chunks that were not read yet
    /// plus the jobs whose result was not received yet.
    fn size_hint(&self) -> (usize, Option<usize>) {
        let unread_chunk_count = self.remaining_chunks.len();
        let pending_block_count = unread_chunk_count + self.currently_decompressing_count;
        (pending_block_count, Some(pending_block_count))
    }
}
528 | | |
529 | | |
530 | | |
531 | | |
532 | | |