Coverage Report

Created: 2025-12-20 06:48

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/rust/registry/src/index.crates.io-1949cf8c6b5b557f/exr-1.74.0/src/block/reader.rs
Line
Count
Source
1
//! Composable structures to handle reading an image.
2
3
4
use std::convert::TryFrom;
5
use std::fmt::Debug;
6
use std::io::{Read, Seek};
7
use crate::block::{BlockIndex, UncompressedBlock};
8
use crate::block::chunk::{Chunk, TileCoordinates};
9
use crate::error::{Error, Result, u64_to_usize, UnitResult};
10
use crate::io::{PeekRead, Tracking};
11
use crate::meta::{MetaData, OffsetTables};
12
use crate::meta::header::Header;
13
14
/// Decode the meta data from a byte source, keeping the source ready for further reading.
15
/// Continue decoding the remaining bytes by calling `filtered_chunks` or `all_chunks`.
16
#[derive(Debug)]
17
pub struct Reader<R> {
18
    meta_data: MetaData,
19
    remaining_reader: PeekRead<Tracking<R>>, // TODO does R need to be Seek or is Tracking enough?
20
}
21
22
impl<R: Read + Seek> Reader<R> {
23
24
    /// Start the reading process.
25
    /// Immediately decodes the meta data into an internal field.
26
    /// Access it via`meta_data()`.
27
6.22k
    pub fn read_from_buffered(read: R, pedantic: bool) -> Result<Self> {
28
6.22k
        let mut remaining_reader = PeekRead::new(Tracking::new(read));
29
6.22k
        let meta_data = MetaData::read_validated_from_buffered_peekable(&mut remaining_reader, pedantic)?;
30
2.36k
        Ok(Self { meta_data, remaining_reader })
31
6.22k
    }
<exr::block::reader::Reader<std::io::cursor::Cursor<&[u8]>>>::read_from_buffered
Line
Count
Source
27
6.13k
    pub fn read_from_buffered(read: R, pedantic: bool) -> Result<Self> {
28
6.13k
        let mut remaining_reader = PeekRead::new(Tracking::new(read));
29
6.13k
        let meta_data = MetaData::read_validated_from_buffered_peekable(&mut remaining_reader, pedantic)?;
30
2.28k
        Ok(Self { meta_data, remaining_reader })
31
6.13k
    }
Unexecuted instantiation: <exr::block::reader::Reader<_>>::read_from_buffered
<exr::block::reader::Reader<std::io::cursor::Cursor<alloc::vec::Vec<u8>>>>::read_from_buffered
Line
Count
Source
27
85
    pub fn read_from_buffered(read: R, pedantic: bool) -> Result<Self> {
28
85
        let mut remaining_reader = PeekRead::new(Tracking::new(read));
29
85
        let meta_data = MetaData::read_validated_from_buffered_peekable(&mut remaining_reader, pedantic)?;
30
85
        Ok(Self { meta_data, remaining_reader })
31
85
    }
32
33
    // must not be mutable, as reading the file later on relies on the meta data
34
    /// The decoded exr meta data from the file.
35
17.4k
    pub fn meta_data(&self) -> &MetaData { &self.meta_data }
<exr::block::reader::Reader<std::io::cursor::Cursor<&[u8]>>>::meta_data
Line
Count
Source
35
16.8k
    pub fn meta_data(&self) -> &MetaData { &self.meta_data }
Unexecuted instantiation: <exr::block::reader::Reader<_>>::meta_data
<exr::block::reader::Reader<std::io::cursor::Cursor<alloc::vec::Vec<u8>>>>::meta_data
Line
Count
Source
35
595
    pub fn meta_data(&self) -> &MetaData { &self.meta_data }
36
37
    /// The decoded exr meta data from the file.
38
9.21k
    pub fn headers(&self) -> &[Header] { &self.meta_data.headers }
<exr::block::reader::Reader<std::io::cursor::Cursor<&[u8]>>>::headers
Line
Count
Source
38
8.87k
    pub fn headers(&self) -> &[Header] { &self.meta_data.headers }
Unexecuted instantiation: <exr::block::reader::Reader<_>>::headers
<exr::block::reader::Reader<std::io::cursor::Cursor<alloc::vec::Vec<u8>>>>::headers
Line
Count
Source
38
340
    pub fn headers(&self) -> &[Header] { &self.meta_data.headers }
39
40
    /// Obtain the meta data ownership.
41
0
    pub fn into_meta_data(self) -> MetaData { self.meta_data }
42
43
    /// Prepare to read all the chunks from the file.
44
    /// Does not decode the chunks now, but returns a decoder.
45
    /// Reading all chunks reduces seeking the file, but some chunks might be read without being used.
46
0
    pub fn all_chunks(mut self, pedantic: bool) -> Result<AllChunksReader<R>> {
47
0
        let total_chunk_count = {
48
0
            if pedantic {
49
0
                let offset_tables = MetaData::read_offset_tables(&mut self.remaining_reader, &self.meta_data.headers)?;
50
0
                validate_offset_tables(self.meta_data.headers.as_slice(), &offset_tables, self.remaining_reader.byte_position())?;
51
0
                offset_tables.iter().map(|table| table.len()).sum()
52
            }
53
            else {
54
0
                MetaData::skip_offset_tables(&mut self.remaining_reader, &self.meta_data.headers)?
55
            }
56
        };
57
58
0
        Ok(AllChunksReader {
59
0
            meta_data: self.meta_data,
60
0
            remaining_chunks: 0 .. total_chunk_count,
61
0
            remaining_bytes: self.remaining_reader,
62
0
            pedantic
63
0
        })
64
0
    }
65
66
    /// Prepare to read some the chunks from the file.
67
    /// Does not decode the chunks now, but returns a decoder.
68
    /// Reading only some chunks may seeking the file, potentially skipping many bytes.
69
    // TODO tile indices add no new information to block index??
70
2.26k
    pub fn filter_chunks(mut self, pedantic: bool, mut filter: impl FnMut(&MetaData, TileCoordinates, BlockIndex) -> bool) -> Result<FilteredChunksReader<R>> {
71
2.26k
        let offset_tables = MetaData::read_offset_tables(&mut self.remaining_reader, &self.meta_data.headers)?;
72
73
        // TODO regardless of pedantic, if invalid, read all chunks instead, and filter after reading each chunk?
74
2.04k
        if pedantic {
75
0
            validate_offset_tables(
76
0
                self.meta_data.headers.as_slice(), &offset_tables,
77
0
                self.remaining_reader.byte_position()
78
0
            )?;
79
2.04k
        }
80
81
2.04k
        let mut filtered_offsets = Vec::with_capacity(
82
2.04k
            (self.meta_data.headers.len() * 32).min(2*2048)
83
        );
84
85
        // TODO detect whether the filter actually would skip chunks, and aviod sorting etc when not filtering is applied
86
87
9.01k
        for (header_index, header) in self.meta_data.headers.iter().enumerate() { // offset tables are stored same order as headers
88
1.61M
            for (block_index, tile) in header.blocks_increasing_y_order().enumerate() { // in increasing_y order
89
1.61M
                let data_indices = header.get_absolute_block_pixel_coordinates(tile.location)?;
90
91
1.61M
                let block = BlockIndex {
92
1.61M
                    layer: header_index,
93
1.61M
                    level: tile.location.level_index,
94
1.61M
                    pixel_position: data_indices.position.to_usize("data indices start")?,
95
1.61M
                    pixel_size: data_indices.size,
96
                };
97
98
1.61M
                if filter(&self.meta_data, tile.location, block) {
99
1.36M
                    filtered_offsets.push(offset_tables[header_index][block_index]) // safe indexing from `enumerate()`
100
251k
                }
101
            };
102
        }
103
104
2.04k
        filtered_offsets.sort_unstable(); // enables reading continuously if possible (already sorted where line order increasing)
105
106
2.04k
        if pedantic {
107
            // table is sorted. if any two neighbours are equal, we have duplicates. this is invalid.
108
0
            if filtered_offsets.windows(2).any(|pair| pair[0] == pair[1]) {
Unexecuted instantiation: <exr::block::reader::Reader<std::io::cursor::Cursor<&[u8]>>>::filter_chunks::<<exr::image::read::image::ReadImage<fn(f64), exr::image::read::layers::ReadFirstValidLayer<exr::image::read::specific_channels::CollectPixels<exr::image::read::specific_channels::ReadOptionalChannel<exr::image::read::specific_channels::ReadRequiredChannel<exr::image::read::specific_channels::ReadRequiredChannel<exr::image::read::specific_channels::ReadRequiredChannel<exr::image::recursive::NoneMore, f32>, f32>, f32>, f32>, (f32, f32, f32, f32), alloc::vec::Vec<f32>, <image::codecs::openexr::OpenExrDecoder<std::io::cursor::Cursor<&[u8]>> as image::io::decoder::ImageDecoder>::read_image::{closure#2}, <image::codecs::openexr::OpenExrDecoder<std::io::cursor::Cursor<&[u8]>> as image::io::decoder::ImageDecoder>::read_image::{closure#3}>>>>::from_chunks<exr::image::Layer<exr::image::SpecificChannels<alloc::vec::Vec<f32>, (exr::meta::attribute::ChannelDescription, exr::meta::attribute::ChannelDescription, exr::meta::attribute::ChannelDescription, core::option::Option<exr::meta::attribute::ChannelDescription>)>>, std::io::cursor::Cursor<&[u8]>>::{closure#0}>::{closure#0}
Unexecuted instantiation: <exr::block::reader::Reader<_>>::filter_chunks::<_>::{closure#0}
Unexecuted instantiation: <exr::block::reader::Reader<std::io::cursor::Cursor<alloc::vec::Vec<u8>>>>::filter_chunks::<<exr::image::read::image::ReadImage<fn(f64), exr::image::read::layers::ReadFirstValidLayer<exr::image::read::specific_channels::CollectPixels<exr::image::read::specific_channels::ReadOptionalChannel<exr::image::read::specific_channels::ReadRequiredChannel<exr::image::read::specific_channels::ReadRequiredChannel<exr::image::read::specific_channels::ReadRequiredChannel<exr::image::recursive::NoneMore, f32>, f32>, f32>, f32>, (f32, f32, f32, f32), alloc::vec::Vec<f32>, <image::codecs::openexr::OpenExrDecoder<std::io::cursor::Cursor<alloc::vec::Vec<u8>>> as image::io::decoder::ImageDecoder>::read_image::{closure#2}, <image::codecs::openexr::OpenExrDecoder<std::io::cursor::Cursor<alloc::vec::Vec<u8>>> as image::io::decoder::ImageDecoder>::read_image::{closure#3}>>>>::from_chunks<exr::image::Layer<exr::image::SpecificChannels<alloc::vec::Vec<f32>, (exr::meta::attribute::ChannelDescription, exr::meta::attribute::ChannelDescription, exr::meta::attribute::ChannelDescription, core::option::Option<exr::meta::attribute::ChannelDescription>)>>, std::io::cursor::Cursor<alloc::vec::Vec<u8>>>::{closure#0}>::{closure#0}
109
0
                return Err(Error::invalid("chunk offset table"))
110
0
            }
111
2.04k
        }
112
113
2.04k
        Ok(FilteredChunksReader {
114
2.04k
            meta_data: self.meta_data,
115
2.04k
            expected_filtered_chunk_count: filtered_offsets.len(),
116
2.04k
            remaining_filtered_chunk_indices: filtered_offsets.into_iter(),
117
2.04k
            remaining_bytes: self.remaining_reader
118
2.04k
        })
119
2.26k
    }
<exr::block::reader::Reader<std::io::cursor::Cursor<&[u8]>>>::filter_chunks::<<exr::image::read::image::ReadImage<fn(f64), exr::image::read::layers::ReadFirstValidLayer<exr::image::read::specific_channels::CollectPixels<exr::image::read::specific_channels::ReadOptionalChannel<exr::image::read::specific_channels::ReadRequiredChannel<exr::image::read::specific_channels::ReadRequiredChannel<exr::image::read::specific_channels::ReadRequiredChannel<exr::image::recursive::NoneMore, f32>, f32>, f32>, f32>, (f32, f32, f32, f32), alloc::vec::Vec<f32>, <image::codecs::openexr::OpenExrDecoder<std::io::cursor::Cursor<&[u8]>> as image::io::decoder::ImageDecoder>::read_image::{closure#2}, <image::codecs::openexr::OpenExrDecoder<std::io::cursor::Cursor<&[u8]>> as image::io::decoder::ImageDecoder>::read_image::{closure#3}>>>>::from_chunks<exr::image::Layer<exr::image::SpecificChannels<alloc::vec::Vec<f32>, (exr::meta::attribute::ChannelDescription, exr::meta::attribute::ChannelDescription, exr::meta::attribute::ChannelDescription, core::option::Option<exr::meta::attribute::ChannelDescription>)>>, std::io::cursor::Cursor<&[u8]>>::{closure#0}>
Line
Count
Source
70
2.18k
    pub fn filter_chunks(mut self, pedantic: bool, mut filter: impl FnMut(&MetaData, TileCoordinates, BlockIndex) -> bool) -> Result<FilteredChunksReader<R>> {
71
2.18k
        let offset_tables = MetaData::read_offset_tables(&mut self.remaining_reader, &self.meta_data.headers)?;
72
73
        // TODO regardless of pedantic, if invalid, read all chunks instead, and filter after reading each chunk?
74
1.95k
        if pedantic {
75
0
            validate_offset_tables(
76
0
                self.meta_data.headers.as_slice(), &offset_tables,
77
0
                self.remaining_reader.byte_position()
78
0
            )?;
79
1.95k
        }
80
81
1.95k
        let mut filtered_offsets = Vec::with_capacity(
82
1.95k
            (self.meta_data.headers.len() * 32).min(2*2048)
83
        );
84
85
        // TODO detect whether the filter actually would skip chunks, and aviod sorting etc when not filtering is applied
86
87
8.93k
        for (header_index, header) in self.meta_data.headers.iter().enumerate() { // offset tables are stored same order as headers
88
1.26M
            for (block_index, tile) in header.blocks_increasing_y_order().enumerate() { // in increasing_y order
89
1.26M
                let data_indices = header.get_absolute_block_pixel_coordinates(tile.location)?;
90
91
1.26M
                let block = BlockIndex {
92
1.26M
                    layer: header_index,
93
1.26M
                    level: tile.location.level_index,
94
1.26M
                    pixel_position: data_indices.position.to_usize("data indices start")?,
95
1.26M
                    pixel_size: data_indices.size,
96
                };
97
98
1.26M
                if filter(&self.meta_data, tile.location, block) {
99
1.01M
                    filtered_offsets.push(offset_tables[header_index][block_index]) // safe indexing from `enumerate()`
100
251k
                }
101
            };
102
        }
103
104
1.95k
        filtered_offsets.sort_unstable(); // enables reading continuously if possible (already sorted where line order increasing)
105
106
1.95k
        if pedantic {
107
            // table is sorted. if any two neighbours are equal, we have duplicates. this is invalid.
108
0
            if filtered_offsets.windows(2).any(|pair| pair[0] == pair[1]) {
109
0
                return Err(Error::invalid("chunk offset table"))
110
0
            }
111
1.95k
        }
112
113
1.95k
        Ok(FilteredChunksReader {
114
1.95k
            meta_data: self.meta_data,
115
1.95k
            expected_filtered_chunk_count: filtered_offsets.len(),
116
1.95k
            remaining_filtered_chunk_indices: filtered_offsets.into_iter(),
117
1.95k
            remaining_bytes: self.remaining_reader
118
1.95k
        })
119
2.18k
    }
Unexecuted instantiation: <exr::block::reader::Reader<_>>::filter_chunks::<_>
<exr::block::reader::Reader<std::io::cursor::Cursor<alloc::vec::Vec<u8>>>>::filter_chunks::<<exr::image::read::image::ReadImage<fn(f64), exr::image::read::layers::ReadFirstValidLayer<exr::image::read::specific_channels::CollectPixels<exr::image::read::specific_channels::ReadOptionalChannel<exr::image::read::specific_channels::ReadRequiredChannel<exr::image::read::specific_channels::ReadRequiredChannel<exr::image::read::specific_channels::ReadRequiredChannel<exr::image::recursive::NoneMore, f32>, f32>, f32>, f32>, (f32, f32, f32, f32), alloc::vec::Vec<f32>, <image::codecs::openexr::OpenExrDecoder<std::io::cursor::Cursor<alloc::vec::Vec<u8>>> as image::io::decoder::ImageDecoder>::read_image::{closure#2}, <image::codecs::openexr::OpenExrDecoder<std::io::cursor::Cursor<alloc::vec::Vec<u8>>> as image::io::decoder::ImageDecoder>::read_image::{closure#3}>>>>::from_chunks<exr::image::Layer<exr::image::SpecificChannels<alloc::vec::Vec<f32>, (exr::meta::attribute::ChannelDescription, exr::meta::attribute::ChannelDescription, exr::meta::attribute::ChannelDescription, core::option::Option<exr::meta::attribute::ChannelDescription>)>>, std::io::cursor::Cursor<alloc::vec::Vec<u8>>>::{closure#0}>
Line
Count
Source
70
85
    pub fn filter_chunks(mut self, pedantic: bool, mut filter: impl FnMut(&MetaData, TileCoordinates, BlockIndex) -> bool) -> Result<FilteredChunksReader<R>> {
71
85
        let offset_tables = MetaData::read_offset_tables(&mut self.remaining_reader, &self.meta_data.headers)?;
72
73
        // TODO regardless of pedantic, if invalid, read all chunks instead, and filter after reading each chunk?
74
85
        if pedantic {
75
0
            validate_offset_tables(
76
0
                self.meta_data.headers.as_slice(), &offset_tables,
77
0
                self.remaining_reader.byte_position()
78
0
            )?;
79
85
        }
80
81
85
        let mut filtered_offsets = Vec::with_capacity(
82
85
            (self.meta_data.headers.len() * 32).min(2*2048)
83
        );
84
85
        // TODO detect whether the filter actually would skip chunks, and aviod sorting etc when not filtering is applied
86
87
85
        for (header_index, header) in self.meta_data.headers.iter().enumerate() { // offset tables are stored same order as headers
88
354k
            for (block_index, tile) in header.blocks_increasing_y_order().enumerate() { // in increasing_y order
89
354k
                let data_indices = header.get_absolute_block_pixel_coordinates(tile.location)?;
90
91
354k
                let block = BlockIndex {
92
354k
                    layer: header_index,
93
354k
                    level: tile.location.level_index,
94
354k
                    pixel_position: data_indices.position.to_usize("data indices start")?,
95
354k
                    pixel_size: data_indices.size,
96
                };
97
98
354k
                if filter(&self.meta_data, tile.location, block) {
99
354k
                    filtered_offsets.push(offset_tables[header_index][block_index]) // safe indexing from `enumerate()`
100
0
                }
101
            };
102
        }
103
104
85
        filtered_offsets.sort_unstable(); // enables reading continuously if possible (already sorted where line order increasing)
105
106
85
        if pedantic {
107
            // table is sorted. if any two neighbours are equal, we have duplicates. this is invalid.
108
0
            if filtered_offsets.windows(2).any(|pair| pair[0] == pair[1]) {
109
0
                return Err(Error::invalid("chunk offset table"))
110
0
            }
111
85
        }
112
113
85
        Ok(FilteredChunksReader {
114
85
            meta_data: self.meta_data,
115
85
            expected_filtered_chunk_count: filtered_offsets.len(),
116
85
            remaining_filtered_chunk_indices: filtered_offsets.into_iter(),
117
85
            remaining_bytes: self.remaining_reader
118
85
        })
119
85
    }
120
}
121
122
123
0
fn validate_offset_tables(headers: &[Header], offset_tables: &OffsetTables, chunks_start_byte: usize) -> UnitResult {
124
0
    let max_pixel_bytes: usize = headers.iter() // when compressed, chunks are smaller, but never larger than max
125
0
        .map(|header| header.max_pixel_file_bytes())
126
0
        .sum();
127
128
    // check that each offset is within the bounds
129
0
    let end_byte = chunks_start_byte + max_pixel_bytes;
130
0
    let is_invalid = offset_tables.iter().flatten().map(|&u64| u64_to_usize(u64, "chunk start"))
131
0
        .any(|maybe_chunk_start| match maybe_chunk_start {
132
0
            Ok(chunk_start) => chunk_start < chunks_start_byte || chunk_start > end_byte,
133
0
            Err(_) => true
134
0
        });
135
136
0
    if is_invalid { Err(Error::invalid("offset table")) }
137
0
    else { Ok(()) }
138
0
}
139
140
141
142
143
/// Decode the desired chunks and skip the unimportant chunks in the file.
144
/// The decoded chunks can be decompressed by calling
145
/// `decompress_parallel`, `decompress_sequential`, or `sequential_decompressor` or `parallel_decompressor`.
146
/// Call `on_progress` to have a callback with each block.
147
/// Also contains the image meta data.
148
#[derive(Debug)]
149
pub struct FilteredChunksReader<R> {
150
    meta_data: MetaData,
151
    expected_filtered_chunk_count: usize,
152
    remaining_filtered_chunk_indices: std::vec::IntoIter<u64>,
153
    remaining_bytes: PeekRead<Tracking<R>>,
154
}
155
156
/// Decode all chunks in the file without seeking.
157
/// The decoded chunks can be decompressed by calling
158
/// `decompress_parallel`, `decompress_sequential`, or `sequential_decompressor` or `parallel_decompressor`.
159
/// Call `on_progress` to have a callback with each block.
160
/// Also contains the image meta data.
161
#[derive(Debug)]
162
pub struct AllChunksReader<R> {
163
    meta_data: MetaData,
164
    remaining_chunks: std::ops::Range<usize>,
165
    remaining_bytes: PeekRead<Tracking<R>>,
166
    pedantic: bool,
167
}
168
169
/// Decode chunks in the file without seeking.
170
/// Calls the supplied closure for each chunk.
171
/// The decoded chunks can be decompressed by calling
172
/// `decompress_parallel`, `decompress_sequential`, or `sequential_decompressor`.
173
/// Also contains the image meta data.
174
#[derive(Debug)]
175
pub struct OnProgressChunksReader<R, F> {
176
    chunks_reader: R,
177
    decoded_chunks: usize,
178
    callback: F,
179
}
180
181
/// Decode chunks in the file.
182
/// The decoded chunks can be decompressed by calling
183
/// `decompress_parallel`, `decompress_sequential`, or `sequential_decompressor`.
184
/// Call `on_progress` to have a callback with each block.
185
/// Also contains the image meta data.
186
pub trait ChunksReader: Sized + Iterator<Item=Result<Chunk>> + ExactSizeIterator {
187
188
    /// The decoded exr meta data from the file.
189
    fn meta_data(&self) -> &MetaData;
190
191
    /// The decoded exr headers from the file.
192
0
    fn headers(&self) -> &[Header] { &self.meta_data().headers }
193
194
    /// The number of chunks that this reader will return in total.
195
    /// Can be less than the total number of chunks in the file, if some chunks are skipped.
196
    fn expected_chunk_count(&self) -> usize;
197
198
    /// Read the next compressed chunk from the file.
199
    /// Equivalent to `.next()`, as this also is an iterator.
200
    /// Returns `None` if all chunks have been read.
201
9
    fn read_next_chunk(&mut self) -> Option<Result<Chunk>> { self.next() }
<exr::block::reader::OnProgressChunksReader<exr::block::reader::FilteredChunksReader<std::io::cursor::Cursor<&[u8]>>, &mut fn(f64)> as exr::block::reader::ChunksReader>::read_next_chunk
Line
Count
Source
201
9
    fn read_next_chunk(&mut self) -> Option<Result<Chunk>> { self.next() }
Unexecuted instantiation: <_ as exr::block::reader::ChunksReader>::read_next_chunk
Unexecuted instantiation: <exr::block::reader::OnProgressChunksReader<exr::block::reader::FilteredChunksReader<std::io::cursor::Cursor<alloc::vec::Vec<u8>>>, &mut fn(f64)> as exr::block::reader::ChunksReader>::read_next_chunk
202
203
    /// Create a new reader that calls the provided progress
204
    /// callback for each chunk that is read from the file.
205
    /// If the file can be successfully decoded,
206
    /// the progress will always at least once include 0.0 at the start and 1.0 at the end.
207
2.04k
    fn on_progress<F>(self, on_progress: F) -> OnProgressChunksReader<Self, F> where F: FnMut(f64) {
208
2.04k
        OnProgressChunksReader { chunks_reader: self, callback: on_progress, decoded_chunks: 0 }
209
2.04k
    }
<exr::block::reader::FilteredChunksReader<std::io::cursor::Cursor<&[u8]>> as exr::block::reader::ChunksReader>::on_progress::<&mut fn(f64)>
Line
Count
Source
207
1.95k
    fn on_progress<F>(self, on_progress: F) -> OnProgressChunksReader<Self, F> where F: FnMut(f64) {
208
1.95k
        OnProgressChunksReader { chunks_reader: self, callback: on_progress, decoded_chunks: 0 }
209
1.95k
    }
Unexecuted instantiation: <_ as exr::block::reader::ChunksReader>::on_progress::<_>
<exr::block::reader::FilteredChunksReader<std::io::cursor::Cursor<alloc::vec::Vec<u8>>> as exr::block::reader::ChunksReader>::on_progress::<&mut fn(f64)>
Line
Count
Source
207
85
    fn on_progress<F>(self, on_progress: F) -> OnProgressChunksReader<Self, F> where F: FnMut(f64) {
208
85
        OnProgressChunksReader { chunks_reader: self, callback: on_progress, decoded_chunks: 0 }
209
85
    }
210
211
    #[cfg(feature = "rayon")]
212
    /// Decompress all blocks in the file, using multiple cpu cores, and call the supplied closure for each block.
213
    /// The order of the blocks is not deterministic.
214
    /// You can also use `parallel_decompressor` to obtain an iterator instead.
215
    /// Will fallback to sequential processing where threads are not available, or where it would not speed up the process.
216
    // FIXME try async + futures instead of rayon! Maybe even allows for external async decoding? (-> impl Stream<UncompressedBlock>)
217
2.04k
    fn decompress_parallel(
218
2.04k
        self, pedantic: bool,
219
2.04k
        mut insert_block: impl FnMut(&MetaData, UncompressedBlock) -> UnitResult
220
2.04k
    ) -> UnitResult
221
    {
222
2.04k
        let mut decompressor = match self.parallel_decompressor(pedantic) {
223
9
            Err(old_self) => return old_self.decompress_sequential(pedantic, insert_block),
224
2.03k
            Ok(decompressor) => decompressor,
225
        };
226
227
359k
        while let Some(block) = decompressor.next() {
228
358k
            insert_block(decompressor.meta_data(), block?)?;
229
        }
230
231
170
        debug_assert_eq!(decompressor.len(), 0, "compressed blocks left after decompressing all blocks");
232
170
        Ok(())
233
2.04k
    }
<exr::block::reader::OnProgressChunksReader<exr::block::reader::FilteredChunksReader<std::io::cursor::Cursor<&[u8]>>, &mut fn(f64)> as exr::block::reader::ChunksReader>::decompress_parallel::<<exr::image::read::image::ReadImage<fn(f64), exr::image::read::layers::ReadFirstValidLayer<exr::image::read::specific_channels::CollectPixels<exr::image::read::specific_channels::ReadOptionalChannel<exr::image::read::specific_channels::ReadRequiredChannel<exr::image::read::specific_channels::ReadRequiredChannel<exr::image::read::specific_channels::ReadRequiredChannel<exr::image::recursive::NoneMore, f32>, f32>, f32>, f32>, (f32, f32, f32, f32), alloc::vec::Vec<f32>, <image::codecs::openexr::OpenExrDecoder<std::io::cursor::Cursor<&[u8]>> as image::io::decoder::ImageDecoder>::read_image::{closure#2}, <image::codecs::openexr::OpenExrDecoder<std::io::cursor::Cursor<&[u8]>> as image::io::decoder::ImageDecoder>::read_image::{closure#3}>>>>::from_chunks<exr::image::Layer<exr::image::SpecificChannels<alloc::vec::Vec<f32>, (exr::meta::attribute::ChannelDescription, exr::meta::attribute::ChannelDescription, exr::meta::attribute::ChannelDescription, core::option::Option<exr::meta::attribute::ChannelDescription>)>>, std::io::cursor::Cursor<&[u8]>>::{closure#1}>
Line
Count
Source
217
1.95k
    fn decompress_parallel(
218
1.95k
        self, pedantic: bool,
219
1.95k
        mut insert_block: impl FnMut(&MetaData, UncompressedBlock) -> UnitResult
220
1.95k
    ) -> UnitResult
221
    {
222
1.95k
        let mut decompressor = match self.parallel_decompressor(pedantic) {
223
9
            Err(old_self) => return old_self.decompress_sequential(pedantic, insert_block),
224
1.94k
            Ok(decompressor) => decompressor,
225
        };
226
227
4.55k
        while let Some(block) = decompressor.next() {
228
4.46k
            insert_block(decompressor.meta_data(), block?)?;
229
        }
230
231
85
        debug_assert_eq!(decompressor.len(), 0, "compressed blocks left after decompressing all blocks");
232
85
        Ok(())
233
1.95k
    }
Unexecuted instantiation: <_ as exr::block::reader::ChunksReader>::decompress_parallel::<_>
<exr::block::reader::OnProgressChunksReader<exr::block::reader::FilteredChunksReader<std::io::cursor::Cursor<alloc::vec::Vec<u8>>>, &mut fn(f64)> as exr::block::reader::ChunksReader>::decompress_parallel::<<exr::image::read::image::ReadImage<fn(f64), exr::image::read::layers::ReadFirstValidLayer<exr::image::read::specific_channels::CollectPixels<exr::image::read::specific_channels::ReadOptionalChannel<exr::image::read::specific_channels::ReadRequiredChannel<exr::image::read::specific_channels::ReadRequiredChannel<exr::image::read::specific_channels::ReadRequiredChannel<exr::image::recursive::NoneMore, f32>, f32>, f32>, f32>, (f32, f32, f32, f32), alloc::vec::Vec<f32>, <image::codecs::openexr::OpenExrDecoder<std::io::cursor::Cursor<alloc::vec::Vec<u8>>> as image::io::decoder::ImageDecoder>::read_image::{closure#2}, <image::codecs::openexr::OpenExrDecoder<std::io::cursor::Cursor<alloc::vec::Vec<u8>>> as image::io::decoder::ImageDecoder>::read_image::{closure#3}>>>>::from_chunks<exr::image::Layer<exr::image::SpecificChannels<alloc::vec::Vec<f32>, (exr::meta::attribute::ChannelDescription, exr::meta::attribute::ChannelDescription, exr::meta::attribute::ChannelDescription, core::option::Option<exr::meta::attribute::ChannelDescription>)>>, std::io::cursor::Cursor<alloc::vec::Vec<u8>>>::{closure#1}>
Line
Count
Source
217
85
    fn decompress_parallel(
218
85
        self, pedantic: bool,
219
85
        mut insert_block: impl FnMut(&MetaData, UncompressedBlock) -> UnitResult
220
85
    ) -> UnitResult
221
    {
222
85
        let mut decompressor = match self.parallel_decompressor(pedantic) {
223
0
            Err(old_self) => return old_self.decompress_sequential(pedantic, insert_block),
224
85
            Ok(decompressor) => decompressor,
225
        };
226
227
354k
        while let Some(block) = decompressor.next() {
228
354k
            insert_block(decompressor.meta_data(), block?)?;
229
        }
230
231
85
        debug_assert_eq!(decompressor.len(), 0, "compressed blocks left after decompressing all blocks");
232
85
        Ok(())
233
85
    }
234
235
    #[cfg(feature = "rayon")]
236
    /// Return an iterator that decompresses the chunks with multiple threads.
237
    /// The order of the blocks is not deterministic.
238
    /// Use `ParallelBlockDecompressor::new` if you want to use your own thread pool.
239
    /// By default, this uses as many threads as there are CPUs.
240
    /// Returns the `self` if there is no need for parallel decompression.
241
2.04k
    fn parallel_decompressor(self, pedantic: bool) -> std::result::Result<ParallelBlockDecompressor<Self>, Self> {
242
2.04k
        ParallelBlockDecompressor::new(self, pedantic)
243
2.04k
    }
<exr::block::reader::OnProgressChunksReader<exr::block::reader::FilteredChunksReader<std::io::cursor::Cursor<&[u8]>>, &mut fn(f64)> as exr::block::reader::ChunksReader>::parallel_decompressor
Line
Count
Source
241
1.95k
    fn parallel_decompressor(self, pedantic: bool) -> std::result::Result<ParallelBlockDecompressor<Self>, Self> {
242
1.95k
        ParallelBlockDecompressor::new(self, pedantic)
243
1.95k
    }
Unexecuted instantiation: <_ as exr::block::reader::ChunksReader>::parallel_decompressor
<exr::block::reader::OnProgressChunksReader<exr::block::reader::FilteredChunksReader<std::io::cursor::Cursor<alloc::vec::Vec<u8>>>, &mut fn(f64)> as exr::block::reader::ChunksReader>::parallel_decompressor
Line
Count
Source
241
85
    fn parallel_decompressor(self, pedantic: bool) -> std::result::Result<ParallelBlockDecompressor<Self>, Self> {
242
85
        ParallelBlockDecompressor::new(self, pedantic)
243
85
    }
244
245
    /// Return an iterator that decompresses the chunks in this thread.
246
    /// You can alternatively use `sequential_decompressor` if you prefer an external iterator.
247
9
    fn decompress_sequential(
248
9
        self, pedantic: bool,
249
9
        mut insert_block: impl FnMut(&MetaData, UncompressedBlock) -> UnitResult
250
9
    ) -> UnitResult
251
    {
252
9
        let mut decompressor = self.sequential_decompressor(pedantic);
253
9
        while let Some(block) = decompressor.next() {
254
9
            insert_block(decompressor.meta_data(), block?)?;
255
        }
256
257
0
        debug_assert_eq!(decompressor.len(), 0, "compressed blocks left after decompressing all blocks");
258
0
        Ok(())
259
9
    }
Unexecuted instantiation: <exr::block::reader::OnProgressChunksReader<exr::block::reader::FilteredChunksReader<std::io::cursor::Cursor<&[u8]>>, &mut fn(f64)> as exr::block::reader::ChunksReader>::decompress_sequential::<<exr::image::read::image::ReadImage<fn(f64), exr::image::read::layers::ReadFirstValidLayer<exr::image::read::specific_channels::CollectPixels<exr::image::read::specific_channels::ReadOptionalChannel<exr::image::read::specific_channels::ReadRequiredChannel<exr::image::read::specific_channels::ReadRequiredChannel<exr::image::read::specific_channels::ReadRequiredChannel<exr::image::recursive::NoneMore, f32>, f32>, f32>, f32>, (f32, f32, f32, f32), alloc::vec::Vec<f32>, <image::codecs::openexr::OpenExrDecoder<std::io::cursor::Cursor<&[u8]>> as image::io::decoder::ImageDecoder>::read_image::{closure#2}, <image::codecs::openexr::OpenExrDecoder<std::io::cursor::Cursor<&[u8]>> as image::io::decoder::ImageDecoder>::read_image::{closure#3}>>>>::from_chunks<exr::image::Layer<exr::image::SpecificChannels<alloc::vec::Vec<f32>, (exr::meta::attribute::ChannelDescription, exr::meta::attribute::ChannelDescription, exr::meta::attribute::ChannelDescription, core::option::Option<exr::meta::attribute::ChannelDescription>)>>, std::io::cursor::Cursor<&[u8]>>::{closure#2}>
<exr::block::reader::OnProgressChunksReader<exr::block::reader::FilteredChunksReader<std::io::cursor::Cursor<&[u8]>>, &mut fn(f64)> as exr::block::reader::ChunksReader>::decompress_sequential::<<exr::image::read::image::ReadImage<fn(f64), exr::image::read::layers::ReadFirstValidLayer<exr::image::read::specific_channels::CollectPixels<exr::image::read::specific_channels::ReadOptionalChannel<exr::image::read::specific_channels::ReadRequiredChannel<exr::image::read::specific_channels::ReadRequiredChannel<exr::image::read::specific_channels::ReadRequiredChannel<exr::image::recursive::NoneMore, f32>, f32>, f32>, f32>, (f32, f32, f32, f32), alloc::vec::Vec<f32>, <image::codecs::openexr::OpenExrDecoder<std::io::cursor::Cursor<&[u8]>> as image::io::decoder::ImageDecoder>::read_image::{closure#2}, <image::codecs::openexr::OpenExrDecoder<std::io::cursor::Cursor<&[u8]>> as image::io::decoder::ImageDecoder>::read_image::{closure#3}>>>>::from_chunks<exr::image::Layer<exr::image::SpecificChannels<alloc::vec::Vec<f32>, (exr::meta::attribute::ChannelDescription, exr::meta::attribute::ChannelDescription, exr::meta::attribute::ChannelDescription, core::option::Option<exr::meta::attribute::ChannelDescription>)>>, std::io::cursor::Cursor<&[u8]>>::{closure#1}>
Line
Count
Source
247
9
    fn decompress_sequential(
248
9
        self, pedantic: bool,
249
9
        mut insert_block: impl FnMut(&MetaData, UncompressedBlock) -> UnitResult
250
9
    ) -> UnitResult
251
    {
252
9
        let mut decompressor = self.sequential_decompressor(pedantic);
253
9
        while let Some(block) = decompressor.next() {
254
9
            insert_block(decompressor.meta_data(), block?)?;
255
        }
256
257
0
        debug_assert_eq!(decompressor.len(), 0, "compressed blocks left after decompressing all blocks");
258
0
        Ok(())
259
9
    }
Unexecuted instantiation: <_ as exr::block::reader::ChunksReader>::decompress_sequential::<_>
Unexecuted instantiation: <exr::block::reader::OnProgressChunksReader<exr::block::reader::FilteredChunksReader<std::io::cursor::Cursor<alloc::vec::Vec<u8>>>, &mut fn(f64)> as exr::block::reader::ChunksReader>::decompress_sequential::<<exr::image::read::image::ReadImage<fn(f64), exr::image::read::layers::ReadFirstValidLayer<exr::image::read::specific_channels::CollectPixels<exr::image::read::specific_channels::ReadOptionalChannel<exr::image::read::specific_channels::ReadRequiredChannel<exr::image::read::specific_channels::ReadRequiredChannel<exr::image::read::specific_channels::ReadRequiredChannel<exr::image::recursive::NoneMore, f32>, f32>, f32>, f32>, (f32, f32, f32, f32), alloc::vec::Vec<f32>, <image::codecs::openexr::OpenExrDecoder<std::io::cursor::Cursor<alloc::vec::Vec<u8>>> as image::io::decoder::ImageDecoder>::read_image::{closure#2}, <image::codecs::openexr::OpenExrDecoder<std::io::cursor::Cursor<alloc::vec::Vec<u8>>> as image::io::decoder::ImageDecoder>::read_image::{closure#3}>>>>::from_chunks<exr::image::Layer<exr::image::SpecificChannels<alloc::vec::Vec<f32>, (exr::meta::attribute::ChannelDescription, exr::meta::attribute::ChannelDescription, exr::meta::attribute::ChannelDescription, core::option::Option<exr::meta::attribute::ChannelDescription>)>>, std::io::cursor::Cursor<alloc::vec::Vec<u8>>>::{closure#2}>
Unexecuted instantiation: <exr::block::reader::OnProgressChunksReader<exr::block::reader::FilteredChunksReader<std::io::cursor::Cursor<alloc::vec::Vec<u8>>>, &mut fn(f64)> as exr::block::reader::ChunksReader>::decompress_sequential::<<exr::image::read::image::ReadImage<fn(f64), exr::image::read::layers::ReadFirstValidLayer<exr::image::read::specific_channels::CollectPixels<exr::image::read::specific_channels::ReadOptionalChannel<exr::image::read::specific_channels::ReadRequiredChannel<exr::image::read::specific_channels::ReadRequiredChannel<exr::image::read::specific_channels::ReadRequiredChannel<exr::image::recursive::NoneMore, f32>, f32>, f32>, f32>, (f32, f32, f32, f32), alloc::vec::Vec<f32>, <image::codecs::openexr::OpenExrDecoder<std::io::cursor::Cursor<alloc::vec::Vec<u8>>> as image::io::decoder::ImageDecoder>::read_image::{closure#2}, <image::codecs::openexr::OpenExrDecoder<std::io::cursor::Cursor<alloc::vec::Vec<u8>>> as image::io::decoder::ImageDecoder>::read_image::{closure#3}>>>>::from_chunks<exr::image::Layer<exr::image::SpecificChannels<alloc::vec::Vec<f32>, (exr::meta::attribute::ChannelDescription, exr::meta::attribute::ChannelDescription, exr::meta::attribute::ChannelDescription, core::option::Option<exr::meta::attribute::ChannelDescription>)>>, std::io::cursor::Cursor<alloc::vec::Vec<u8>>>::{closure#1}>
260
261
    /// Prepare reading the chunks sequentially, only a single thread, but with less memory overhead.
262
9
    fn sequential_decompressor(self, pedantic: bool) -> SequentialBlockDecompressor<Self> {
263
9
        SequentialBlockDecompressor { remaining_chunks_reader: self, pedantic }
264
9
    }
<exr::block::reader::OnProgressChunksReader<exr::block::reader::FilteredChunksReader<std::io::cursor::Cursor<&[u8]>>, &mut fn(f64)> as exr::block::reader::ChunksReader>::sequential_decompressor
Line
Count
Source
262
9
    fn sequential_decompressor(self, pedantic: bool) -> SequentialBlockDecompressor<Self> {
263
9
        SequentialBlockDecompressor { remaining_chunks_reader: self, pedantic }
264
9
    }
Unexecuted instantiation: <_ as exr::block::reader::ChunksReader>::sequential_decompressor
Unexecuted instantiation: <exr::block::reader::OnProgressChunksReader<exr::block::reader::FilteredChunksReader<std::io::cursor::Cursor<alloc::vec::Vec<u8>>>, &mut fn(f64)> as exr::block::reader::ChunksReader>::sequential_decompressor
265
}
266
267
impl<R, F> ChunksReader for OnProgressChunksReader<R, F> where R: ChunksReader, F: FnMut(f64) {
268
363k
    fn meta_data(&self) -> &MetaData { self.chunks_reader.meta_data() }
<exr::block::reader::OnProgressChunksReader<exr::block::reader::FilteredChunksReader<std::io::cursor::Cursor<&[u8]>>, &mut fn(f64)> as exr::block::reader::ChunksReader>::meta_data
Line
Count
Source
268
8.38k
    fn meta_data(&self) -> &MetaData { self.chunks_reader.meta_data() }
Unexecuted instantiation: <exr::block::reader::OnProgressChunksReader<_, _> as exr::block::reader::ChunksReader>::meta_data
<exr::block::reader::OnProgressChunksReader<exr::block::reader::FilteredChunksReader<std::io::cursor::Cursor<alloc::vec::Vec<u8>>>, &mut fn(f64)> as exr::block::reader::ChunksReader>::meta_data
Line
Count
Source
268
354k
    fn meta_data(&self) -> &MetaData { self.chunks_reader.meta_data() }
269
362k
    fn expected_chunk_count(&self) -> usize { self.chunks_reader.expected_chunk_count() }
<exr::block::reader::OnProgressChunksReader<exr::block::reader::FilteredChunksReader<std::io::cursor::Cursor<&[u8]>>, &mut fn(f64)> as exr::block::reader::ChunksReader>::expected_chunk_count
Line
Count
Source
269
7.78k
    fn expected_chunk_count(&self) -> usize { self.chunks_reader.expected_chunk_count() }
Unexecuted instantiation: <exr::block::reader::OnProgressChunksReader<_, _> as exr::block::reader::ChunksReader>::expected_chunk_count
<exr::block::reader::OnProgressChunksReader<exr::block::reader::FilteredChunksReader<std::io::cursor::Cursor<alloc::vec::Vec<u8>>>, &mut fn(f64)> as exr::block::reader::ChunksReader>::expected_chunk_count
Line
Count
Source
269
354k
    fn expected_chunk_count(&self) -> usize { self.chunks_reader.expected_chunk_count() }
270
}
271
272
impl<R, F> ExactSizeIterator for OnProgressChunksReader<R, F> where R: ChunksReader, F: FnMut(f64) {}
273
impl<R, F> Iterator for OnProgressChunksReader<R, F> where R: ChunksReader, F: FnMut(f64) {
274
    type Item = Result<Chunk>;
275
276
364k
    fn next(&mut self) -> Option<Self::Item> {
277
364k
        self.chunks_reader.next().map(|item|{
278
362k
            {
279
362k
                let total_chunks = self.expected_chunk_count() as f64;
280
362k
                let callback = &mut self.callback;
281
362k
                callback(self.decoded_chunks as f64 / total_chunks);
282
362k
            }
283
284
362k
            self.decoded_chunks += 1;
285
362k
            item
286
362k
        })
<exr::block::reader::OnProgressChunksReader<exr::block::reader::FilteredChunksReader<std::io::cursor::Cursor<&[u8]>>, &mut fn(f64)> as core::iter::traits::iterator::Iterator>::next::{closure#0}
Line
Count
Source
277
7.78k
        self.chunks_reader.next().map(|item|{
278
7.78k
            {
279
7.78k
                let total_chunks = self.expected_chunk_count() as f64;
280
7.78k
                let callback = &mut self.callback;
281
7.78k
                callback(self.decoded_chunks as f64 / total_chunks);
282
7.78k
            }
283
284
7.78k
            self.decoded_chunks += 1;
285
7.78k
            item
286
7.78k
        })
Unexecuted instantiation: <exr::block::reader::OnProgressChunksReader<_, _> as core::iter::traits::iterator::Iterator>::next::{closure#0}
<exr::block::reader::OnProgressChunksReader<exr::block::reader::FilteredChunksReader<std::io::cursor::Cursor<alloc::vec::Vec<u8>>>, &mut fn(f64)> as core::iter::traits::iterator::Iterator>::next::{closure#0}
Line
Count
Source
277
354k
        self.chunks_reader.next().map(|item|{
278
354k
            {
279
354k
                let total_chunks = self.expected_chunk_count() as f64;
280
354k
                let callback = &mut self.callback;
281
354k
                callback(self.decoded_chunks as f64 / total_chunks);
282
354k
            }
283
284
354k
            self.decoded_chunks += 1;
285
354k
            item
286
354k
        })
287
364k
            .or_else(||{
288
2.04k
                debug_assert_eq!(
289
0
                    self.decoded_chunks, self.expected_chunk_count(),
290
0
                    "chunks reader finished but not all chunks are decompressed"
291
                );
292
293
2.04k
                let callback = &mut self.callback;
294
2.04k
                callback(1.0);
295
2.04k
                None
296
2.04k
            })
<exr::block::reader::OnProgressChunksReader<exr::block::reader::FilteredChunksReader<std::io::cursor::Cursor<&[u8]>>, &mut fn(f64)> as core::iter::traits::iterator::Iterator>::next::{closure#1}
Line
Count
Source
287
287
            .or_else(||{
288
287
                debug_assert_eq!(
289
0
                    self.decoded_chunks, self.expected_chunk_count(),
290
0
                    "chunks reader finished but not all chunks are decompressed"
291
                );
292
293
287
                let callback = &mut self.callback;
294
287
                callback(1.0);
295
287
                None
296
287
            })
Unexecuted instantiation: <exr::block::reader::OnProgressChunksReader<_, _> as core::iter::traits::iterator::Iterator>::next::{closure#1}
<exr::block::reader::OnProgressChunksReader<exr::block::reader::FilteredChunksReader<std::io::cursor::Cursor<alloc::vec::Vec<u8>>>, &mut fn(f64)> as core::iter::traits::iterator::Iterator>::next::{closure#1}
Line
Count
Source
287
1.75k
            .or_else(||{
288
1.75k
                debug_assert_eq!(
289
0
                    self.decoded_chunks, self.expected_chunk_count(),
290
0
                    "chunks reader finished but not all chunks are decompressed"
291
                );
292
293
1.75k
                let callback = &mut self.callback;
294
1.75k
                callback(1.0);
295
1.75k
                None
296
1.75k
            })
297
364k
    }
<exr::block::reader::OnProgressChunksReader<exr::block::reader::FilteredChunksReader<std::io::cursor::Cursor<&[u8]>>, &mut fn(f64)> as core::iter::traits::iterator::Iterator>::next
Line
Count
Source
276
8.07k
    fn next(&mut self) -> Option<Self::Item> {
277
8.07k
        self.chunks_reader.next().map(|item|{
278
            {
279
                let total_chunks = self.expected_chunk_count() as f64;
280
                let callback = &mut self.callback;
281
                callback(self.decoded_chunks as f64 / total_chunks);
282
            }
283
284
            self.decoded_chunks += 1;
285
            item
286
        })
287
8.07k
            .or_else(||{
288
                debug_assert_eq!(
289
                    self.decoded_chunks, self.expected_chunk_count(),
290
                    "chunks reader finished but not all chunks are decompressed"
291
                );
292
293
                let callback = &mut self.callback;
294
                callback(1.0);
295
                None
296
            })
297
8.07k
    }
Unexecuted instantiation: <exr::block::reader::OnProgressChunksReader<_, _> as core::iter::traits::iterator::Iterator>::next
<exr::block::reader::OnProgressChunksReader<exr::block::reader::FilteredChunksReader<std::io::cursor::Cursor<alloc::vec::Vec<u8>>>, &mut fn(f64)> as core::iter::traits::iterator::Iterator>::next
Line
Count
Source
276
356k
    fn next(&mut self) -> Option<Self::Item> {
277
356k
        self.chunks_reader.next().map(|item|{
278
            {
279
                let total_chunks = self.expected_chunk_count() as f64;
280
                let callback = &mut self.callback;
281
                callback(self.decoded_chunks as f64 / total_chunks);
282
            }
283
284
            self.decoded_chunks += 1;
285
            item
286
        })
287
356k
            .or_else(||{
288
                debug_assert_eq!(
289
                    self.decoded_chunks, self.expected_chunk_count(),
290
                    "chunks reader finished but not all chunks are decompressed"
291
                );
292
293
                let callback = &mut self.callback;
294
                callback(1.0);
295
                None
296
            })
297
356k
    }
298
299
2.03k
    fn size_hint(&self) -> (usize, Option<usize>) {
300
2.03k
        self.chunks_reader.size_hint()
301
2.03k
    }
<exr::block::reader::OnProgressChunksReader<exr::block::reader::FilteredChunksReader<std::io::cursor::Cursor<&[u8]>>, &mut fn(f64)> as core::iter::traits::iterator::Iterator>::size_hint
Line
Count
Source
299
1.94k
    fn size_hint(&self) -> (usize, Option<usize>) {
300
1.94k
        self.chunks_reader.size_hint()
301
1.94k
    }
Unexecuted instantiation: <exr::block::reader::OnProgressChunksReader<_, _> as core::iter::traits::iterator::Iterator>::size_hint
<exr::block::reader::OnProgressChunksReader<exr::block::reader::FilteredChunksReader<std::io::cursor::Cursor<alloc::vec::Vec<u8>>>, &mut fn(f64)> as core::iter::traits::iterator::Iterator>::size_hint
Line
Count
Source
299
85
    fn size_hint(&self) -> (usize, Option<usize>) {
300
85
        self.chunks_reader.size_hint()
301
85
    }
302
}
303
304
impl<R: Read + Seek> ChunksReader for AllChunksReader<R> {
305
0
    fn meta_data(&self) -> &MetaData { &self.meta_data }
306
0
    fn expected_chunk_count(&self) -> usize { self.remaining_chunks.end }
307
}
308
309
impl<R: Read + Seek> ExactSizeIterator for AllChunksReader<R> {}
310
impl<R: Read + Seek> Iterator for AllChunksReader<R> {
311
    type Item = Result<Chunk>;
312
313
0
    fn next(&mut self) -> Option<Self::Item> {
314
        // read as many chunks as the file should contain (inferred from meta data)
315
0
        let next_chunk = self.remaining_chunks.next()
316
0
            .map(|_| Chunk::read(&mut self.remaining_bytes, &self.meta_data));
317
318
        // if no chunks are left, but some bytes remain, return error
319
0
        if self.pedantic && next_chunk.is_none() && self.remaining_bytes.peek_u8().is_ok() {
320
0
            return Some(Err(Error::invalid("end of file expected")));
321
0
        }
322
323
0
        next_chunk
324
0
    }
325
326
0
    fn size_hint(&self) -> (usize, Option<usize>) {
327
0
        (self.remaining_chunks.len(), Some(self.remaining_chunks.len()))
328
0
    }
329
}
330
331
impl<R: Read + Seek> ChunksReader for FilteredChunksReader<R> {
332
363k
    fn meta_data(&self) -> &MetaData { &self.meta_data }
<exr::block::reader::FilteredChunksReader<std::io::cursor::Cursor<&[u8]>> as exr::block::reader::ChunksReader>::meta_data
Line
Count
Source
332
8.38k
    fn meta_data(&self) -> &MetaData { &self.meta_data }
Unexecuted instantiation: <exr::block::reader::FilteredChunksReader<_> as exr::block::reader::ChunksReader>::meta_data
<exr::block::reader::FilteredChunksReader<std::io::cursor::Cursor<alloc::vec::Vec<u8>>> as exr::block::reader::ChunksReader>::meta_data
Line
Count
Source
332
354k
    fn meta_data(&self) -> &MetaData { &self.meta_data }
333
362k
    fn expected_chunk_count(&self) -> usize { self.expected_filtered_chunk_count }
<exr::block::reader::FilteredChunksReader<std::io::cursor::Cursor<&[u8]>> as exr::block::reader::ChunksReader>::expected_chunk_count
Line
Count
Source
333
7.78k
    fn expected_chunk_count(&self) -> usize { self.expected_filtered_chunk_count }
Unexecuted instantiation: <exr::block::reader::FilteredChunksReader<_> as exr::block::reader::ChunksReader>::expected_chunk_count
<exr::block::reader::FilteredChunksReader<std::io::cursor::Cursor<alloc::vec::Vec<u8>>> as exr::block::reader::ChunksReader>::expected_chunk_count
Line
Count
Source
333
354k
    fn expected_chunk_count(&self) -> usize { self.expected_filtered_chunk_count }
334
}
335
336
impl<R: Read + Seek> ExactSizeIterator for FilteredChunksReader<R> {}
337
impl<R: Read + Seek> Iterator for FilteredChunksReader<R> {
338
    type Item = Result<Chunk>;
339
340
364k
    fn next(&mut self) -> Option<Self::Item> {
341
        // read as many chunks as we have desired chunk offsets
342
364k
        self.remaining_filtered_chunk_indices.next().map(|next_chunk_location|{
343
362k
            self.remaining_bytes.skip_to( // no-op for seek at current position, uses skip_bytes for small amounts
344
362k
                  usize::try_from(next_chunk_location)?
345
1
            )?;
346
347
362k
            let meta_data = &self.meta_data;
348
362k
            Chunk::read(&mut self.remaining_bytes, meta_data)
349
362k
        })
<exr::block::reader::FilteredChunksReader<std::io::cursor::Cursor<&[u8]>> as core::iter::traits::iterator::Iterator>::next::{closure#0}
Line
Count
Source
342
7.78k
        self.remaining_filtered_chunk_indices.next().map(|next_chunk_location|{
343
7.78k
            self.remaining_bytes.skip_to( // no-op for seek at current position, uses skip_bytes for small amounts
344
7.78k
                  usize::try_from(next_chunk_location)?
345
1
            )?;
346
347
7.78k
            let meta_data = &self.meta_data;
348
7.78k
            Chunk::read(&mut self.remaining_bytes, meta_data)
349
7.78k
        })
Unexecuted instantiation: <exr::block::reader::FilteredChunksReader<_> as core::iter::traits::iterator::Iterator>::next::{closure#0}
<exr::block::reader::FilteredChunksReader<std::io::cursor::Cursor<alloc::vec::Vec<u8>>> as core::iter::traits::iterator::Iterator>::next::{closure#0}
Line
Count
Source
342
354k
        self.remaining_filtered_chunk_indices.next().map(|next_chunk_location|{
343
354k
            self.remaining_bytes.skip_to( // no-op for seek at current position, uses skip_bytes for small amounts
344
354k
                  usize::try_from(next_chunk_location)?
345
0
            )?;
346
347
354k
            let meta_data = &self.meta_data;
348
354k
            Chunk::read(&mut self.remaining_bytes, meta_data)
349
354k
        })
350
351
        // TODO remember last chunk index and then seek to index+size and check whether bytes are left?
352
364k
    }
<exr::block::reader::FilteredChunksReader<std::io::cursor::Cursor<&[u8]>> as core::iter::traits::iterator::Iterator>::next
Line
Count
Source
340
8.07k
    fn next(&mut self) -> Option<Self::Item> {
341
        // read as many chunks as we have desired chunk offsets
342
8.07k
        self.remaining_filtered_chunk_indices.next().map(|next_chunk_location|{
343
            self.remaining_bytes.skip_to( // no-op for seek at current position, uses skip_bytes for small amounts
344
                  usize::try_from(next_chunk_location)?
345
            )?;
346
347
            let meta_data = &self.meta_data;
348
            Chunk::read(&mut self.remaining_bytes, meta_data)
349
        })
350
351
        // TODO remember last chunk index and then seek to index+size and check whether bytes are left?
352
8.07k
    }
Unexecuted instantiation: <exr::block::reader::FilteredChunksReader<_> as core::iter::traits::iterator::Iterator>::next
<exr::block::reader::FilteredChunksReader<std::io::cursor::Cursor<alloc::vec::Vec<u8>>> as core::iter::traits::iterator::Iterator>::next
Line
Count
Source
340
356k
    fn next(&mut self) -> Option<Self::Item> {
341
        // read as many chunks as we have desired chunk offsets
342
356k
        self.remaining_filtered_chunk_indices.next().map(|next_chunk_location|{
343
            self.remaining_bytes.skip_to( // no-op for seek at current position, uses skip_bytes for small amounts
344
                  usize::try_from(next_chunk_location)?
345
            )?;
346
347
            let meta_data = &self.meta_data;
348
            Chunk::read(&mut self.remaining_bytes, meta_data)
349
        })
350
351
        // TODO remember last chunk index and then seek to index+size and check whether bytes are left?
352
356k
    }
353
354
2.03k
    fn size_hint(&self) -> (usize, Option<usize>) {
355
2.03k
        (self.remaining_filtered_chunk_indices.len(), Some(self.remaining_filtered_chunk_indices.len()))
356
2.03k
    }
<exr::block::reader::FilteredChunksReader<std::io::cursor::Cursor<&[u8]>> as core::iter::traits::iterator::Iterator>::size_hint
Line
Count
Source
354
1.94k
    fn size_hint(&self) -> (usize, Option<usize>) {
355
1.94k
        (self.remaining_filtered_chunk_indices.len(), Some(self.remaining_filtered_chunk_indices.len()))
356
1.94k
    }
Unexecuted instantiation: <exr::block::reader::FilteredChunksReader<_> as core::iter::traits::iterator::Iterator>::size_hint
<exr::block::reader::FilteredChunksReader<std::io::cursor::Cursor<alloc::vec::Vec<u8>>> as core::iter::traits::iterator::Iterator>::size_hint
Line
Count
Source
354
85
    fn size_hint(&self) -> (usize, Option<usize>) {
355
85
        (self.remaining_filtered_chunk_indices.len(), Some(self.remaining_filtered_chunk_indices.len()))
356
85
    }
357
}
358
359
/// Read all chunks from the file, decompressing each chunk immediately.
360
/// Implements iterator.
361
#[derive(Debug)]
362
pub struct SequentialBlockDecompressor<R: ChunksReader> {
363
    remaining_chunks_reader: R,
364
    pedantic: bool,
365
}
366
367
impl<R: ChunksReader> SequentialBlockDecompressor<R> {
368
369
    /// The extracted meta data from the image file.
370
9
    pub fn meta_data(&self) -> &MetaData { self.remaining_chunks_reader.meta_data() }
<exr::block::reader::SequentialBlockDecompressor<exr::block::reader::OnProgressChunksReader<exr::block::reader::FilteredChunksReader<std::io::cursor::Cursor<&[u8]>>, &mut fn(f64)>>>::meta_data
Line
Count
Source
370
9
    pub fn meta_data(&self) -> &MetaData { self.remaining_chunks_reader.meta_data() }
Unexecuted instantiation: <exr::block::reader::SequentialBlockDecompressor<_>>::meta_data
Unexecuted instantiation: <exr::block::reader::SequentialBlockDecompressor<exr::block::reader::OnProgressChunksReader<exr::block::reader::FilteredChunksReader<std::io::cursor::Cursor<alloc::vec::Vec<u8>>>, &mut fn(f64)>>>::meta_data
371
372
    /// Read and then decompress a single block of pixels from the byte source.
373
9
    pub fn decompress_next_block(&mut self) -> Option<Result<UncompressedBlock>> {
374
9
        self.remaining_chunks_reader.read_next_chunk().map(|compressed_chunk|{
375
9
            UncompressedBlock::decompress_chunk(compressed_chunk?, &self.remaining_chunks_reader.meta_data(), self.pedantic)
376
9
        })
<exr::block::reader::SequentialBlockDecompressor<exr::block::reader::OnProgressChunksReader<exr::block::reader::FilteredChunksReader<std::io::cursor::Cursor<&[u8]>>, &mut fn(f64)>>>::decompress_next_block::{closure#0}
Line
Count
Source
374
9
        self.remaining_chunks_reader.read_next_chunk().map(|compressed_chunk|{
375
9
            UncompressedBlock::decompress_chunk(compressed_chunk?, &self.remaining_chunks_reader.meta_data(), self.pedantic)
376
9
        })
Unexecuted instantiation: <exr::block::reader::SequentialBlockDecompressor<_>>::decompress_next_block::{closure#0}
Unexecuted instantiation: <exr::block::reader::SequentialBlockDecompressor<exr::block::reader::OnProgressChunksReader<exr::block::reader::FilteredChunksReader<std::io::cursor::Cursor<alloc::vec::Vec<u8>>>, &mut fn(f64)>>>::decompress_next_block::{closure#0}
377
9
    }
<exr::block::reader::SequentialBlockDecompressor<exr::block::reader::OnProgressChunksReader<exr::block::reader::FilteredChunksReader<std::io::cursor::Cursor<&[u8]>>, &mut fn(f64)>>>::decompress_next_block
Line
Count
Source
373
9
    pub fn decompress_next_block(&mut self) -> Option<Result<UncompressedBlock>> {
374
9
        self.remaining_chunks_reader.read_next_chunk().map(|compressed_chunk|{
375
            UncompressedBlock::decompress_chunk(compressed_chunk?, &self.remaining_chunks_reader.meta_data(), self.pedantic)
376
        })
377
9
    }
Unexecuted instantiation: <exr::block::reader::SequentialBlockDecompressor<_>>::decompress_next_block
Unexecuted instantiation: <exr::block::reader::SequentialBlockDecompressor<exr::block::reader::OnProgressChunksReader<exr::block::reader::FilteredChunksReader<std::io::cursor::Cursor<alloc::vec::Vec<u8>>>, &mut fn(f64)>>>::decompress_next_block
378
}
379
380
#[cfg(feature = "rayon")]
381
/// Decompress the chunks in a file in parallel.
382
/// The first call to `next` will fill the thread pool with jobs,
383
/// starting to decompress the next few blocks.
384
/// These jobs will finish, even if you stop reading more blocks.
385
/// Implements iterator.
386
#[derive(Debug)]
387
pub struct ParallelBlockDecompressor<R: ChunksReader> {
388
    remaining_chunks: R,
389
    sender: std::sync::mpsc::Sender<Result<UncompressedBlock>>,
390
    receiver: std::sync::mpsc::Receiver<Result<UncompressedBlock>>,
391
    currently_decompressing_count: usize,
392
    max_threads: usize,
393
394
    shared_meta_data_ref: std::sync::Arc<MetaData>,
395
    pedantic: bool,
396
397
    pool: rayon_core::ThreadPool,
398
}
399
400
#[cfg(feature = "rayon")]
401
impl<R: ChunksReader> ParallelBlockDecompressor<R> {
402
403
    /// Create a new decompressor. Does not immediately spawn any tasks.
404
    /// Decompression starts after the first call to `next`.
405
    /// Returns the chunks if parallel decompression should not be used.
406
    /// Use `new_with_thread_pool` to customize the threadpool.
407
2.04k
    pub fn new(chunks: R, pedantic: bool) -> std::result::Result<Self, R> {
408
2.04k
        Self::new_with_thread_pool(chunks, pedantic, ||{
409
2.03k
            rayon_core::ThreadPoolBuilder::new()
410
65.0k
                .thread_name(|index| format!("OpenEXR Block Decompressor Thread #{}", index))
<exr::block::reader::ParallelBlockDecompressor<exr::block::reader::OnProgressChunksReader<exr::block::reader::FilteredChunksReader<std::io::cursor::Cursor<&[u8]>>, &mut fn(f64)>>>::new::{closure#0}::{closure#0}
Line
Count
Source
410
62.3k
                .thread_name(|index| format!("OpenEXR Block Decompressor Thread #{}", index))
Unexecuted instantiation: <exr::block::reader::ParallelBlockDecompressor<_>>::new::{closure#0}::{closure#0}
<exr::block::reader::ParallelBlockDecompressor<exr::block::reader::OnProgressChunksReader<exr::block::reader::FilteredChunksReader<std::io::cursor::Cursor<alloc::vec::Vec<u8>>>, &mut fn(f64)>>>::new::{closure#0}::{closure#0}
Line
Count
Source
410
2.72k
                .thread_name(|index| format!("OpenEXR Block Decompressor Thread #{}", index))
411
2.03k
                .build()
412
2.03k
        })
<exr::block::reader::ParallelBlockDecompressor<exr::block::reader::OnProgressChunksReader<exr::block::reader::FilteredChunksReader<std::io::cursor::Cursor<&[u8]>>, &mut fn(f64)>>>::new::{closure#0}
Line
Count
Source
408
1.94k
        Self::new_with_thread_pool(chunks, pedantic, ||{
409
1.94k
            rayon_core::ThreadPoolBuilder::new()
410
1.94k
                .thread_name(|index| format!("OpenEXR Block Decompressor Thread #{}", index))
411
1.94k
                .build()
412
1.94k
        })
Unexecuted instantiation: <exr::block::reader::ParallelBlockDecompressor<_>>::new::{closure#0}
<exr::block::reader::ParallelBlockDecompressor<exr::block::reader::OnProgressChunksReader<exr::block::reader::FilteredChunksReader<std::io::cursor::Cursor<alloc::vec::Vec<u8>>>, &mut fn(f64)>>>::new::{closure#0}
Line
Count
Source
408
85
        Self::new_with_thread_pool(chunks, pedantic, ||{
409
85
            rayon_core::ThreadPoolBuilder::new()
410
85
                .thread_name(|index| format!("OpenEXR Block Decompressor Thread #{}", index))
411
85
                .build()
412
85
        })
413
2.04k
    }
<exr::block::reader::ParallelBlockDecompressor<exr::block::reader::OnProgressChunksReader<exr::block::reader::FilteredChunksReader<std::io::cursor::Cursor<&[u8]>>, &mut fn(f64)>>>::new
Line
Count
Source
407
1.95k
    pub fn new(chunks: R, pedantic: bool) -> std::result::Result<Self, R> {
408
1.95k
        Self::new_with_thread_pool(chunks, pedantic, ||{
409
            rayon_core::ThreadPoolBuilder::new()
410
                .thread_name(|index| format!("OpenEXR Block Decompressor Thread #{}", index))
411
                .build()
412
        })
413
1.95k
    }
Unexecuted instantiation: <exr::block::reader::ParallelBlockDecompressor<_>>::new
<exr::block::reader::ParallelBlockDecompressor<exr::block::reader::OnProgressChunksReader<exr::block::reader::FilteredChunksReader<std::io::cursor::Cursor<alloc::vec::Vec<u8>>>, &mut fn(f64)>>>::new
Line
Count
Source
407
85
    pub fn new(chunks: R, pedantic: bool) -> std::result::Result<Self, R> {
408
85
        Self::new_with_thread_pool(chunks, pedantic, ||{
409
            rayon_core::ThreadPoolBuilder::new()
410
                .thread_name(|index| format!("OpenEXR Block Decompressor Thread #{}", index))
411
                .build()
412
        })
413
85
    }
414
415
    /// Create a new decompressor. Does not immediately spawn any tasks.
416
    /// Decompression starts after the first call to `next`.
417
    /// Returns the chunks if parallel decompression should not be used.
418
2.04k
    pub fn new_with_thread_pool<CreatePool>(chunks: R, pedantic: bool, try_create_thread_pool: CreatePool)
419
2.04k
        -> std::result::Result<Self, R>
420
2.04k
        where CreatePool: FnOnce() -> std::result::Result<rayon_core::ThreadPool, rayon_core::ThreadPoolBuildError>
421
    {
422
        use crate::compression::Compression;
423
424
2.04k
        let is_entirely_uncompressed = chunks.meta_data().headers.iter()
425
2.57k
            .all(|head|head.compression == Compression::Uncompressed);
<exr::block::reader::ParallelBlockDecompressor<exr::block::reader::OnProgressChunksReader<exr::block::reader::FilteredChunksReader<std::io::cursor::Cursor<&[u8]>>, &mut fn(f64)>>>::new_with_thread_pool::<<exr::block::reader::ParallelBlockDecompressor<exr::block::reader::OnProgressChunksReader<exr::block::reader::FilteredChunksReader<std::io::cursor::Cursor<&[u8]>>, &mut fn(f64)>>>::new::{closure#0}>::{closure#0}
Line
Count
Source
425
2.48k
            .all(|head|head.compression == Compression::Uncompressed);
Unexecuted instantiation: <exr::block::reader::ParallelBlockDecompressor<_>>::new_with_thread_pool::<_>::{closure#0}
<exr::block::reader::ParallelBlockDecompressor<exr::block::reader::OnProgressChunksReader<exr::block::reader::FilteredChunksReader<std::io::cursor::Cursor<alloc::vec::Vec<u8>>>, &mut fn(f64)>>>::new_with_thread_pool::<<exr::block::reader::ParallelBlockDecompressor<exr::block::reader::OnProgressChunksReader<exr::block::reader::FilteredChunksReader<std::io::cursor::Cursor<alloc::vec::Vec<u8>>>, &mut fn(f64)>>>::new::{closure#0}>::{closure#0}
Line
Count
Source
425
85
            .all(|head|head.compression == Compression::Uncompressed);
426
427
        // if no compression is used in the file, don't use a threadpool
428
2.04k
        if is_entirely_uncompressed {
429
9
            return Err(chunks);
430
2.03k
        }
431
432
        // in case thread pool creation fails (for example on WASM currently),
433
        // we revert to sequential decompression
434
2.03k
        let pool = match try_create_thread_pool() {
435
2.03k
            Ok(pool) => pool,
436
437
            // TODO print warning?
438
0
            Err(_) => return Err(chunks),
439
        };
440
441
2.03k
        let max_threads = pool.current_num_threads().max(1).min(chunks.len()) + 2; // ca one block for each thread at all times
442
443
2.03k
        let (send, recv) = std::sync::mpsc::channel(); // TODO bounded channel simplifies logic?
444
445
2.03k
        Ok(Self {
446
2.03k
            shared_meta_data_ref: std::sync::Arc::new(chunks.meta_data().clone()),
447
2.03k
            currently_decompressing_count: 0,
448
2.03k
            remaining_chunks: chunks,
449
2.03k
            sender: send,
450
2.03k
            receiver: recv,
451
2.03k
            pedantic,
452
2.03k
            max_threads,
453
2.03k
454
2.03k
            pool,
455
2.03k
        })
456
2.04k
    }
<exr::block::reader::ParallelBlockDecompressor<exr::block::reader::OnProgressChunksReader<exr::block::reader::FilteredChunksReader<std::io::cursor::Cursor<&[u8]>>, &mut fn(f64)>>>::new_with_thread_pool::<<exr::block::reader::ParallelBlockDecompressor<exr::block::reader::OnProgressChunksReader<exr::block::reader::FilteredChunksReader<std::io::cursor::Cursor<&[u8]>>, &mut fn(f64)>>>::new::{closure#0}>
Line
Count
Source
418
1.95k
    pub fn new_with_thread_pool<CreatePool>(chunks: R, pedantic: bool, try_create_thread_pool: CreatePool)
419
1.95k
        -> std::result::Result<Self, R>
420
1.95k
        where CreatePool: FnOnce() -> std::result::Result<rayon_core::ThreadPool, rayon_core::ThreadPoolBuildError>
421
    {
422
        use crate::compression::Compression;
423
424
1.95k
        let is_entirely_uncompressed = chunks.meta_data().headers.iter()
425
1.95k
            .all(|head|head.compression == Compression::Uncompressed);
426
427
        // if no compression is used in the file, don't use a threadpool
428
1.95k
        if is_entirely_uncompressed {
429
9
            return Err(chunks);
430
1.94k
        }
431
432
        // in case thread pool creation fails (for example on WASM currently),
433
        // we revert to sequential decompression
434
1.94k
        let pool = match try_create_thread_pool() {
435
1.94k
            Ok(pool) => pool,
436
437
            // TODO print warning?
438
0
            Err(_) => return Err(chunks),
439
        };
440
441
1.94k
        let max_threads = pool.current_num_threads().max(1).min(chunks.len()) + 2; // ca one block for each thread at all times
442
443
1.94k
        let (send, recv) = std::sync::mpsc::channel(); // TODO bounded channel simplifies logic?
444
445
1.94k
        Ok(Self {
446
1.94k
            shared_meta_data_ref: std::sync::Arc::new(chunks.meta_data().clone()),
447
1.94k
            currently_decompressing_count: 0,
448
1.94k
            remaining_chunks: chunks,
449
1.94k
            sender: send,
450
1.94k
            receiver: recv,
451
1.94k
            pedantic,
452
1.94k
            max_threads,
453
1.94k
454
1.94k
            pool,
455
1.94k
        })
456
1.95k
    }
Unexecuted instantiation: <exr::block::reader::ParallelBlockDecompressor<_>>::new_with_thread_pool::<_>
<exr::block::reader::ParallelBlockDecompressor<exr::block::reader::OnProgressChunksReader<exr::block::reader::FilteredChunksReader<std::io::cursor::Cursor<alloc::vec::Vec<u8>>>, &mut fn(f64)>>>::new_with_thread_pool::<<exr::block::reader::ParallelBlockDecompressor<exr::block::reader::OnProgressChunksReader<exr::block::reader::FilteredChunksReader<std::io::cursor::Cursor<alloc::vec::Vec<u8>>>, &mut fn(f64)>>>::new::{closure#0}>
Line
Count
Source
418
85
    pub fn new_with_thread_pool<CreatePool>(chunks: R, pedantic: bool, try_create_thread_pool: CreatePool)
419
85
        -> std::result::Result<Self, R>
420
85
        where CreatePool: FnOnce() -> std::result::Result<rayon_core::ThreadPool, rayon_core::ThreadPoolBuildError>
421
    {
422
        use crate::compression::Compression;
423
424
85
        let is_entirely_uncompressed = chunks.meta_data().headers.iter()
425
85
            .all(|head|head.compression == Compression::Uncompressed);
426
427
        // if no compression is used in the file, don't use a threadpool
428
85
        if is_entirely_uncompressed {
429
0
            return Err(chunks);
430
85
        }
431
432
        // in case thread pool creation fails (for example on WASM currently),
433
        // we revert to sequential decompression
434
85
        let pool = match try_create_thread_pool() {
435
85
            Ok(pool) => pool,
436
437
            // TODO print warning?
438
0
            Err(_) => return Err(chunks),
439
        };
440
441
85
        let max_threads = pool.current_num_threads().max(1).min(chunks.len()) + 2; // ca one block for each thread at all times
442
443
85
        let (send, recv) = std::sync::mpsc::channel(); // TODO bounded channel simplifies logic?
444
445
85
        Ok(Self {
446
85
            shared_meta_data_ref: std::sync::Arc::new(chunks.meta_data().clone()),
447
85
            currently_decompressing_count: 0,
448
85
            remaining_chunks: chunks,
449
85
            sender: send,
450
85
            receiver: recv,
451
85
            pedantic,
452
85
            max_threads,
453
85
454
85
            pool,
455
85
        })
456
85
    }
457
458
    /// Fill the pool with decompression jobs. Returns the first job that finishes.
459
359k
    pub fn decompress_next_block(&mut self) -> Option<Result<UncompressedBlock>> {
460
461
719k
        while self.currently_decompressing_count < self.max_threads {
462
364k
            let block = self.remaining_chunks.next();
463
364k
            if let Some(block) = block {
464
362k
                let block = match block {
465
360k
                    Ok(block) => block,
466
1.76k
                    Err(error) => return Some(Err(error))
467
                };
468
469
360k
                let sender = self.sender.clone();
470
360k
                let meta = self.shared_meta_data_ref.clone();
471
360k
                let pedantic = self.pedantic;
472
473
360k
                self.currently_decompressing_count += 1;
474
475
360k
                self.pool.spawn(move || {
476
360k
                    let decompressed_or_err = UncompressedBlock::decompress_chunk(
477
360k
                        block, &meta, pedantic
478
                    );
479
480
                    // by now, decompressing could have failed in another thread.
481
                    // the error is then already handled, so we simply
482
                    // don't send the decompressed block and do nothing
483
360k
                    let _ = sender.send(decompressed_or_err);
484
360k
                });
<exr::block::reader::ParallelBlockDecompressor<exr::block::reader::OnProgressChunksReader<exr::block::reader::FilteredChunksReader<std::io::cursor::Cursor<&[u8]>>, &mut fn(f64)>>>::decompress_next_block::{closure#0}
Line
Count
Source
475
6.01k
                self.pool.spawn(move || {
476
6.01k
                    let decompressed_or_err = UncompressedBlock::decompress_chunk(
477
6.01k
                        block, &meta, pedantic
478
                    );
479
480
                    // by now, decompressing could have failed in another thread.
481
                    // the error is then already handled, so we simply
482
                    // don't send the decompressed block and do nothing
483
6.01k
                    let _ = sender.send(decompressed_or_err);
484
6.01k
                });
Unexecuted instantiation: <exr::block::reader::ParallelBlockDecompressor<_>>::decompress_next_block::{closure#0}
<exr::block::reader::ParallelBlockDecompressor<exr::block::reader::OnProgressChunksReader<exr::block::reader::FilteredChunksReader<std::io::cursor::Cursor<alloc::vec::Vec<u8>>>, &mut fn(f64)>>>::decompress_next_block::{closure#0}
Line
Count
Source
475
354k
                self.pool.spawn(move || {
476
354k
                    let decompressed_or_err = UncompressedBlock::decompress_chunk(
477
354k
                        block, &meta, pedantic
478
                    );
479
480
                    // by now, decompressing could have failed in another thread.
481
                    // the error is then already handled, so we simply
482
                    // don't send the decompressed block and do nothing
483
354k
                    let _ = sender.send(decompressed_or_err);
484
354k
                });
485
            }
486
            else {
487
                // there are no chunks left to decompress
488
2.04k
                break;
489
            }
490
        }
491
492
357k
        if self.currently_decompressing_count > 0 {
493
357k
            let next = self.receiver.recv()
494
357k
                .expect("all decompressing senders hung up but more messages were expected");
495
496
357k
            self.currently_decompressing_count -= 1;
497
357k
            Some(next)
498
        }
499
        else {
500
170
            debug_assert!(self.receiver.try_recv().is_err(), "uncompressed chunks left in channel after decompressing all chunks"); // TODO not reliable
501
170
            debug_assert_eq!(self.len(), 0, "compressed chunks left after decompressing all chunks");
502
170
            None
503
        }
504
359k
    }
<exr::block::reader::ParallelBlockDecompressor<exr::block::reader::OnProgressChunksReader<exr::block::reader::FilteredChunksReader<std::io::cursor::Cursor<&[u8]>>, &mut fn(f64)>>>::decompress_next_block
Line
Count
Source
459
4.55k
    pub fn decompress_next_block(&mut self) -> Option<Result<UncompressedBlock>> {
460
461
10.5k
        while self.currently_decompressing_count < self.max_threads {
462
8.06k
            let block = self.remaining_chunks.next();
463
8.06k
            if let Some(block) = block {
464
7.77k
                let block = match block {
465
6.01k
                    Ok(block) => block,
466
1.76k
                    Err(error) => return Some(Err(error))
467
                };
468
469
6.01k
                let sender = self.sender.clone();
470
6.01k
                let meta = self.shared_meta_data_ref.clone();
471
6.01k
                let pedantic = self.pedantic;
472
473
6.01k
                self.currently_decompressing_count += 1;
474
475
6.01k
                self.pool.spawn(move || {
476
                    let decompressed_or_err = UncompressedBlock::decompress_chunk(
477
                        block, &meta, pedantic
478
                    );
479
480
                    // by now, decompressing could have failed in another thread.
481
                    // the error is then already handled, so we simply
482
                    // don't send the decompressed block and do nothing
483
                    let _ = sender.send(decompressed_or_err);
484
                });
485
            }
486
            else {
487
                // there are no chunks left to decompress
488
287
                break;
489
            }
490
        }
491
492
2.79k
        if self.currently_decompressing_count > 0 {
493
2.70k
            let next = self.receiver.recv()
494
2.70k
                .expect("all decompressing senders hung up but more messages were expected");
495
496
2.70k
            self.currently_decompressing_count -= 1;
497
2.70k
            Some(next)
498
        }
499
        else {
500
85
            debug_assert!(self.receiver.try_recv().is_err(), "uncompressed chunks left in channel after decompressing all chunks"); // TODO not reliable
501
85
            debug_assert_eq!(self.len(), 0, "compressed chunks left after decompressing all chunks");
502
85
            None
503
        }
504
4.55k
    }
Unexecuted instantiation: <exr::block::reader::ParallelBlockDecompressor<_>>::decompress_next_block
<exr::block::reader::ParallelBlockDecompressor<exr::block::reader::OnProgressChunksReader<exr::block::reader::FilteredChunksReader<std::io::cursor::Cursor<alloc::vec::Vec<u8>>>, &mut fn(f64)>>>::decompress_next_block
Line
Count
Source
459
354k
    pub fn decompress_next_block(&mut self) -> Option<Result<UncompressedBlock>> {
460
461
709k
        while self.currently_decompressing_count < self.max_threads {
462
356k
            let block = self.remaining_chunks.next();
463
356k
            if let Some(block) = block {
464
354k
                let block = match block {
465
354k
                    Ok(block) => block,
466
0
                    Err(error) => return Some(Err(error))
467
                };
468
469
354k
                let sender = self.sender.clone();
470
354k
                let meta = self.shared_meta_data_ref.clone();
471
354k
                let pedantic = self.pedantic;
472
473
354k
                self.currently_decompressing_count += 1;
474
475
354k
                self.pool.spawn(move || {
476
                    let decompressed_or_err = UncompressedBlock::decompress_chunk(
477
                        block, &meta, pedantic
478
                    );
479
480
                    // by now, decompressing could have failed in another thread.
481
                    // the error is then already handled, so we simply
482
                    // don't send the decompressed block and do nothing
483
                    let _ = sender.send(decompressed_or_err);
484
                });
485
            }
486
            else {
487
                // there are no chunks left to decompress
488
1.75k
                break;
489
            }
490
        }
491
492
354k
        if self.currently_decompressing_count > 0 {
493
354k
            let next = self.receiver.recv()
494
354k
                .expect("all decompressing senders hung up but more messages were expected");
495
496
354k
            self.currently_decompressing_count -= 1;
497
354k
            Some(next)
498
        }
499
        else {
500
85
            debug_assert!(self.receiver.try_recv().is_err(), "uncompressed chunks left in channel after decompressing all chunks"); // TODO not reliable
501
85
            debug_assert_eq!(self.len(), 0, "compressed chunks left after decompressing all chunks");
502
85
            None
503
        }
504
354k
    }
505
506
    /// The extracted meta data of the image file.
507
358k
    pub fn meta_data(&self) -> &MetaData { self.remaining_chunks.meta_data() }
<exr::block::reader::ParallelBlockDecompressor<exr::block::reader::OnProgressChunksReader<exr::block::reader::FilteredChunksReader<std::io::cursor::Cursor<&[u8]>>, &mut fn(f64)>>>::meta_data
Line
Count
Source
507
4.46k
    pub fn meta_data(&self) -> &MetaData { self.remaining_chunks.meta_data() }
Unexecuted instantiation: <exr::block::reader::ParallelBlockDecompressor<_>>::meta_data
<exr::block::reader::ParallelBlockDecompressor<exr::block::reader::OnProgressChunksReader<exr::block::reader::FilteredChunksReader<std::io::cursor::Cursor<alloc::vec::Vec<u8>>>, &mut fn(f64)>>>::meta_data
Line
Count
Source
507
354k
    pub fn meta_data(&self) -> &MetaData { self.remaining_chunks.meta_data() }
508
}
509
510
impl<R: ChunksReader> ExactSizeIterator for SequentialBlockDecompressor<R> {}
511
impl<R: ChunksReader> Iterator for SequentialBlockDecompressor<R> {
512
    type Item = Result<UncompressedBlock>;
513
9
    fn next(&mut self) -> Option<Self::Item> { self.decompress_next_block() }
<exr::block::reader::SequentialBlockDecompressor<exr::block::reader::OnProgressChunksReader<exr::block::reader::FilteredChunksReader<std::io::cursor::Cursor<&[u8]>>, &mut fn(f64)>> as core::iter::traits::iterator::Iterator>::next
Line
Count
Source
513
9
    fn next(&mut self) -> Option<Self::Item> { self.decompress_next_block() }
Unexecuted instantiation: <exr::block::reader::SequentialBlockDecompressor<_> as core::iter::traits::iterator::Iterator>::next
Unexecuted instantiation: <exr::block::reader::SequentialBlockDecompressor<exr::block::reader::OnProgressChunksReader<exr::block::reader::FilteredChunksReader<std::io::cursor::Cursor<alloc::vec::Vec<u8>>>, &mut fn(f64)>> as core::iter::traits::iterator::Iterator>::next
514
0
    fn size_hint(&self) -> (usize, Option<usize>) { self.remaining_chunks_reader.size_hint() }
515
}
516
517
#[cfg(feature = "rayon")]
518
impl<R: ChunksReader> ExactSizeIterator for ParallelBlockDecompressor<R> {}
519
#[cfg(feature = "rayon")]
520
impl<R: ChunksReader> Iterator for ParallelBlockDecompressor<R> {
521
    type Item = Result<UncompressedBlock>;
522
359k
    fn next(&mut self) -> Option<Self::Item> { self.decompress_next_block() }
<exr::block::reader::ParallelBlockDecompressor<exr::block::reader::OnProgressChunksReader<exr::block::reader::FilteredChunksReader<std::io::cursor::Cursor<&[u8]>>, &mut fn(f64)>> as core::iter::traits::iterator::Iterator>::next
Line
Count
Source
522
4.55k
    fn next(&mut self) -> Option<Self::Item> { self.decompress_next_block() }
Unexecuted instantiation: <exr::block::reader::ParallelBlockDecompressor<_> as core::iter::traits::iterator::Iterator>::next
<exr::block::reader::ParallelBlockDecompressor<exr::block::reader::OnProgressChunksReader<exr::block::reader::FilteredChunksReader<std::io::cursor::Cursor<alloc::vec::Vec<u8>>>, &mut fn(f64)>> as core::iter::traits::iterator::Iterator>::next
Line
Count
Source
522
354k
    fn next(&mut self) -> Option<Self::Item> { self.decompress_next_block() }
523
0
    fn size_hint(&self) -> (usize, Option<usize>) {
524
0
        let remaining = self.remaining_chunks.len() + self.currently_decompressing_count;
525
0
        (remaining, Some(remaining))
526
0
    }
527
}
528
529
530
531
532