/rust/registry/src/index.crates.io-1949cf8c6b5b557f/exr-1.74.0/src/block/reader.rs
Line | Count | Source |
1 | | //! Composable structures to handle reading an image. |
2 | | |
3 | | |
4 | | use std::convert::TryFrom; |
5 | | use std::fmt::Debug; |
6 | | use std::io::{Read, Seek}; |
7 | | use crate::block::{BlockIndex, UncompressedBlock}; |
8 | | use crate::block::chunk::{Chunk, TileCoordinates}; |
9 | | use crate::error::{Error, Result, u64_to_usize, UnitResult}; |
10 | | use crate::io::{PeekRead, Tracking}; |
11 | | use crate::meta::{MetaData, OffsetTables}; |
12 | | use crate::meta::header::Header; |
13 | | |
14 | | /// Decode the meta data from a byte source, keeping the source ready for further reading. |
15 | | /// Continue decoding the remaining bytes by calling `filtered_chunks` or `all_chunks`. |
16 | | #[derive(Debug)] |
17 | | pub struct Reader<R> { |
18 | | meta_data: MetaData, |
19 | | remaining_reader: PeekRead<Tracking<R>>, // TODO does R need to be Seek or is Tracking enough? |
20 | | } |
21 | | |
22 | | impl<R: Read + Seek> Reader<R> { |
23 | | |
24 | | /// Start the reading process. |
25 | | /// Immediately decodes the meta data into an internal field. |
26 | | /// Access it via`meta_data()`. |
27 | 6.22k | pub fn read_from_buffered(read: R, pedantic: bool) -> Result<Self> { |
28 | 6.22k | let mut remaining_reader = PeekRead::new(Tracking::new(read)); |
29 | 6.22k | let meta_data = MetaData::read_validated_from_buffered_peekable(&mut remaining_reader, pedantic)?; |
30 | 2.36k | Ok(Self { meta_data, remaining_reader }) |
31 | 6.22k | } <exr::block::reader::Reader<std::io::cursor::Cursor<&[u8]>>>::read_from_buffered Line | Count | Source | 27 | 6.13k | pub fn read_from_buffered(read: R, pedantic: bool) -> Result<Self> { | 28 | 6.13k | let mut remaining_reader = PeekRead::new(Tracking::new(read)); | 29 | 6.13k | let meta_data = MetaData::read_validated_from_buffered_peekable(&mut remaining_reader, pedantic)?; | 30 | 2.28k | Ok(Self { meta_data, remaining_reader }) | 31 | 6.13k | } |
Unexecuted instantiation: <exr::block::reader::Reader<_>>::read_from_buffered <exr::block::reader::Reader<std::io::cursor::Cursor<alloc::vec::Vec<u8>>>>::read_from_buffered Line | Count | Source | 27 | 85 | pub fn read_from_buffered(read: R, pedantic: bool) -> Result<Self> { | 28 | 85 | let mut remaining_reader = PeekRead::new(Tracking::new(read)); | 29 | 85 | let meta_data = MetaData::read_validated_from_buffered_peekable(&mut remaining_reader, pedantic)?; | 30 | 85 | Ok(Self { meta_data, remaining_reader }) | 31 | 85 | } |
|
32 | | |
33 | | // must not be mutable, as reading the file later on relies on the meta data |
34 | | /// The decoded exr meta data from the file. |
35 | 17.4k | pub fn meta_data(&self) -> &MetaData { &self.meta_data }<exr::block::reader::Reader<std::io::cursor::Cursor<&[u8]>>>::meta_data Line | Count | Source | 35 | 16.8k | pub fn meta_data(&self) -> &MetaData { &self.meta_data } |
Unexecuted instantiation: <exr::block::reader::Reader<_>>::meta_data <exr::block::reader::Reader<std::io::cursor::Cursor<alloc::vec::Vec<u8>>>>::meta_data Line | Count | Source | 35 | 595 | pub fn meta_data(&self) -> &MetaData { &self.meta_data } |
|
36 | | |
37 | | /// The decoded exr meta data from the file. |
38 | 9.21k | pub fn headers(&self) -> &[Header] { &self.meta_data.headers }<exr::block::reader::Reader<std::io::cursor::Cursor<&[u8]>>>::headers Line | Count | Source | 38 | 8.87k | pub fn headers(&self) -> &[Header] { &self.meta_data.headers } |
Unexecuted instantiation: <exr::block::reader::Reader<_>>::headers <exr::block::reader::Reader<std::io::cursor::Cursor<alloc::vec::Vec<u8>>>>::headers Line | Count | Source | 38 | 340 | pub fn headers(&self) -> &[Header] { &self.meta_data.headers } |
|
39 | | |
40 | | /// Obtain the meta data ownership. |
41 | 0 | pub fn into_meta_data(self) -> MetaData { self.meta_data } |
42 | | |
43 | | /// Prepare to read all the chunks from the file. |
44 | | /// Does not decode the chunks now, but returns a decoder. |
45 | | /// Reading all chunks reduces seeking the file, but some chunks might be read without being used. |
46 | 0 | pub fn all_chunks(mut self, pedantic: bool) -> Result<AllChunksReader<R>> { |
47 | 0 | let total_chunk_count = { |
48 | 0 | if pedantic { |
49 | 0 | let offset_tables = MetaData::read_offset_tables(&mut self.remaining_reader, &self.meta_data.headers)?; |
50 | 0 | validate_offset_tables(self.meta_data.headers.as_slice(), &offset_tables, self.remaining_reader.byte_position())?; |
51 | 0 | offset_tables.iter().map(|table| table.len()).sum() |
52 | | } |
53 | | else { |
54 | 0 | MetaData::skip_offset_tables(&mut self.remaining_reader, &self.meta_data.headers)? |
55 | | } |
56 | | }; |
57 | | |
58 | 0 | Ok(AllChunksReader { |
59 | 0 | meta_data: self.meta_data, |
60 | 0 | remaining_chunks: 0 .. total_chunk_count, |
61 | 0 | remaining_bytes: self.remaining_reader, |
62 | 0 | pedantic |
63 | 0 | }) |
64 | 0 | } |
65 | | |
66 | | /// Prepare to read some the chunks from the file. |
67 | | /// Does not decode the chunks now, but returns a decoder. |
68 | | /// Reading only some chunks may seeking the file, potentially skipping many bytes. |
69 | | // TODO tile indices add no new information to block index?? |
70 | 2.26k | pub fn filter_chunks(mut self, pedantic: bool, mut filter: impl FnMut(&MetaData, TileCoordinates, BlockIndex) -> bool) -> Result<FilteredChunksReader<R>> { |
71 | 2.26k | let offset_tables = MetaData::read_offset_tables(&mut self.remaining_reader, &self.meta_data.headers)?; |
72 | | |
73 | | // TODO regardless of pedantic, if invalid, read all chunks instead, and filter after reading each chunk? |
74 | 2.04k | if pedantic { |
75 | 0 | validate_offset_tables( |
76 | 0 | self.meta_data.headers.as_slice(), &offset_tables, |
77 | 0 | self.remaining_reader.byte_position() |
78 | 0 | )?; |
79 | 2.04k | } |
80 | | |
81 | 2.04k | let mut filtered_offsets = Vec::with_capacity( |
82 | 2.04k | (self.meta_data.headers.len() * 32).min(2*2048) |
83 | | ); |
84 | | |
85 | | // TODO detect whether the filter actually would skip chunks, and aviod sorting etc when not filtering is applied |
86 | | |
87 | 9.01k | for (header_index, header) in self.meta_data.headers.iter().enumerate() { // offset tables are stored same order as headers |
88 | 1.61M | for (block_index, tile) in header.blocks_increasing_y_order().enumerate() { // in increasing_y order |
89 | 1.61M | let data_indices = header.get_absolute_block_pixel_coordinates(tile.location)?; |
90 | | |
91 | 1.61M | let block = BlockIndex { |
92 | 1.61M | layer: header_index, |
93 | 1.61M | level: tile.location.level_index, |
94 | 1.61M | pixel_position: data_indices.position.to_usize("data indices start")?, |
95 | 1.61M | pixel_size: data_indices.size, |
96 | | }; |
97 | | |
98 | 1.61M | if filter(&self.meta_data, tile.location, block) { |
99 | 1.36M | filtered_offsets.push(offset_tables[header_index][block_index]) // safe indexing from `enumerate()` |
100 | 251k | } |
101 | | }; |
102 | | } |
103 | | |
104 | 2.04k | filtered_offsets.sort_unstable(); // enables reading continuously if possible (already sorted where line order increasing) |
105 | | |
106 | 2.04k | if pedantic { |
107 | | // table is sorted. if any two neighbours are equal, we have duplicates. this is invalid. |
108 | 0 | if filtered_offsets.windows(2).any(|pair| pair[0] == pair[1]) {Unexecuted instantiation: <exr::block::reader::Reader<std::io::cursor::Cursor<&[u8]>>>::filter_chunks::<<exr::image::read::image::ReadImage<fn(f64), exr::image::read::layers::ReadFirstValidLayer<exr::image::read::specific_channels::CollectPixels<exr::image::read::specific_channels::ReadOptionalChannel<exr::image::read::specific_channels::ReadRequiredChannel<exr::image::read::specific_channels::ReadRequiredChannel<exr::image::read::specific_channels::ReadRequiredChannel<exr::image::recursive::NoneMore, f32>, f32>, f32>, f32>, (f32, f32, f32, f32), alloc::vec::Vec<f32>, <image::codecs::openexr::OpenExrDecoder<std::io::cursor::Cursor<&[u8]>> as image::io::decoder::ImageDecoder>::read_image::{closure#2}, <image::codecs::openexr::OpenExrDecoder<std::io::cursor::Cursor<&[u8]>> as image::io::decoder::ImageDecoder>::read_image::{closure#3}>>>>::from_chunks<exr::image::Layer<exr::image::SpecificChannels<alloc::vec::Vec<f32>, (exr::meta::attribute::ChannelDescription, exr::meta::attribute::ChannelDescription, exr::meta::attribute::ChannelDescription, core::option::Option<exr::meta::attribute::ChannelDescription>)>>, std::io::cursor::Cursor<&[u8]>>::{closure#0}>::{closure#0}Unexecuted instantiation: <exr::block::reader::Reader<_>>::filter_chunks::<_>::{closure#0}Unexecuted instantiation: <exr::block::reader::Reader<std::io::cursor::Cursor<alloc::vec::Vec<u8>>>>::filter_chunks::<<exr::image::read::image::ReadImage<fn(f64), exr::image::read::layers::ReadFirstValidLayer<exr::image::read::specific_channels::CollectPixels<exr::image::read::specific_channels::ReadOptionalChannel<exr::image::read::specific_channels::ReadRequiredChannel<exr::image::read::specific_channels::ReadRequiredChannel<exr::image::read::specific_channels::ReadRequiredChannel<exr::image::recursive::NoneMore, f32>, f32>, f32>, f32>, (f32, f32, f32, f32), alloc::vec::Vec<f32>, <image::codecs::openexr::OpenExrDecoder<std::io::cursor::Cursor<alloc::vec::Vec<u8>>> as image::io::decoder::ImageDecoder>::read_image::{closure#2}, <image::codecs::openexr::OpenExrDecoder<std::io::cursor::Cursor<alloc::vec::Vec<u8>>> as image::io::decoder::ImageDecoder>::read_image::{closure#3}>>>>::from_chunks<exr::image::Layer<exr::image::SpecificChannels<alloc::vec::Vec<f32>, (exr::meta::attribute::ChannelDescription, exr::meta::attribute::ChannelDescription, exr::meta::attribute::ChannelDescription, core::option::Option<exr::meta::attribute::ChannelDescription>)>>, std::io::cursor::Cursor<alloc::vec::Vec<u8>>>::{closure#0}>::{closure#0} |
109 | 0 | return Err(Error::invalid("chunk offset table")) |
110 | 0 | } |
111 | 2.04k | } |
112 | | |
113 | 2.04k | Ok(FilteredChunksReader { |
114 | 2.04k | meta_data: self.meta_data, |
115 | 2.04k | expected_filtered_chunk_count: filtered_offsets.len(), |
116 | 2.04k | remaining_filtered_chunk_indices: filtered_offsets.into_iter(), |
117 | 2.04k | remaining_bytes: self.remaining_reader |
118 | 2.04k | }) |
119 | 2.26k | } <exr::block::reader::Reader<std::io::cursor::Cursor<&[u8]>>>::filter_chunks::<<exr::image::read::image::ReadImage<fn(f64), exr::image::read::layers::ReadFirstValidLayer<exr::image::read::specific_channels::CollectPixels<exr::image::read::specific_channels::ReadOptionalChannel<exr::image::read::specific_channels::ReadRequiredChannel<exr::image::read::specific_channels::ReadRequiredChannel<exr::image::read::specific_channels::ReadRequiredChannel<exr::image::recursive::NoneMore, f32>, f32>, f32>, f32>, (f32, f32, f32, f32), alloc::vec::Vec<f32>, <image::codecs::openexr::OpenExrDecoder<std::io::cursor::Cursor<&[u8]>> as image::io::decoder::ImageDecoder>::read_image::{closure#2}, <image::codecs::openexr::OpenExrDecoder<std::io::cursor::Cursor<&[u8]>> as image::io::decoder::ImageDecoder>::read_image::{closure#3}>>>>::from_chunks<exr::image::Layer<exr::image::SpecificChannels<alloc::vec::Vec<f32>, (exr::meta::attribute::ChannelDescription, exr::meta::attribute::ChannelDescription, exr::meta::attribute::ChannelDescription, core::option::Option<exr::meta::attribute::ChannelDescription>)>>, std::io::cursor::Cursor<&[u8]>>::{closure#0}>Line | Count | Source | 70 | 2.18k | pub fn filter_chunks(mut self, pedantic: bool, mut filter: impl FnMut(&MetaData, TileCoordinates, BlockIndex) -> bool) -> Result<FilteredChunksReader<R>> { | 71 | 2.18k | let offset_tables = MetaData::read_offset_tables(&mut self.remaining_reader, &self.meta_data.headers)?; | 72 | | | 73 | | // TODO regardless of pedantic, if invalid, read all chunks instead, and filter after reading each chunk? | 74 | 1.95k | if pedantic { | 75 | 0 | validate_offset_tables( | 76 | 0 | self.meta_data.headers.as_slice(), &offset_tables, | 77 | 0 | self.remaining_reader.byte_position() | 78 | 0 | )?; | 79 | 1.95k | } | 80 | | | 81 | 1.95k | let mut filtered_offsets = Vec::with_capacity( | 82 | 1.95k | (self.meta_data.headers.len() * 32).min(2*2048) | 83 | | ); | 84 | | | 85 | | // TODO detect whether the filter actually would skip chunks, and aviod sorting etc when not filtering is applied | 86 | | | 87 | 8.93k | for (header_index, header) in self.meta_data.headers.iter().enumerate() { // offset tables are stored same order as headers | 88 | 1.26M | for (block_index, tile) in header.blocks_increasing_y_order().enumerate() { // in increasing_y order | 89 | 1.26M | let data_indices = header.get_absolute_block_pixel_coordinates(tile.location)?; | 90 | | | 91 | 1.26M | let block = BlockIndex { | 92 | 1.26M | layer: header_index, | 93 | 1.26M | level: tile.location.level_index, | 94 | 1.26M | pixel_position: data_indices.position.to_usize("data indices start")?, | 95 | 1.26M | pixel_size: data_indices.size, | 96 | | }; | 97 | | | 98 | 1.26M | if filter(&self.meta_data, tile.location, block) { | 99 | 1.01M | filtered_offsets.push(offset_tables[header_index][block_index]) // safe indexing from `enumerate()` | 100 | 251k | } | 101 | | }; | 102 | | } | 103 | | | 104 | 1.95k | filtered_offsets.sort_unstable(); // enables reading continuously if possible (already sorted where line order increasing) | 105 | | | 106 | 1.95k | if pedantic { | 107 | | // table is sorted. if any two neighbours are equal, we have duplicates. this is invalid. | 108 | 0 | if filtered_offsets.windows(2).any(|pair| pair[0] == pair[1]) { | 109 | 0 | return Err(Error::invalid("chunk offset table")) | 110 | 0 | } | 111 | 1.95k | } | 112 | | | 113 | 1.95k | Ok(FilteredChunksReader { | 114 | 1.95k | meta_data: self.meta_data, | 115 | 1.95k | expected_filtered_chunk_count: filtered_offsets.len(), | 116 | 1.95k | remaining_filtered_chunk_indices: filtered_offsets.into_iter(), | 117 | 1.95k | remaining_bytes: self.remaining_reader | 118 | 1.95k | }) | 119 | 2.18k | } |
Unexecuted instantiation: <exr::block::reader::Reader<_>>::filter_chunks::<_> <exr::block::reader::Reader<std::io::cursor::Cursor<alloc::vec::Vec<u8>>>>::filter_chunks::<<exr::image::read::image::ReadImage<fn(f64), exr::image::read::layers::ReadFirstValidLayer<exr::image::read::specific_channels::CollectPixels<exr::image::read::specific_channels::ReadOptionalChannel<exr::image::read::specific_channels::ReadRequiredChannel<exr::image::read::specific_channels::ReadRequiredChannel<exr::image::read::specific_channels::ReadRequiredChannel<exr::image::recursive::NoneMore, f32>, f32>, f32>, f32>, (f32, f32, f32, f32), alloc::vec::Vec<f32>, <image::codecs::openexr::OpenExrDecoder<std::io::cursor::Cursor<alloc::vec::Vec<u8>>> as image::io::decoder::ImageDecoder>::read_image::{closure#2}, <image::codecs::openexr::OpenExrDecoder<std::io::cursor::Cursor<alloc::vec::Vec<u8>>> as image::io::decoder::ImageDecoder>::read_image::{closure#3}>>>>::from_chunks<exr::image::Layer<exr::image::SpecificChannels<alloc::vec::Vec<f32>, (exr::meta::attribute::ChannelDescription, exr::meta::attribute::ChannelDescription, exr::meta::attribute::ChannelDescription, core::option::Option<exr::meta::attribute::ChannelDescription>)>>, std::io::cursor::Cursor<alloc::vec::Vec<u8>>>::{closure#0}>Line | Count | Source | 70 | 85 | pub fn filter_chunks(mut self, pedantic: bool, mut filter: impl FnMut(&MetaData, TileCoordinates, BlockIndex) -> bool) -> Result<FilteredChunksReader<R>> { | 71 | 85 | let offset_tables = MetaData::read_offset_tables(&mut self.remaining_reader, &self.meta_data.headers)?; | 72 | | | 73 | | // TODO regardless of pedantic, if invalid, read all chunks instead, and filter after reading each chunk? | 74 | 85 | if pedantic { | 75 | 0 | validate_offset_tables( | 76 | 0 | self.meta_data.headers.as_slice(), &offset_tables, | 77 | 0 | self.remaining_reader.byte_position() | 78 | 0 | )?; | 79 | 85 | } | 80 | | | 81 | 85 | let mut filtered_offsets = Vec::with_capacity( | 82 | 85 | (self.meta_data.headers.len() * 32).min(2*2048) | 83 | | ); | 84 | | | 85 | | // TODO detect whether the filter actually would skip chunks, and aviod sorting etc when not filtering is applied | 86 | | | 87 | 85 | for (header_index, header) in self.meta_data.headers.iter().enumerate() { // offset tables are stored same order as headers | 88 | 354k | for (block_index, tile) in header.blocks_increasing_y_order().enumerate() { // in increasing_y order | 89 | 354k | let data_indices = header.get_absolute_block_pixel_coordinates(tile.location)?; | 90 | | | 91 | 354k | let block = BlockIndex { | 92 | 354k | layer: header_index, | 93 | 354k | level: tile.location.level_index, | 94 | 354k | pixel_position: data_indices.position.to_usize("data indices start")?, | 95 | 354k | pixel_size: data_indices.size, | 96 | | }; | 97 | | | 98 | 354k | if filter(&self.meta_data, tile.location, block) { | 99 | 354k | filtered_offsets.push(offset_tables[header_index][block_index]) // safe indexing from `enumerate()` | 100 | 0 | } | 101 | | }; | 102 | | } | 103 | | | 104 | 85 | filtered_offsets.sort_unstable(); // enables reading continuously if possible (already sorted where line order increasing) | 105 | | | 106 | 85 | if pedantic { | 107 | | // table is sorted. if any two neighbours are equal, we have duplicates. this is invalid. | 108 | 0 | if filtered_offsets.windows(2).any(|pair| pair[0] == pair[1]) { | 109 | 0 | return Err(Error::invalid("chunk offset table")) | 110 | 0 | } | 111 | 85 | } | 112 | | | 113 | 85 | Ok(FilteredChunksReader { | 114 | 85 | meta_data: self.meta_data, | 115 | 85 | expected_filtered_chunk_count: filtered_offsets.len(), | 116 | 85 | remaining_filtered_chunk_indices: filtered_offsets.into_iter(), | 117 | 85 | remaining_bytes: self.remaining_reader | 118 | 85 | }) | 119 | 85 | } |
|
120 | | } |
121 | | |
122 | | |
123 | 0 | fn validate_offset_tables(headers: &[Header], offset_tables: &OffsetTables, chunks_start_byte: usize) -> UnitResult { |
124 | 0 | let max_pixel_bytes: usize = headers.iter() // when compressed, chunks are smaller, but never larger than max |
125 | 0 | .map(|header| header.max_pixel_file_bytes()) |
126 | 0 | .sum(); |
127 | | |
128 | | // check that each offset is within the bounds |
129 | 0 | let end_byte = chunks_start_byte + max_pixel_bytes; |
130 | 0 | let is_invalid = offset_tables.iter().flatten().map(|&u64| u64_to_usize(u64, "chunk start")) |
131 | 0 | .any(|maybe_chunk_start| match maybe_chunk_start { |
132 | 0 | Ok(chunk_start) => chunk_start < chunks_start_byte || chunk_start > end_byte, |
133 | 0 | Err(_) => true |
134 | 0 | }); |
135 | | |
136 | 0 | if is_invalid { Err(Error::invalid("offset table")) } |
137 | 0 | else { Ok(()) } |
138 | 0 | } |
139 | | |
140 | | |
141 | | |
142 | | |
143 | | /// Decode the desired chunks and skip the unimportant chunks in the file. |
144 | | /// The decoded chunks can be decompressed by calling |
145 | | /// `decompress_parallel`, `decompress_sequential`, or `sequential_decompressor` or `parallel_decompressor`. |
146 | | /// Call `on_progress` to have a callback with each block. |
147 | | /// Also contains the image meta data. |
148 | | #[derive(Debug)] |
149 | | pub struct FilteredChunksReader<R> { |
150 | | meta_data: MetaData, |
151 | | expected_filtered_chunk_count: usize, |
152 | | remaining_filtered_chunk_indices: std::vec::IntoIter<u64>, |
153 | | remaining_bytes: PeekRead<Tracking<R>>, |
154 | | } |
155 | | |
156 | | /// Decode all chunks in the file without seeking. |
157 | | /// The decoded chunks can be decompressed by calling |
158 | | /// `decompress_parallel`, `decompress_sequential`, or `sequential_decompressor` or `parallel_decompressor`. |
159 | | /// Call `on_progress` to have a callback with each block. |
160 | | /// Also contains the image meta data. |
161 | | #[derive(Debug)] |
162 | | pub struct AllChunksReader<R> { |
163 | | meta_data: MetaData, |
164 | | remaining_chunks: std::ops::Range<usize>, |
165 | | remaining_bytes: PeekRead<Tracking<R>>, |
166 | | pedantic: bool, |
167 | | } |
168 | | |
169 | | /// Decode chunks in the file without seeking. |
170 | | /// Calls the supplied closure for each chunk. |
171 | | /// The decoded chunks can be decompressed by calling |
172 | | /// `decompress_parallel`, `decompress_sequential`, or `sequential_decompressor`. |
173 | | /// Also contains the image meta data. |
174 | | #[derive(Debug)] |
175 | | pub struct OnProgressChunksReader<R, F> { |
176 | | chunks_reader: R, |
177 | | decoded_chunks: usize, |
178 | | callback: F, |
179 | | } |
180 | | |
181 | | /// Decode chunks in the file. |
182 | | /// The decoded chunks can be decompressed by calling |
183 | | /// `decompress_parallel`, `decompress_sequential`, or `sequential_decompressor`. |
184 | | /// Call `on_progress` to have a callback with each block. |
185 | | /// Also contains the image meta data. |
186 | | pub trait ChunksReader: Sized + Iterator<Item=Result<Chunk>> + ExactSizeIterator { |
187 | | |
188 | | /// The decoded exr meta data from the file. |
189 | | fn meta_data(&self) -> &MetaData; |
190 | | |
191 | | /// The decoded exr headers from the file. |
192 | 0 | fn headers(&self) -> &[Header] { &self.meta_data().headers } |
193 | | |
194 | | /// The number of chunks that this reader will return in total. |
195 | | /// Can be less than the total number of chunks in the file, if some chunks are skipped. |
196 | | fn expected_chunk_count(&self) -> usize; |
197 | | |
198 | | /// Read the next compressed chunk from the file. |
199 | | /// Equivalent to `.next()`, as this also is an iterator. |
200 | | /// Returns `None` if all chunks have been read. |
201 | 9 | fn read_next_chunk(&mut self) -> Option<Result<Chunk>> { self.next() }<exr::block::reader::OnProgressChunksReader<exr::block::reader::FilteredChunksReader<std::io::cursor::Cursor<&[u8]>>, &mut fn(f64)> as exr::block::reader::ChunksReader>::read_next_chunk Line | Count | Source | 201 | 9 | fn read_next_chunk(&mut self) -> Option<Result<Chunk>> { self.next() } |
Unexecuted instantiation: <_ as exr::block::reader::ChunksReader>::read_next_chunk Unexecuted instantiation: <exr::block::reader::OnProgressChunksReader<exr::block::reader::FilteredChunksReader<std::io::cursor::Cursor<alloc::vec::Vec<u8>>>, &mut fn(f64)> as exr::block::reader::ChunksReader>::read_next_chunk |
202 | | |
203 | | /// Create a new reader that calls the provided progress |
204 | | /// callback for each chunk that is read from the file. |
205 | | /// If the file can be successfully decoded, |
206 | | /// the progress will always at least once include 0.0 at the start and 1.0 at the end. |
207 | 2.04k | fn on_progress<F>(self, on_progress: F) -> OnProgressChunksReader<Self, F> where F: FnMut(f64) { |
208 | 2.04k | OnProgressChunksReader { chunks_reader: self, callback: on_progress, decoded_chunks: 0 } |
209 | 2.04k | } <exr::block::reader::FilteredChunksReader<std::io::cursor::Cursor<&[u8]>> as exr::block::reader::ChunksReader>::on_progress::<&mut fn(f64)> Line | Count | Source | 207 | 1.95k | fn on_progress<F>(self, on_progress: F) -> OnProgressChunksReader<Self, F> where F: FnMut(f64) { | 208 | 1.95k | OnProgressChunksReader { chunks_reader: self, callback: on_progress, decoded_chunks: 0 } | 209 | 1.95k | } |
Unexecuted instantiation: <_ as exr::block::reader::ChunksReader>::on_progress::<_> <exr::block::reader::FilteredChunksReader<std::io::cursor::Cursor<alloc::vec::Vec<u8>>> as exr::block::reader::ChunksReader>::on_progress::<&mut fn(f64)> Line | Count | Source | 207 | 85 | fn on_progress<F>(self, on_progress: F) -> OnProgressChunksReader<Self, F> where F: FnMut(f64) { | 208 | 85 | OnProgressChunksReader { chunks_reader: self, callback: on_progress, decoded_chunks: 0 } | 209 | 85 | } |
|
210 | | |
211 | | #[cfg(feature = "rayon")] |
212 | | /// Decompress all blocks in the file, using multiple cpu cores, and call the supplied closure for each block. |
213 | | /// The order of the blocks is not deterministic. |
214 | | /// You can also use `parallel_decompressor` to obtain an iterator instead. |
215 | | /// Will fallback to sequential processing where threads are not available, or where it would not speed up the process. |
216 | | // FIXME try async + futures instead of rayon! Maybe even allows for external async decoding? (-> impl Stream<UncompressedBlock>) |
217 | 2.04k | fn decompress_parallel( |
218 | 2.04k | self, pedantic: bool, |
219 | 2.04k | mut insert_block: impl FnMut(&MetaData, UncompressedBlock) -> UnitResult |
220 | 2.04k | ) -> UnitResult |
221 | | { |
222 | 2.04k | let mut decompressor = match self.parallel_decompressor(pedantic) { |
223 | 9 | Err(old_self) => return old_self.decompress_sequential(pedantic, insert_block), |
224 | 2.03k | Ok(decompressor) => decompressor, |
225 | | }; |
226 | | |
227 | 359k | while let Some(block) = decompressor.next() { |
228 | 358k | insert_block(decompressor.meta_data(), block?)?; |
229 | | } |
230 | | |
231 | 170 | debug_assert_eq!(decompressor.len(), 0, "compressed blocks left after decompressing all blocks"); |
232 | 170 | Ok(()) |
233 | 2.04k | } <exr::block::reader::OnProgressChunksReader<exr::block::reader::FilteredChunksReader<std::io::cursor::Cursor<&[u8]>>, &mut fn(f64)> as exr::block::reader::ChunksReader>::decompress_parallel::<<exr::image::read::image::ReadImage<fn(f64), exr::image::read::layers::ReadFirstValidLayer<exr::image::read::specific_channels::CollectPixels<exr::image::read::specific_channels::ReadOptionalChannel<exr::image::read::specific_channels::ReadRequiredChannel<exr::image::read::specific_channels::ReadRequiredChannel<exr::image::read::specific_channels::ReadRequiredChannel<exr::image::recursive::NoneMore, f32>, f32>, f32>, f32>, (f32, f32, f32, f32), alloc::vec::Vec<f32>, <image::codecs::openexr::OpenExrDecoder<std::io::cursor::Cursor<&[u8]>> as image::io::decoder::ImageDecoder>::read_image::{closure#2}, <image::codecs::openexr::OpenExrDecoder<std::io::cursor::Cursor<&[u8]>> as image::io::decoder::ImageDecoder>::read_image::{closure#3}>>>>::from_chunks<exr::image::Layer<exr::image::SpecificChannels<alloc::vec::Vec<f32>, (exr::meta::attribute::ChannelDescription, exr::meta::attribute::ChannelDescription, exr::meta::attribute::ChannelDescription, core::option::Option<exr::meta::attribute::ChannelDescription>)>>, std::io::cursor::Cursor<&[u8]>>::{closure#1}>Line | Count | Source | 217 | 1.95k | fn decompress_parallel( | 218 | 1.95k | self, pedantic: bool, | 219 | 1.95k | mut insert_block: impl FnMut(&MetaData, UncompressedBlock) -> UnitResult | 220 | 1.95k | ) -> UnitResult | 221 | | { | 222 | 1.95k | let mut decompressor = match self.parallel_decompressor(pedantic) { | 223 | 9 | Err(old_self) => return old_self.decompress_sequential(pedantic, insert_block), | 224 | 1.94k | Ok(decompressor) => decompressor, | 225 | | }; | 226 | | | 227 | 4.55k | while let Some(block) = decompressor.next() { | 228 | 4.46k | insert_block(decompressor.meta_data(), block?)?; | 229 | | } | 230 | | | 231 | 85 | debug_assert_eq!(decompressor.len(), 0, "compressed blocks left after decompressing all blocks"); | 232 | 85 | Ok(()) | 233 | 1.95k | } |
Unexecuted instantiation: <_ as exr::block::reader::ChunksReader>::decompress_parallel::<_> <exr::block::reader::OnProgressChunksReader<exr::block::reader::FilteredChunksReader<std::io::cursor::Cursor<alloc::vec::Vec<u8>>>, &mut fn(f64)> as exr::block::reader::ChunksReader>::decompress_parallel::<<exr::image::read::image::ReadImage<fn(f64), exr::image::read::layers::ReadFirstValidLayer<exr::image::read::specific_channels::CollectPixels<exr::image::read::specific_channels::ReadOptionalChannel<exr::image::read::specific_channels::ReadRequiredChannel<exr::image::read::specific_channels::ReadRequiredChannel<exr::image::read::specific_channels::ReadRequiredChannel<exr::image::recursive::NoneMore, f32>, f32>, f32>, f32>, (f32, f32, f32, f32), alloc::vec::Vec<f32>, <image::codecs::openexr::OpenExrDecoder<std::io::cursor::Cursor<alloc::vec::Vec<u8>>> as image::io::decoder::ImageDecoder>::read_image::{closure#2}, <image::codecs::openexr::OpenExrDecoder<std::io::cursor::Cursor<alloc::vec::Vec<u8>>> as image::io::decoder::ImageDecoder>::read_image::{closure#3}>>>>::from_chunks<exr::image::Layer<exr::image::SpecificChannels<alloc::vec::Vec<f32>, (exr::meta::attribute::ChannelDescription, exr::meta::attribute::ChannelDescription, exr::meta::attribute::ChannelDescription, core::option::Option<exr::meta::attribute::ChannelDescription>)>>, std::io::cursor::Cursor<alloc::vec::Vec<u8>>>::{closure#1}>Line | Count | Source | 217 | 85 | fn decompress_parallel( | 218 | 85 | self, pedantic: bool, | 219 | 85 | mut insert_block: impl FnMut(&MetaData, UncompressedBlock) -> UnitResult | 220 | 85 | ) -> UnitResult | 221 | | { | 222 | 85 | let mut decompressor = match self.parallel_decompressor(pedantic) { | 223 | 0 | Err(old_self) => return old_self.decompress_sequential(pedantic, insert_block), | 224 | 85 | Ok(decompressor) => decompressor, | 225 | | }; | 226 | | | 227 | 354k | while let Some(block) = decompressor.next() { | 228 | 354k | insert_block(decompressor.meta_data(), block?)?; | 229 | | } | 230 | | | 231 | 85 | debug_assert_eq!(decompressor.len(), 0, "compressed blocks left after decompressing all blocks"); | 232 | 85 | Ok(()) | 233 | 85 | } |
|
234 | | |
235 | | #[cfg(feature = "rayon")] |
236 | | /// Return an iterator that decompresses the chunks with multiple threads. |
237 | | /// The order of the blocks is not deterministic. |
238 | | /// Use `ParallelBlockDecompressor::new` if you want to use your own thread pool. |
239 | | /// By default, this uses as many threads as there are CPUs. |
240 | | /// Returns the `self` if there is no need for parallel decompression. |
241 | 2.04k | fn parallel_decompressor(self, pedantic: bool) -> std::result::Result<ParallelBlockDecompressor<Self>, Self> { |
242 | 2.04k | ParallelBlockDecompressor::new(self, pedantic) |
243 | 2.04k | } <exr::block::reader::OnProgressChunksReader<exr::block::reader::FilteredChunksReader<std::io::cursor::Cursor<&[u8]>>, &mut fn(f64)> as exr::block::reader::ChunksReader>::parallel_decompressor Line | Count | Source | 241 | 1.95k | fn parallel_decompressor(self, pedantic: bool) -> std::result::Result<ParallelBlockDecompressor<Self>, Self> { | 242 | 1.95k | ParallelBlockDecompressor::new(self, pedantic) | 243 | 1.95k | } |
Unexecuted instantiation: <_ as exr::block::reader::ChunksReader>::parallel_decompressor <exr::block::reader::OnProgressChunksReader<exr::block::reader::FilteredChunksReader<std::io::cursor::Cursor<alloc::vec::Vec<u8>>>, &mut fn(f64)> as exr::block::reader::ChunksReader>::parallel_decompressor Line | Count | Source | 241 | 85 | fn parallel_decompressor(self, pedantic: bool) -> std::result::Result<ParallelBlockDecompressor<Self>, Self> { | 242 | 85 | ParallelBlockDecompressor::new(self, pedantic) | 243 | 85 | } |
|
244 | | |
245 | | /// Return an iterator that decompresses the chunks in this thread. |
246 | | /// You can alternatively use `sequential_decompressor` if you prefer an external iterator. |
247 | 9 | fn decompress_sequential( |
248 | 9 | self, pedantic: bool, |
249 | 9 | mut insert_block: impl FnMut(&MetaData, UncompressedBlock) -> UnitResult |
250 | 9 | ) -> UnitResult |
251 | | { |
252 | 9 | let mut decompressor = self.sequential_decompressor(pedantic); |
253 | 9 | while let Some(block) = decompressor.next() { |
254 | 9 | insert_block(decompressor.meta_data(), block?)?; |
255 | | } |
256 | | |
257 | 0 | debug_assert_eq!(decompressor.len(), 0, "compressed blocks left after decompressing all blocks"); |
258 | 0 | Ok(()) |
259 | 9 | } Unexecuted instantiation: <exr::block::reader::OnProgressChunksReader<exr::block::reader::FilteredChunksReader<std::io::cursor::Cursor<&[u8]>>, &mut fn(f64)> as exr::block::reader::ChunksReader>::decompress_sequential::<<exr::image::read::image::ReadImage<fn(f64), exr::image::read::layers::ReadFirstValidLayer<exr::image::read::specific_channels::CollectPixels<exr::image::read::specific_channels::ReadOptionalChannel<exr::image::read::specific_channels::ReadRequiredChannel<exr::image::read::specific_channels::ReadRequiredChannel<exr::image::read::specific_channels::ReadRequiredChannel<exr::image::recursive::NoneMore, f32>, f32>, f32>, f32>, (f32, f32, f32, f32), alloc::vec::Vec<f32>, <image::codecs::openexr::OpenExrDecoder<std::io::cursor::Cursor<&[u8]>> as image::io::decoder::ImageDecoder>::read_image::{closure#2}, <image::codecs::openexr::OpenExrDecoder<std::io::cursor::Cursor<&[u8]>> as image::io::decoder::ImageDecoder>::read_image::{closure#3}>>>>::from_chunks<exr::image::Layer<exr::image::SpecificChannels<alloc::vec::Vec<f32>, (exr::meta::attribute::ChannelDescription, exr::meta::attribute::ChannelDescription, exr::meta::attribute::ChannelDescription, core::option::Option<exr::meta::attribute::ChannelDescription>)>>, std::io::cursor::Cursor<&[u8]>>::{closure#2}><exr::block::reader::OnProgressChunksReader<exr::block::reader::FilteredChunksReader<std::io::cursor::Cursor<&[u8]>>, &mut fn(f64)> as exr::block::reader::ChunksReader>::decompress_sequential::<<exr::image::read::image::ReadImage<fn(f64), exr::image::read::layers::ReadFirstValidLayer<exr::image::read::specific_channels::CollectPixels<exr::image::read::specific_channels::ReadOptionalChannel<exr::image::read::specific_channels::ReadRequiredChannel<exr::image::read::specific_channels::ReadRequiredChannel<exr::image::read::specific_channels::ReadRequiredChannel<exr::image::recursive::NoneMore, f32>, f32>, f32>, f32>, (f32, f32, f32, f32), alloc::vec::Vec<f32>, <image::codecs::openexr::OpenExrDecoder<std::io::cursor::Cursor<&[u8]>> as image::io::decoder::ImageDecoder>::read_image::{closure#2}, <image::codecs::openexr::OpenExrDecoder<std::io::cursor::Cursor<&[u8]>> as image::io::decoder::ImageDecoder>::read_image::{closure#3}>>>>::from_chunks<exr::image::Layer<exr::image::SpecificChannels<alloc::vec::Vec<f32>, (exr::meta::attribute::ChannelDescription, exr::meta::attribute::ChannelDescription, exr::meta::attribute::ChannelDescription, core::option::Option<exr::meta::attribute::ChannelDescription>)>>, std::io::cursor::Cursor<&[u8]>>::{closure#1}>Line | Count | Source | 247 | 9 | fn decompress_sequential( | 248 | 9 | self, pedantic: bool, | 249 | 9 | mut insert_block: impl FnMut(&MetaData, UncompressedBlock) -> UnitResult | 250 | 9 | ) -> UnitResult | 251 | | { | 252 | 9 | let mut decompressor = self.sequential_decompressor(pedantic); | 253 | 9 | while let Some(block) = decompressor.next() { | 254 | 9 | insert_block(decompressor.meta_data(), block?)?; | 255 | | } | 256 | | | 257 | 0 | debug_assert_eq!(decompressor.len(), 0, "compressed blocks left after decompressing all blocks"); | 258 | 0 | Ok(()) | 259 | 9 | } |
Unexecuted instantiation: <_ as exr::block::reader::ChunksReader>::decompress_sequential::<_> Unexecuted instantiation: <exr::block::reader::OnProgressChunksReader<exr::block::reader::FilteredChunksReader<std::io::cursor::Cursor<alloc::vec::Vec<u8>>>, &mut fn(f64)> as exr::block::reader::ChunksReader>::decompress_sequential::<<exr::image::read::image::ReadImage<fn(f64), exr::image::read::layers::ReadFirstValidLayer<exr::image::read::specific_channels::CollectPixels<exr::image::read::specific_channels::ReadOptionalChannel<exr::image::read::specific_channels::ReadRequiredChannel<exr::image::read::specific_channels::ReadRequiredChannel<exr::image::read::specific_channels::ReadRequiredChannel<exr::image::recursive::NoneMore, f32>, f32>, f32>, f32>, (f32, f32, f32, f32), alloc::vec::Vec<f32>, <image::codecs::openexr::OpenExrDecoder<std::io::cursor::Cursor<alloc::vec::Vec<u8>>> as image::io::decoder::ImageDecoder>::read_image::{closure#2}, <image::codecs::openexr::OpenExrDecoder<std::io::cursor::Cursor<alloc::vec::Vec<u8>>> as image::io::decoder::ImageDecoder>::read_image::{closure#3}>>>>::from_chunks<exr::image::Layer<exr::image::SpecificChannels<alloc::vec::Vec<f32>, (exr::meta::attribute::ChannelDescription, exr::meta::attribute::ChannelDescription, exr::meta::attribute::ChannelDescription, core::option::Option<exr::meta::attribute::ChannelDescription>)>>, std::io::cursor::Cursor<alloc::vec::Vec<u8>>>::{closure#2}>Unexecuted instantiation: <exr::block::reader::OnProgressChunksReader<exr::block::reader::FilteredChunksReader<std::io::cursor::Cursor<alloc::vec::Vec<u8>>>, &mut fn(f64)> as exr::block::reader::ChunksReader>::decompress_sequential::<<exr::image::read::image::ReadImage<fn(f64), exr::image::read::layers::ReadFirstValidLayer<exr::image::read::specific_channels::CollectPixels<exr::image::read::specific_channels::ReadOptionalChannel<exr::image::read::specific_channels::ReadRequiredChannel<exr::image::read::specific_channels::ReadRequiredChannel<exr::image::read::specific_channels::ReadRequiredChannel<exr::image::recursive::NoneMore, f32>, f32>, f32>, f32>, (f32, f32, f32, f32), alloc::vec::Vec<f32>, <image::codecs::openexr::OpenExrDecoder<std::io::cursor::Cursor<alloc::vec::Vec<u8>>> as image::io::decoder::ImageDecoder>::read_image::{closure#2}, <image::codecs::openexr::OpenExrDecoder<std::io::cursor::Cursor<alloc::vec::Vec<u8>>> as image::io::decoder::ImageDecoder>::read_image::{closure#3}>>>>::from_chunks<exr::image::Layer<exr::image::SpecificChannels<alloc::vec::Vec<f32>, (exr::meta::attribute::ChannelDescription, exr::meta::attribute::ChannelDescription, exr::meta::attribute::ChannelDescription, core::option::Option<exr::meta::attribute::ChannelDescription>)>>, std::io::cursor::Cursor<alloc::vec::Vec<u8>>>::{closure#1}> |
260 | | |
261 | | /// Prepare reading the chunks sequentially, only a single thread, but with less memory overhead. |
262 | 9 | fn sequential_decompressor(self, pedantic: bool) -> SequentialBlockDecompressor<Self> { |
263 | 9 | SequentialBlockDecompressor { remaining_chunks_reader: self, pedantic } |
264 | 9 | } <exr::block::reader::OnProgressChunksReader<exr::block::reader::FilteredChunksReader<std::io::cursor::Cursor<&[u8]>>, &mut fn(f64)> as exr::block::reader::ChunksReader>::sequential_decompressor Line | Count | Source | 262 | 9 | fn sequential_decompressor(self, pedantic: bool) -> SequentialBlockDecompressor<Self> { | 263 | 9 | SequentialBlockDecompressor { remaining_chunks_reader: self, pedantic } | 264 | 9 | } |
Unexecuted instantiation: <_ as exr::block::reader::ChunksReader>::sequential_decompressor Unexecuted instantiation: <exr::block::reader::OnProgressChunksReader<exr::block::reader::FilteredChunksReader<std::io::cursor::Cursor<alloc::vec::Vec<u8>>>, &mut fn(f64)> as exr::block::reader::ChunksReader>::sequential_decompressor |
265 | | } |
266 | | |
267 | | impl<R, F> ChunksReader for OnProgressChunksReader<R, F> where R: ChunksReader, F: FnMut(f64) { |
268 | 363k | fn meta_data(&self) -> &MetaData { self.chunks_reader.meta_data() }<exr::block::reader::OnProgressChunksReader<exr::block::reader::FilteredChunksReader<std::io::cursor::Cursor<&[u8]>>, &mut fn(f64)> as exr::block::reader::ChunksReader>::meta_data Line | Count | Source | 268 | 8.38k | fn meta_data(&self) -> &MetaData { self.chunks_reader.meta_data() } |
Unexecuted instantiation: <exr::block::reader::OnProgressChunksReader<_, _> as exr::block::reader::ChunksReader>::meta_data <exr::block::reader::OnProgressChunksReader<exr::block::reader::FilteredChunksReader<std::io::cursor::Cursor<alloc::vec::Vec<u8>>>, &mut fn(f64)> as exr::block::reader::ChunksReader>::meta_data Line | Count | Source | 268 | 354k | fn meta_data(&self) -> &MetaData { self.chunks_reader.meta_data() } |
|
269 | 362k | fn expected_chunk_count(&self) -> usize { self.chunks_reader.expected_chunk_count() }<exr::block::reader::OnProgressChunksReader<exr::block::reader::FilteredChunksReader<std::io::cursor::Cursor<&[u8]>>, &mut fn(f64)> as exr::block::reader::ChunksReader>::expected_chunk_count Line | Count | Source | 269 | 7.78k | fn expected_chunk_count(&self) -> usize { self.chunks_reader.expected_chunk_count() } |
Unexecuted instantiation: <exr::block::reader::OnProgressChunksReader<_, _> as exr::block::reader::ChunksReader>::expected_chunk_count <exr::block::reader::OnProgressChunksReader<exr::block::reader::FilteredChunksReader<std::io::cursor::Cursor<alloc::vec::Vec<u8>>>, &mut fn(f64)> as exr::block::reader::ChunksReader>::expected_chunk_count Line | Count | Source | 269 | 354k | fn expected_chunk_count(&self) -> usize { self.chunks_reader.expected_chunk_count() } |
|
270 | | } |
271 | | |
272 | | impl<R, F> ExactSizeIterator for OnProgressChunksReader<R, F> where R: ChunksReader, F: FnMut(f64) {} |
273 | | impl<R, F> Iterator for OnProgressChunksReader<R, F> where R: ChunksReader, F: FnMut(f64) { |
274 | | type Item = Result<Chunk>; |
275 | | |
276 | 364k | fn next(&mut self) -> Option<Self::Item> { |
277 | 364k | self.chunks_reader.next().map(|item|{ |
278 | 362k | { |
279 | 362k | let total_chunks = self.expected_chunk_count() as f64; |
280 | 362k | let callback = &mut self.callback; |
281 | 362k | callback(self.decoded_chunks as f64 / total_chunks); |
282 | 362k | } |
283 | | |
284 | 362k | self.decoded_chunks += 1; |
285 | 362k | item |
286 | 362k | }) <exr::block::reader::OnProgressChunksReader<exr::block::reader::FilteredChunksReader<std::io::cursor::Cursor<&[u8]>>, &mut fn(f64)> as core::iter::traits::iterator::Iterator>::next::{closure#0}Line | Count | Source | 277 | 7.78k | self.chunks_reader.next().map(|item|{ | 278 | 7.78k | { | 279 | 7.78k | let total_chunks = self.expected_chunk_count() as f64; | 280 | 7.78k | let callback = &mut self.callback; | 281 | 7.78k | callback(self.decoded_chunks as f64 / total_chunks); | 282 | 7.78k | } | 283 | | | 284 | 7.78k | self.decoded_chunks += 1; | 285 | 7.78k | item | 286 | 7.78k | }) |
Unexecuted instantiation: <exr::block::reader::OnProgressChunksReader<_, _> as core::iter::traits::iterator::Iterator>::next::{closure#0}<exr::block::reader::OnProgressChunksReader<exr::block::reader::FilteredChunksReader<std::io::cursor::Cursor<alloc::vec::Vec<u8>>>, &mut fn(f64)> as core::iter::traits::iterator::Iterator>::next::{closure#0}Line | Count | Source | 277 | 354k | self.chunks_reader.next().map(|item|{ | 278 | 354k | { | 279 | 354k | let total_chunks = self.expected_chunk_count() as f64; | 280 | 354k | let callback = &mut self.callback; | 281 | 354k | callback(self.decoded_chunks as f64 / total_chunks); | 282 | 354k | } | 283 | | | 284 | 354k | self.decoded_chunks += 1; | 285 | 354k | item | 286 | 354k | }) |
|
287 | 364k | .or_else(||{ |
288 | 2.04k | debug_assert_eq!( |
289 | 0 | self.decoded_chunks, self.expected_chunk_count(), |
290 | 0 | "chunks reader finished but not all chunks are decompressed" |
291 | | ); |
292 | | |
293 | 2.04k | let callback = &mut self.callback; |
294 | 2.04k | callback(1.0); |
295 | 2.04k | None |
296 | 2.04k | }) <exr::block::reader::OnProgressChunksReader<exr::block::reader::FilteredChunksReader<std::io::cursor::Cursor<&[u8]>>, &mut fn(f64)> as core::iter::traits::iterator::Iterator>::next::{closure#1}Line | Count | Source | 287 | 287 | .or_else(||{ | 288 | 287 | debug_assert_eq!( | 289 | 0 | self.decoded_chunks, self.expected_chunk_count(), | 290 | 0 | "chunks reader finished but not all chunks are decompressed" | 291 | | ); | 292 | | | 293 | 287 | let callback = &mut self.callback; | 294 | 287 | callback(1.0); | 295 | 287 | None | 296 | 287 | }) |
Unexecuted instantiation: <exr::block::reader::OnProgressChunksReader<_, _> as core::iter::traits::iterator::Iterator>::next::{closure#1}<exr::block::reader::OnProgressChunksReader<exr::block::reader::FilteredChunksReader<std::io::cursor::Cursor<alloc::vec::Vec<u8>>>, &mut fn(f64)> as core::iter::traits::iterator::Iterator>::next::{closure#1}Line | Count | Source | 287 | 1.75k | .or_else(||{ | 288 | 1.75k | debug_assert_eq!( | 289 | 0 | self.decoded_chunks, self.expected_chunk_count(), | 290 | 0 | "chunks reader finished but not all chunks are decompressed" | 291 | | ); | 292 | | | 293 | 1.75k | let callback = &mut self.callback; | 294 | 1.75k | callback(1.0); | 295 | 1.75k | None | 296 | 1.75k | }) |
|
297 | 364k | } <exr::block::reader::OnProgressChunksReader<exr::block::reader::FilteredChunksReader<std::io::cursor::Cursor<&[u8]>>, &mut fn(f64)> as core::iter::traits::iterator::Iterator>::next Line | Count | Source | 276 | 8.07k | fn next(&mut self) -> Option<Self::Item> { | 277 | 8.07k | self.chunks_reader.next().map(|item|{ | 278 | | { | 279 | | let total_chunks = self.expected_chunk_count() as f64; | 280 | | let callback = &mut self.callback; | 281 | | callback(self.decoded_chunks as f64 / total_chunks); | 282 | | } | 283 | | | 284 | | self.decoded_chunks += 1; | 285 | | item | 286 | | }) | 287 | 8.07k | .or_else(||{ | 288 | | debug_assert_eq!( | 289 | | self.decoded_chunks, self.expected_chunk_count(), | 290 | | "chunks reader finished but not all chunks are decompressed" | 291 | | ); | 292 | | | 293 | | let callback = &mut self.callback; | 294 | | callback(1.0); | 295 | | None | 296 | | }) | 297 | 8.07k | } |
Unexecuted instantiation: <exr::block::reader::OnProgressChunksReader<_, _> as core::iter::traits::iterator::Iterator>::next <exr::block::reader::OnProgressChunksReader<exr::block::reader::FilteredChunksReader<std::io::cursor::Cursor<alloc::vec::Vec<u8>>>, &mut fn(f64)> as core::iter::traits::iterator::Iterator>::next Line | Count | Source | 276 | 356k | fn next(&mut self) -> Option<Self::Item> { | 277 | 356k | self.chunks_reader.next().map(|item|{ | 278 | | { | 279 | | let total_chunks = self.expected_chunk_count() as f64; | 280 | | let callback = &mut self.callback; | 281 | | callback(self.decoded_chunks as f64 / total_chunks); | 282 | | } | 283 | | | 284 | | self.decoded_chunks += 1; | 285 | | item | 286 | | }) | 287 | 356k | .or_else(||{ | 288 | | debug_assert_eq!( | 289 | | self.decoded_chunks, self.expected_chunk_count(), | 290 | | "chunks reader finished but not all chunks are decompressed" | 291 | | ); | 292 | | | 293 | | let callback = &mut self.callback; | 294 | | callback(1.0); | 295 | | None | 296 | | }) | 297 | 356k | } |
|
298 | | |
299 | 2.03k | fn size_hint(&self) -> (usize, Option<usize>) { |
300 | 2.03k | self.chunks_reader.size_hint() |
301 | 2.03k | } <exr::block::reader::OnProgressChunksReader<exr::block::reader::FilteredChunksReader<std::io::cursor::Cursor<&[u8]>>, &mut fn(f64)> as core::iter::traits::iterator::Iterator>::size_hint Line | Count | Source | 299 | 1.94k | fn size_hint(&self) -> (usize, Option<usize>) { | 300 | 1.94k | self.chunks_reader.size_hint() | 301 | 1.94k | } |
Unexecuted instantiation: <exr::block::reader::OnProgressChunksReader<_, _> as core::iter::traits::iterator::Iterator>::size_hint <exr::block::reader::OnProgressChunksReader<exr::block::reader::FilteredChunksReader<std::io::cursor::Cursor<alloc::vec::Vec<u8>>>, &mut fn(f64)> as core::iter::traits::iterator::Iterator>::size_hint Line | Count | Source | 299 | 85 | fn size_hint(&self) -> (usize, Option<usize>) { | 300 | 85 | self.chunks_reader.size_hint() | 301 | 85 | } |
|
302 | | } |
303 | | |
304 | | impl<R: Read + Seek> ChunksReader for AllChunksReader<R> { |
305 | 0 | fn meta_data(&self) -> &MetaData { &self.meta_data } |
306 | 0 | fn expected_chunk_count(&self) -> usize { self.remaining_chunks.end } |
307 | | } |
308 | | |
309 | | impl<R: Read + Seek> ExactSizeIterator for AllChunksReader<R> {} |
310 | | impl<R: Read + Seek> Iterator for AllChunksReader<R> { |
311 | | type Item = Result<Chunk>; |
312 | | |
313 | 0 | fn next(&mut self) -> Option<Self::Item> { |
314 | | // read as many chunks as the file should contain (inferred from meta data) |
315 | 0 | let next_chunk = self.remaining_chunks.next() |
316 | 0 | .map(|_| Chunk::read(&mut self.remaining_bytes, &self.meta_data)); |
317 | | |
318 | | // if no chunks are left, but some bytes remain, return error |
319 | 0 | if self.pedantic && next_chunk.is_none() && self.remaining_bytes.peek_u8().is_ok() { |
320 | 0 | return Some(Err(Error::invalid("end of file expected"))); |
321 | 0 | } |
322 | | |
323 | 0 | next_chunk |
324 | 0 | } |
325 | | |
326 | 0 | fn size_hint(&self) -> (usize, Option<usize>) { |
327 | 0 | (self.remaining_chunks.len(), Some(self.remaining_chunks.len())) |
328 | 0 | } |
329 | | } |
330 | | |
331 | | impl<R: Read + Seek> ChunksReader for FilteredChunksReader<R> { |
332 | 363k | fn meta_data(&self) -> &MetaData { &self.meta_data }<exr::block::reader::FilteredChunksReader<std::io::cursor::Cursor<&[u8]>> as exr::block::reader::ChunksReader>::meta_data Line | Count | Source | 332 | 8.38k | fn meta_data(&self) -> &MetaData { &self.meta_data } |
Unexecuted instantiation: <exr::block::reader::FilteredChunksReader<_> as exr::block::reader::ChunksReader>::meta_data <exr::block::reader::FilteredChunksReader<std::io::cursor::Cursor<alloc::vec::Vec<u8>>> as exr::block::reader::ChunksReader>::meta_data Line | Count | Source | 332 | 354k | fn meta_data(&self) -> &MetaData { &self.meta_data } |
|
333 | 362k | fn expected_chunk_count(&self) -> usize { self.expected_filtered_chunk_count }<exr::block::reader::FilteredChunksReader<std::io::cursor::Cursor<&[u8]>> as exr::block::reader::ChunksReader>::expected_chunk_count Line | Count | Source | 333 | 7.78k | fn expected_chunk_count(&self) -> usize { self.expected_filtered_chunk_count } |
Unexecuted instantiation: <exr::block::reader::FilteredChunksReader<_> as exr::block::reader::ChunksReader>::expected_chunk_count <exr::block::reader::FilteredChunksReader<std::io::cursor::Cursor<alloc::vec::Vec<u8>>> as exr::block::reader::ChunksReader>::expected_chunk_count Line | Count | Source | 333 | 354k | fn expected_chunk_count(&self) -> usize { self.expected_filtered_chunk_count } |
|
334 | | } |
335 | | |
336 | | impl<R: Read + Seek> ExactSizeIterator for FilteredChunksReader<R> {} |
337 | | impl<R: Read + Seek> Iterator for FilteredChunksReader<R> { |
338 | | type Item = Result<Chunk>; |
339 | | |
340 | 364k | fn next(&mut self) -> Option<Self::Item> { |
341 | | // read as many chunks as we have desired chunk offsets |
342 | 364k | self.remaining_filtered_chunk_indices.next().map(|next_chunk_location|{ |
343 | 362k | self.remaining_bytes.skip_to( // no-op for seek at current position, uses skip_bytes for small amounts |
344 | 362k | usize::try_from(next_chunk_location)? |
345 | 1 | )?; |
346 | | |
347 | 362k | let meta_data = &self.meta_data; |
348 | 362k | Chunk::read(&mut self.remaining_bytes, meta_data) |
349 | 362k | }) <exr::block::reader::FilteredChunksReader<std::io::cursor::Cursor<&[u8]>> as core::iter::traits::iterator::Iterator>::next::{closure#0}Line | Count | Source | 342 | 7.78k | self.remaining_filtered_chunk_indices.next().map(|next_chunk_location|{ | 343 | 7.78k | self.remaining_bytes.skip_to( // no-op for seek at current position, uses skip_bytes for small amounts | 344 | 7.78k | usize::try_from(next_chunk_location)? | 345 | 1 | )?; | 346 | | | 347 | 7.78k | let meta_data = &self.meta_data; | 348 | 7.78k | Chunk::read(&mut self.remaining_bytes, meta_data) | 349 | 7.78k | }) |
Unexecuted instantiation: <exr::block::reader::FilteredChunksReader<_> as core::iter::traits::iterator::Iterator>::next::{closure#0}<exr::block::reader::FilteredChunksReader<std::io::cursor::Cursor<alloc::vec::Vec<u8>>> as core::iter::traits::iterator::Iterator>::next::{closure#0}Line | Count | Source | 342 | 354k | self.remaining_filtered_chunk_indices.next().map(|next_chunk_location|{ | 343 | 354k | self.remaining_bytes.skip_to( // no-op for seek at current position, uses skip_bytes for small amounts | 344 | 354k | usize::try_from(next_chunk_location)? | 345 | 0 | )?; | 346 | | | 347 | 354k | let meta_data = &self.meta_data; | 348 | 354k | Chunk::read(&mut self.remaining_bytes, meta_data) | 349 | 354k | }) |
|
350 | | |
351 | | // TODO remember last chunk index and then seek to index+size and check whether bytes are left? |
352 | 364k | } <exr::block::reader::FilteredChunksReader<std::io::cursor::Cursor<&[u8]>> as core::iter::traits::iterator::Iterator>::next Line | Count | Source | 340 | 8.07k | fn next(&mut self) -> Option<Self::Item> { | 341 | | // read as many chunks as we have desired chunk offsets | 342 | 8.07k | self.remaining_filtered_chunk_indices.next().map(|next_chunk_location|{ | 343 | | self.remaining_bytes.skip_to( // no-op for seek at current position, uses skip_bytes for small amounts | 344 | | usize::try_from(next_chunk_location)? | 345 | | )?; | 346 | | | 347 | | let meta_data = &self.meta_data; | 348 | | Chunk::read(&mut self.remaining_bytes, meta_data) | 349 | | }) | 350 | | | 351 | | // TODO remember last chunk index and then seek to index+size and check whether bytes are left? | 352 | 8.07k | } |
Unexecuted instantiation: <exr::block::reader::FilteredChunksReader<_> as core::iter::traits::iterator::Iterator>::next <exr::block::reader::FilteredChunksReader<std::io::cursor::Cursor<alloc::vec::Vec<u8>>> as core::iter::traits::iterator::Iterator>::next Line | Count | Source | 340 | 356k | fn next(&mut self) -> Option<Self::Item> { | 341 | | // read as many chunks as we have desired chunk offsets | 342 | 356k | self.remaining_filtered_chunk_indices.next().map(|next_chunk_location|{ | 343 | | self.remaining_bytes.skip_to( // no-op for seek at current position, uses skip_bytes for small amounts | 344 | | usize::try_from(next_chunk_location)? | 345 | | )?; | 346 | | | 347 | | let meta_data = &self.meta_data; | 348 | | Chunk::read(&mut self.remaining_bytes, meta_data) | 349 | | }) | 350 | | | 351 | | // TODO remember last chunk index and then seek to index+size and check whether bytes are left? | 352 | 356k | } |
|
353 | | |
354 | 2.03k | fn size_hint(&self) -> (usize, Option<usize>) { |
355 | 2.03k | (self.remaining_filtered_chunk_indices.len(), Some(self.remaining_filtered_chunk_indices.len())) |
356 | 2.03k | } <exr::block::reader::FilteredChunksReader<std::io::cursor::Cursor<&[u8]>> as core::iter::traits::iterator::Iterator>::size_hint Line | Count | Source | 354 | 1.94k | fn size_hint(&self) -> (usize, Option<usize>) { | 355 | 1.94k | (self.remaining_filtered_chunk_indices.len(), Some(self.remaining_filtered_chunk_indices.len())) | 356 | 1.94k | } |
Unexecuted instantiation: <exr::block::reader::FilteredChunksReader<_> as core::iter::traits::iterator::Iterator>::size_hint <exr::block::reader::FilteredChunksReader<std::io::cursor::Cursor<alloc::vec::Vec<u8>>> as core::iter::traits::iterator::Iterator>::size_hint Line | Count | Source | 354 | 85 | fn size_hint(&self) -> (usize, Option<usize>) { | 355 | 85 | (self.remaining_filtered_chunk_indices.len(), Some(self.remaining_filtered_chunk_indices.len())) | 356 | 85 | } |
|
357 | | } |
358 | | |
359 | | /// Read all chunks from the file, decompressing each chunk immediately. |
360 | | /// Implements iterator. |
361 | | #[derive(Debug)] |
362 | | pub struct SequentialBlockDecompressor<R: ChunksReader> { |
363 | | remaining_chunks_reader: R, |
364 | | pedantic: bool, |
365 | | } |
366 | | |
367 | | impl<R: ChunksReader> SequentialBlockDecompressor<R> { |
368 | | |
369 | | /// The extracted meta data from the image file. |
370 | 9 | pub fn meta_data(&self) -> &MetaData { self.remaining_chunks_reader.meta_data() }<exr::block::reader::SequentialBlockDecompressor<exr::block::reader::OnProgressChunksReader<exr::block::reader::FilteredChunksReader<std::io::cursor::Cursor<&[u8]>>, &mut fn(f64)>>>::meta_data Line | Count | Source | 370 | 9 | pub fn meta_data(&self) -> &MetaData { self.remaining_chunks_reader.meta_data() } |
Unexecuted instantiation: <exr::block::reader::SequentialBlockDecompressor<_>>::meta_data Unexecuted instantiation: <exr::block::reader::SequentialBlockDecompressor<exr::block::reader::OnProgressChunksReader<exr::block::reader::FilteredChunksReader<std::io::cursor::Cursor<alloc::vec::Vec<u8>>>, &mut fn(f64)>>>::meta_data |
371 | | |
372 | | /// Read and then decompress a single block of pixels from the byte source. |
373 | 9 | pub fn decompress_next_block(&mut self) -> Option<Result<UncompressedBlock>> { |
374 | 9 | self.remaining_chunks_reader.read_next_chunk().map(|compressed_chunk|{ |
375 | 9 | UncompressedBlock::decompress_chunk(compressed_chunk?, &self.remaining_chunks_reader.meta_data(), self.pedantic) |
376 | 9 | }) <exr::block::reader::SequentialBlockDecompressor<exr::block::reader::OnProgressChunksReader<exr::block::reader::FilteredChunksReader<std::io::cursor::Cursor<&[u8]>>, &mut fn(f64)>>>::decompress_next_block::{closure#0}Line | Count | Source | 374 | 9 | self.remaining_chunks_reader.read_next_chunk().map(|compressed_chunk|{ | 375 | 9 | UncompressedBlock::decompress_chunk(compressed_chunk?, &self.remaining_chunks_reader.meta_data(), self.pedantic) | 376 | 9 | }) |
Unexecuted instantiation: <exr::block::reader::SequentialBlockDecompressor<_>>::decompress_next_block::{closure#0}Unexecuted instantiation: <exr::block::reader::SequentialBlockDecompressor<exr::block::reader::OnProgressChunksReader<exr::block::reader::FilteredChunksReader<std::io::cursor::Cursor<alloc::vec::Vec<u8>>>, &mut fn(f64)>>>::decompress_next_block::{closure#0} |
377 | 9 | } <exr::block::reader::SequentialBlockDecompressor<exr::block::reader::OnProgressChunksReader<exr::block::reader::FilteredChunksReader<std::io::cursor::Cursor<&[u8]>>, &mut fn(f64)>>>::decompress_next_block Line | Count | Source | 373 | 9 | pub fn decompress_next_block(&mut self) -> Option<Result<UncompressedBlock>> { | 374 | 9 | self.remaining_chunks_reader.read_next_chunk().map(|compressed_chunk|{ | 375 | | UncompressedBlock::decompress_chunk(compressed_chunk?, &self.remaining_chunks_reader.meta_data(), self.pedantic) | 376 | | }) | 377 | 9 | } |
Unexecuted instantiation: <exr::block::reader::SequentialBlockDecompressor<_>>::decompress_next_block Unexecuted instantiation: <exr::block::reader::SequentialBlockDecompressor<exr::block::reader::OnProgressChunksReader<exr::block::reader::FilteredChunksReader<std::io::cursor::Cursor<alloc::vec::Vec<u8>>>, &mut fn(f64)>>>::decompress_next_block |
378 | | } |
379 | | |
380 | | #[cfg(feature = "rayon")] |
381 | | /// Decompress the chunks in a file in parallel. |
382 | | /// The first call to `next` will fill the thread pool with jobs, |
383 | | /// starting to decompress the next few blocks. |
384 | | /// These jobs will finish, even if you stop reading more blocks. |
385 | | /// Implements iterator. |
386 | | #[derive(Debug)] |
387 | | pub struct ParallelBlockDecompressor<R: ChunksReader> { |
388 | | remaining_chunks: R, |
389 | | sender: std::sync::mpsc::Sender<Result<UncompressedBlock>>, |
390 | | receiver: std::sync::mpsc::Receiver<Result<UncompressedBlock>>, |
391 | | currently_decompressing_count: usize, |
392 | | max_threads: usize, |
393 | | |
394 | | shared_meta_data_ref: std::sync::Arc<MetaData>, |
395 | | pedantic: bool, |
396 | | |
397 | | pool: rayon_core::ThreadPool, |
398 | | } |
399 | | |
400 | | #[cfg(feature = "rayon")] |
401 | | impl<R: ChunksReader> ParallelBlockDecompressor<R> { |
402 | | |
403 | | /// Create a new decompressor. Does not immediately spawn any tasks. |
404 | | /// Decompression starts after the first call to `next`. |
405 | | /// Returns the chunks if parallel decompression should not be used. |
406 | | /// Use `new_with_thread_pool` to customize the threadpool. |
407 | 2.04k | pub fn new(chunks: R, pedantic: bool) -> std::result::Result<Self, R> { |
408 | 2.04k | Self::new_with_thread_pool(chunks, pedantic, ||{ |
409 | 2.03k | rayon_core::ThreadPoolBuilder::new() |
410 | 65.0k | .thread_name(|index| format!("OpenEXR Block Decompressor Thread #{}", index))<exr::block::reader::ParallelBlockDecompressor<exr::block::reader::OnProgressChunksReader<exr::block::reader::FilteredChunksReader<std::io::cursor::Cursor<&[u8]>>, &mut fn(f64)>>>::new::{closure#0}::{closure#0}Line | Count | Source | 410 | 62.3k | .thread_name(|index| format!("OpenEXR Block Decompressor Thread #{}", index)) |
Unexecuted instantiation: <exr::block::reader::ParallelBlockDecompressor<_>>::new::{closure#0}::{closure#0}<exr::block::reader::ParallelBlockDecompressor<exr::block::reader::OnProgressChunksReader<exr::block::reader::FilteredChunksReader<std::io::cursor::Cursor<alloc::vec::Vec<u8>>>, &mut fn(f64)>>>::new::{closure#0}::{closure#0}Line | Count | Source | 410 | 2.72k | .thread_name(|index| format!("OpenEXR Block Decompressor Thread #{}", index)) |
|
411 | 2.03k | .build() |
412 | 2.03k | }) <exr::block::reader::ParallelBlockDecompressor<exr::block::reader::OnProgressChunksReader<exr::block::reader::FilteredChunksReader<std::io::cursor::Cursor<&[u8]>>, &mut fn(f64)>>>::new::{closure#0}Line | Count | Source | 408 | 1.94k | Self::new_with_thread_pool(chunks, pedantic, ||{ | 409 | 1.94k | rayon_core::ThreadPoolBuilder::new() | 410 | 1.94k | .thread_name(|index| format!("OpenEXR Block Decompressor Thread #{}", index)) | 411 | 1.94k | .build() | 412 | 1.94k | }) |
Unexecuted instantiation: <exr::block::reader::ParallelBlockDecompressor<_>>::new::{closure#0}<exr::block::reader::ParallelBlockDecompressor<exr::block::reader::OnProgressChunksReader<exr::block::reader::FilteredChunksReader<std::io::cursor::Cursor<alloc::vec::Vec<u8>>>, &mut fn(f64)>>>::new::{closure#0}Line | Count | Source | 408 | 85 | Self::new_with_thread_pool(chunks, pedantic, ||{ | 409 | 85 | rayon_core::ThreadPoolBuilder::new() | 410 | 85 | .thread_name(|index| format!("OpenEXR Block Decompressor Thread #{}", index)) | 411 | 85 | .build() | 412 | 85 | }) |
|
413 | 2.04k | } <exr::block::reader::ParallelBlockDecompressor<exr::block::reader::OnProgressChunksReader<exr::block::reader::FilteredChunksReader<std::io::cursor::Cursor<&[u8]>>, &mut fn(f64)>>>::new Line | Count | Source | 407 | 1.95k | pub fn new(chunks: R, pedantic: bool) -> std::result::Result<Self, R> { | 408 | 1.95k | Self::new_with_thread_pool(chunks, pedantic, ||{ | 409 | | rayon_core::ThreadPoolBuilder::new() | 410 | | .thread_name(|index| format!("OpenEXR Block Decompressor Thread #{}", index)) | 411 | | .build() | 412 | | }) | 413 | 1.95k | } |
Unexecuted instantiation: <exr::block::reader::ParallelBlockDecompressor<_>>::new <exr::block::reader::ParallelBlockDecompressor<exr::block::reader::OnProgressChunksReader<exr::block::reader::FilteredChunksReader<std::io::cursor::Cursor<alloc::vec::Vec<u8>>>, &mut fn(f64)>>>::new Line | Count | Source | 407 | 85 | pub fn new(chunks: R, pedantic: bool) -> std::result::Result<Self, R> { | 408 | 85 | Self::new_with_thread_pool(chunks, pedantic, ||{ | 409 | | rayon_core::ThreadPoolBuilder::new() | 410 | | .thread_name(|index| format!("OpenEXR Block Decompressor Thread #{}", index)) | 411 | | .build() | 412 | | }) | 413 | 85 | } |
|
414 | | |
415 | | /// Create a new decompressor. Does not immediately spawn any tasks. |
416 | | /// Decompression starts after the first call to `next`. |
417 | | /// Returns the chunks if parallel decompression should not be used. |
418 | 2.04k | pub fn new_with_thread_pool<CreatePool>(chunks: R, pedantic: bool, try_create_thread_pool: CreatePool) |
419 | 2.04k | -> std::result::Result<Self, R> |
420 | 2.04k | where CreatePool: FnOnce() -> std::result::Result<rayon_core::ThreadPool, rayon_core::ThreadPoolBuildError> |
421 | | { |
422 | | use crate::compression::Compression; |
423 | | |
424 | 2.04k | let is_entirely_uncompressed = chunks.meta_data().headers.iter() |
425 | 2.57k | .all(|head|head.compression == Compression::Uncompressed); <exr::block::reader::ParallelBlockDecompressor<exr::block::reader::OnProgressChunksReader<exr::block::reader::FilteredChunksReader<std::io::cursor::Cursor<&[u8]>>, &mut fn(f64)>>>::new_with_thread_pool::<<exr::block::reader::ParallelBlockDecompressor<exr::block::reader::OnProgressChunksReader<exr::block::reader::FilteredChunksReader<std::io::cursor::Cursor<&[u8]>>, &mut fn(f64)>>>::new::{closure#0}>::{closure#0}Line | Count | Source | 425 | 2.48k | .all(|head|head.compression == Compression::Uncompressed); |
Unexecuted instantiation: <exr::block::reader::ParallelBlockDecompressor<_>>::new_with_thread_pool::<_>::{closure#0}<exr::block::reader::ParallelBlockDecompressor<exr::block::reader::OnProgressChunksReader<exr::block::reader::FilteredChunksReader<std::io::cursor::Cursor<alloc::vec::Vec<u8>>>, &mut fn(f64)>>>::new_with_thread_pool::<<exr::block::reader::ParallelBlockDecompressor<exr::block::reader::OnProgressChunksReader<exr::block::reader::FilteredChunksReader<std::io::cursor::Cursor<alloc::vec::Vec<u8>>>, &mut fn(f64)>>>::new::{closure#0}>::{closure#0}Line | Count | Source | 425 | 85 | .all(|head|head.compression == Compression::Uncompressed); |
|
426 | | |
427 | | // if no compression is used in the file, don't use a threadpool |
428 | 2.04k | if is_entirely_uncompressed { |
429 | 9 | return Err(chunks); |
430 | 2.03k | } |
431 | | |
432 | | // in case thread pool creation fails (for example on WASM currently), |
433 | | // we revert to sequential decompression |
434 | 2.03k | let pool = match try_create_thread_pool() { |
435 | 2.03k | Ok(pool) => pool, |
436 | | |
437 | | // TODO print warning? |
438 | 0 | Err(_) => return Err(chunks), |
439 | | }; |
440 | | |
441 | 2.03k | let max_threads = pool.current_num_threads().max(1).min(chunks.len()) + 2; // ca one block for each thread at all times |
442 | | |
443 | 2.03k | let (send, recv) = std::sync::mpsc::channel(); // TODO bounded channel simplifies logic? |
444 | | |
445 | 2.03k | Ok(Self { |
446 | 2.03k | shared_meta_data_ref: std::sync::Arc::new(chunks.meta_data().clone()), |
447 | 2.03k | currently_decompressing_count: 0, |
448 | 2.03k | remaining_chunks: chunks, |
449 | 2.03k | sender: send, |
450 | 2.03k | receiver: recv, |
451 | 2.03k | pedantic, |
452 | 2.03k | max_threads, |
453 | 2.03k | |
454 | 2.03k | pool, |
455 | 2.03k | }) |
456 | 2.04k | } <exr::block::reader::ParallelBlockDecompressor<exr::block::reader::OnProgressChunksReader<exr::block::reader::FilteredChunksReader<std::io::cursor::Cursor<&[u8]>>, &mut fn(f64)>>>::new_with_thread_pool::<<exr::block::reader::ParallelBlockDecompressor<exr::block::reader::OnProgressChunksReader<exr::block::reader::FilteredChunksReader<std::io::cursor::Cursor<&[u8]>>, &mut fn(f64)>>>::new::{closure#0}>Line | Count | Source | 418 | 1.95k | pub fn new_with_thread_pool<CreatePool>(chunks: R, pedantic: bool, try_create_thread_pool: CreatePool) | 419 | 1.95k | -> std::result::Result<Self, R> | 420 | 1.95k | where CreatePool: FnOnce() -> std::result::Result<rayon_core::ThreadPool, rayon_core::ThreadPoolBuildError> | 421 | | { | 422 | | use crate::compression::Compression; | 423 | | | 424 | 1.95k | let is_entirely_uncompressed = chunks.meta_data().headers.iter() | 425 | 1.95k | .all(|head|head.compression == Compression::Uncompressed); | 426 | | | 427 | | // if no compression is used in the file, don't use a threadpool | 428 | 1.95k | if is_entirely_uncompressed { | 429 | 9 | return Err(chunks); | 430 | 1.94k | } | 431 | | | 432 | | // in case thread pool creation fails (for example on WASM currently), | 433 | | // we revert to sequential decompression | 434 | 1.94k | let pool = match try_create_thread_pool() { | 435 | 1.94k | Ok(pool) => pool, | 436 | | | 437 | | // TODO print warning? | 438 | 0 | Err(_) => return Err(chunks), | 439 | | }; | 440 | | | 441 | 1.94k | let max_threads = pool.current_num_threads().max(1).min(chunks.len()) + 2; // ca one block for each thread at all times | 442 | | | 443 | 1.94k | let (send, recv) = std::sync::mpsc::channel(); // TODO bounded channel simplifies logic? | 444 | | | 445 | 1.94k | Ok(Self { | 446 | 1.94k | shared_meta_data_ref: std::sync::Arc::new(chunks.meta_data().clone()), | 447 | 1.94k | currently_decompressing_count: 0, | 448 | 1.94k | remaining_chunks: chunks, | 449 | 1.94k | sender: send, | 450 | 1.94k | receiver: recv, | 451 | 1.94k | pedantic, | 452 | 1.94k | max_threads, | 453 | 1.94k | | 454 | 1.94k | pool, | 455 | 1.94k | }) | 456 | 1.95k | } |
Unexecuted instantiation: <exr::block::reader::ParallelBlockDecompressor<_>>::new_with_thread_pool::<_> <exr::block::reader::ParallelBlockDecompressor<exr::block::reader::OnProgressChunksReader<exr::block::reader::FilteredChunksReader<std::io::cursor::Cursor<alloc::vec::Vec<u8>>>, &mut fn(f64)>>>::new_with_thread_pool::<<exr::block::reader::ParallelBlockDecompressor<exr::block::reader::OnProgressChunksReader<exr::block::reader::FilteredChunksReader<std::io::cursor::Cursor<alloc::vec::Vec<u8>>>, &mut fn(f64)>>>::new::{closure#0}>Line | Count | Source | 418 | 85 | pub fn new_with_thread_pool<CreatePool>(chunks: R, pedantic: bool, try_create_thread_pool: CreatePool) | 419 | 85 | -> std::result::Result<Self, R> | 420 | 85 | where CreatePool: FnOnce() -> std::result::Result<rayon_core::ThreadPool, rayon_core::ThreadPoolBuildError> | 421 | | { | 422 | | use crate::compression::Compression; | 423 | | | 424 | 85 | let is_entirely_uncompressed = chunks.meta_data().headers.iter() | 425 | 85 | .all(|head|head.compression == Compression::Uncompressed); | 426 | | | 427 | | // if no compression is used in the file, don't use a threadpool | 428 | 85 | if is_entirely_uncompressed { | 429 | 0 | return Err(chunks); | 430 | 85 | } | 431 | | | 432 | | // in case thread pool creation fails (for example on WASM currently), | 433 | | // we revert to sequential decompression | 434 | 85 | let pool = match try_create_thread_pool() { | 435 | 85 | Ok(pool) => pool, | 436 | | | 437 | | // TODO print warning? | 438 | 0 | Err(_) => return Err(chunks), | 439 | | }; | 440 | | | 441 | 85 | let max_threads = pool.current_num_threads().max(1).min(chunks.len()) + 2; // ca one block for each thread at all times | 442 | | | 443 | 85 | let (send, recv) = std::sync::mpsc::channel(); // TODO bounded channel simplifies logic? | 444 | | | 445 | 85 | Ok(Self { | 446 | 85 | shared_meta_data_ref: std::sync::Arc::new(chunks.meta_data().clone()), | 447 | 85 | currently_decompressing_count: 0, | 448 | 85 | remaining_chunks: chunks, | 449 | 85 | sender: send, | 450 | 85 | receiver: recv, | 451 | 85 | pedantic, | 452 | 85 | max_threads, | 453 | 85 | | 454 | 85 | pool, | 455 | 85 | }) | 456 | 85 | } |
|
457 | | |
458 | | /// Fill the pool with decompression jobs. Returns the first job that finishes. |
459 | 359k | pub fn decompress_next_block(&mut self) -> Option<Result<UncompressedBlock>> { |
460 | | |
461 | 719k | while self.currently_decompressing_count < self.max_threads { |
462 | 364k | let block = self.remaining_chunks.next(); |
463 | 364k | if let Some(block) = block { |
464 | 362k | let block = match block { |
465 | 360k | Ok(block) => block, |
466 | 1.76k | Err(error) => return Some(Err(error)) |
467 | | }; |
468 | | |
469 | 360k | let sender = self.sender.clone(); |
470 | 360k | let meta = self.shared_meta_data_ref.clone(); |
471 | 360k | let pedantic = self.pedantic; |
472 | | |
473 | 360k | self.currently_decompressing_count += 1; |
474 | | |
475 | 360k | self.pool.spawn(move || { |
476 | 360k | let decompressed_or_err = UncompressedBlock::decompress_chunk( |
477 | 360k | block, &meta, pedantic |
478 | | ); |
479 | | |
480 | | // by now, decompressing could have failed in another thread. |
481 | | // the error is then already handled, so we simply |
482 | | // don't send the decompressed block and do nothing |
483 | 360k | let _ = sender.send(decompressed_or_err); |
484 | 360k | }); <exr::block::reader::ParallelBlockDecompressor<exr::block::reader::OnProgressChunksReader<exr::block::reader::FilteredChunksReader<std::io::cursor::Cursor<&[u8]>>, &mut fn(f64)>>>::decompress_next_block::{closure#0}Line | Count | Source | 475 | 6.01k | self.pool.spawn(move || { | 476 | 6.01k | let decompressed_or_err = UncompressedBlock::decompress_chunk( | 477 | 6.01k | block, &meta, pedantic | 478 | | ); | 479 | | | 480 | | // by now, decompressing could have failed in another thread. | 481 | | // the error is then already handled, so we simply | 482 | | // don't send the decompressed block and do nothing | 483 | 6.01k | let _ = sender.send(decompressed_or_err); | 484 | 6.01k | }); |
Unexecuted instantiation: <exr::block::reader::ParallelBlockDecompressor<_>>::decompress_next_block::{closure#0}<exr::block::reader::ParallelBlockDecompressor<exr::block::reader::OnProgressChunksReader<exr::block::reader::FilteredChunksReader<std::io::cursor::Cursor<alloc::vec::Vec<u8>>>, &mut fn(f64)>>>::decompress_next_block::{closure#0}Line | Count | Source | 475 | 354k | self.pool.spawn(move || { | 476 | 354k | let decompressed_or_err = UncompressedBlock::decompress_chunk( | 477 | 354k | block, &meta, pedantic | 478 | | ); | 479 | | | 480 | | // by now, decompressing could have failed in another thread. | 481 | | // the error is then already handled, so we simply | 482 | | // don't send the decompressed block and do nothing | 483 | 354k | let _ = sender.send(decompressed_or_err); | 484 | 354k | }); |
|
485 | | } |
486 | | else { |
487 | | // there are no chunks left to decompress |
488 | 2.04k | break; |
489 | | } |
490 | | } |
491 | | |
492 | 357k | if self.currently_decompressing_count > 0 { |
493 | 357k | let next = self.receiver.recv() |
494 | 357k | .expect("all decompressing senders hung up but more messages were expected"); |
495 | | |
496 | 357k | self.currently_decompressing_count -= 1; |
497 | 357k | Some(next) |
498 | | } |
499 | | else { |
500 | 170 | debug_assert!(self.receiver.try_recv().is_err(), "uncompressed chunks left in channel after decompressing all chunks"); // TODO not reliable |
501 | 170 | debug_assert_eq!(self.len(), 0, "compressed chunks left after decompressing all chunks"); |
502 | 170 | None |
503 | | } |
504 | 359k | } <exr::block::reader::ParallelBlockDecompressor<exr::block::reader::OnProgressChunksReader<exr::block::reader::FilteredChunksReader<std::io::cursor::Cursor<&[u8]>>, &mut fn(f64)>>>::decompress_next_block Line | Count | Source | 459 | 4.55k | pub fn decompress_next_block(&mut self) -> Option<Result<UncompressedBlock>> { | 460 | | | 461 | 10.5k | while self.currently_decompressing_count < self.max_threads { | 462 | 8.06k | let block = self.remaining_chunks.next(); | 463 | 8.06k | if let Some(block) = block { | 464 | 7.77k | let block = match block { | 465 | 6.01k | Ok(block) => block, | 466 | 1.76k | Err(error) => return Some(Err(error)) | 467 | | }; | 468 | | | 469 | 6.01k | let sender = self.sender.clone(); | 470 | 6.01k | let meta = self.shared_meta_data_ref.clone(); | 471 | 6.01k | let pedantic = self.pedantic; | 472 | | | 473 | 6.01k | self.currently_decompressing_count += 1; | 474 | | | 475 | 6.01k | self.pool.spawn(move || { | 476 | | let decompressed_or_err = UncompressedBlock::decompress_chunk( | 477 | | block, &meta, pedantic | 478 | | ); | 479 | | | 480 | | // by now, decompressing could have failed in another thread. | 481 | | // the error is then already handled, so we simply | 482 | | // don't send the decompressed block and do nothing | 483 | | let _ = sender.send(decompressed_or_err); | 484 | | }); | 485 | | } | 486 | | else { | 487 | | // there are no chunks left to decompress | 488 | 287 | break; | 489 | | } | 490 | | } | 491 | | | 492 | 2.79k | if self.currently_decompressing_count > 0 { | 493 | 2.70k | let next = self.receiver.recv() | 494 | 2.70k | .expect("all decompressing senders hung up but more messages were expected"); | 495 | | | 496 | 2.70k | self.currently_decompressing_count -= 1; | 497 | 2.70k | Some(next) | 498 | | } | 499 | | else { | 500 | 85 | debug_assert!(self.receiver.try_recv().is_err(), "uncompressed chunks left in channel after decompressing all chunks"); // TODO not reliable | 501 | 85 | debug_assert_eq!(self.len(), 0, "compressed chunks left after decompressing all chunks"); | 502 | 85 | None | 503 | | } | 504 | 4.55k | } |
Unexecuted instantiation: <exr::block::reader::ParallelBlockDecompressor<_>>::decompress_next_block <exr::block::reader::ParallelBlockDecompressor<exr::block::reader::OnProgressChunksReader<exr::block::reader::FilteredChunksReader<std::io::cursor::Cursor<alloc::vec::Vec<u8>>>, &mut fn(f64)>>>::decompress_next_block Line | Count | Source | 459 | 354k | pub fn decompress_next_block(&mut self) -> Option<Result<UncompressedBlock>> { | 460 | | | 461 | 709k | while self.currently_decompressing_count < self.max_threads { | 462 | 356k | let block = self.remaining_chunks.next(); | 463 | 356k | if let Some(block) = block { | 464 | 354k | let block = match block { | 465 | 354k | Ok(block) => block, | 466 | 0 | Err(error) => return Some(Err(error)) | 467 | | }; | 468 | | | 469 | 354k | let sender = self.sender.clone(); | 470 | 354k | let meta = self.shared_meta_data_ref.clone(); | 471 | 354k | let pedantic = self.pedantic; | 472 | | | 473 | 354k | self.currently_decompressing_count += 1; | 474 | | | 475 | 354k | self.pool.spawn(move || { | 476 | | let decompressed_or_err = UncompressedBlock::decompress_chunk( | 477 | | block, &meta, pedantic | 478 | | ); | 479 | | | 480 | | // by now, decompressing could have failed in another thread. | 481 | | // the error is then already handled, so we simply | 482 | | // don't send the decompressed block and do nothing | 483 | | let _ = sender.send(decompressed_or_err); | 484 | | }); | 485 | | } | 486 | | else { | 487 | | // there are no chunks left to decompress | 488 | 1.75k | break; | 489 | | } | 490 | | } | 491 | | | 492 | 354k | if self.currently_decompressing_count > 0 { | 493 | 354k | let next = self.receiver.recv() | 494 | 354k | .expect("all decompressing senders hung up but more messages were expected"); | 495 | | | 496 | 354k | self.currently_decompressing_count -= 1; | 497 | 354k | Some(next) | 498 | | } | 499 | | else { | 500 | 85 | debug_assert!(self.receiver.try_recv().is_err(), "uncompressed chunks left in channel after decompressing all chunks"); // TODO not reliable | 501 | 85 | debug_assert_eq!(self.len(), 0, "compressed chunks left after decompressing all chunks"); | 502 | 85 | None | 503 | | } | 504 | 354k | } |
|
505 | | |
506 | | /// The extracted meta data of the image file. |
507 | 358k | pub fn meta_data(&self) -> &MetaData { self.remaining_chunks.meta_data() }<exr::block::reader::ParallelBlockDecompressor<exr::block::reader::OnProgressChunksReader<exr::block::reader::FilteredChunksReader<std::io::cursor::Cursor<&[u8]>>, &mut fn(f64)>>>::meta_data Line | Count | Source | 507 | 4.46k | pub fn meta_data(&self) -> &MetaData { self.remaining_chunks.meta_data() } |
Unexecuted instantiation: <exr::block::reader::ParallelBlockDecompressor<_>>::meta_data <exr::block::reader::ParallelBlockDecompressor<exr::block::reader::OnProgressChunksReader<exr::block::reader::FilteredChunksReader<std::io::cursor::Cursor<alloc::vec::Vec<u8>>>, &mut fn(f64)>>>::meta_data Line | Count | Source | 507 | 354k | pub fn meta_data(&self) -> &MetaData { self.remaining_chunks.meta_data() } |
|
508 | | } |
509 | | |
510 | | impl<R: ChunksReader> ExactSizeIterator for SequentialBlockDecompressor<R> {} |
511 | | impl<R: ChunksReader> Iterator for SequentialBlockDecompressor<R> { |
512 | | type Item = Result<UncompressedBlock>; |
513 | 9 | fn next(&mut self) -> Option<Self::Item> { self.decompress_next_block() }<exr::block::reader::SequentialBlockDecompressor<exr::block::reader::OnProgressChunksReader<exr::block::reader::FilteredChunksReader<std::io::cursor::Cursor<&[u8]>>, &mut fn(f64)>> as core::iter::traits::iterator::Iterator>::next Line | Count | Source | 513 | 9 | fn next(&mut self) -> Option<Self::Item> { self.decompress_next_block() } |
Unexecuted instantiation: <exr::block::reader::SequentialBlockDecompressor<_> as core::iter::traits::iterator::Iterator>::next Unexecuted instantiation: <exr::block::reader::SequentialBlockDecompressor<exr::block::reader::OnProgressChunksReader<exr::block::reader::FilteredChunksReader<std::io::cursor::Cursor<alloc::vec::Vec<u8>>>, &mut fn(f64)>> as core::iter::traits::iterator::Iterator>::next |
514 | 0 | fn size_hint(&self) -> (usize, Option<usize>) { self.remaining_chunks_reader.size_hint() } |
515 | | } |
516 | | |
517 | | #[cfg(feature = "rayon")] |
518 | | impl<R: ChunksReader> ExactSizeIterator for ParallelBlockDecompressor<R> {} |
519 | | #[cfg(feature = "rayon")] |
520 | | impl<R: ChunksReader> Iterator for ParallelBlockDecompressor<R> { |
521 | | type Item = Result<UncompressedBlock>; |
522 | 359k | fn next(&mut self) -> Option<Self::Item> { self.decompress_next_block() }<exr::block::reader::ParallelBlockDecompressor<exr::block::reader::OnProgressChunksReader<exr::block::reader::FilteredChunksReader<std::io::cursor::Cursor<&[u8]>>, &mut fn(f64)>> as core::iter::traits::iterator::Iterator>::next Line | Count | Source | 522 | 4.55k | fn next(&mut self) -> Option<Self::Item> { self.decompress_next_block() } |
Unexecuted instantiation: <exr::block::reader::ParallelBlockDecompressor<_> as core::iter::traits::iterator::Iterator>::next <exr::block::reader::ParallelBlockDecompressor<exr::block::reader::OnProgressChunksReader<exr::block::reader::FilteredChunksReader<std::io::cursor::Cursor<alloc::vec::Vec<u8>>>, &mut fn(f64)>> as core::iter::traits::iterator::Iterator>::next Line | Count | Source | 522 | 354k | fn next(&mut self) -> Option<Self::Item> { self.decompress_next_block() } |
|
523 | 0 | fn size_hint(&self) -> (usize, Option<usize>) { |
524 | 0 | let remaining = self.remaining_chunks.len() + self.currently_decompressing_count; |
525 | 0 | (remaining, Some(remaining)) |
526 | 0 | } |
527 | | } |
528 | | |
529 | | |
530 | | |
531 | | |
532 | | |