Coverage Report

Created: 2025-09-27 06:41

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/rust/registry/src/index.crates.io-1949cf8c6b5b557f/bzip2-0.4.4/src/bufread.rs
Line
Count
Source
1
//! I/O streams for wrapping `BufRead` types as encoders/decoders
2
3
use std::io;
4
use std::io::prelude::*;
5
6
#[cfg(feature = "tokio")]
7
use futures::Poll;
8
#[cfg(feature = "tokio")]
9
use tokio_io::{AsyncRead, AsyncWrite};
10
11
use {Action, Compress, Compression, Decompress, Status};
12
13
/// A bz2 encoder, or compressor.
14
///
15
/// This structure implements a `BufRead` interface and will read uncompressed
16
/// data from an underlying stream and emit a stream of compressed data.
17
pub struct BzEncoder<R> {
18
    obj: R,
19
    data: Compress,
20
    done: bool,
21
}
22
23
/// A bz2 decoder, or decompressor.
24
///
25
/// This structure implements a `BufRead` interface and takes a stream of
26
/// compressed data as input, providing the decompressed data when read from.
27
pub struct BzDecoder<R> {
28
    obj: R,
29
    data: Decompress,
30
    done: bool,
31
    multi: bool,
32
}
33
34
impl<R: BufRead> BzEncoder<R> {
35
    /// Creates a new encoder which will read uncompressed data from the given
36
    /// stream and emit the compressed stream.
37
0
    pub fn new(r: R, level: Compression) -> BzEncoder<R> {
38
0
        BzEncoder {
39
0
            obj: r,
40
0
            data: Compress::new(level, 30),
41
0
            done: false,
42
0
        }
43
0
    }
44
}
45
46
impl<R> BzEncoder<R> {
47
    /// Acquires a reference to the underlying stream
48
0
    pub fn get_ref(&self) -> &R {
49
0
        &self.obj
50
0
    }
51
52
    /// Acquires a mutable reference to the underlying stream
53
    ///
54
    /// Note that mutation of the stream may result in surprising results if
55
    /// this encoder is continued to be used.
56
0
    pub fn get_mut(&mut self) -> &mut R {
57
0
        &mut self.obj
58
0
    }
59
60
    /// Consumes this encoder, returning the underlying reader.
61
0
    pub fn into_inner(self) -> R {
62
0
        self.obj
63
0
    }
64
65
    /// Returns the number of bytes produced by the compressor
66
    /// (e.g. the number of bytes read from this stream)
67
    ///
68
    /// Note that, due to buffering, this only bears any relation to
69
    /// total_in() when the compressor chooses to flush its data
70
    /// (unfortunately, this won't happen in general
71
    /// at the end of the stream, because the compressor doesn't know
72
    /// if there's more data to come).  At that point,
73
    /// `total_out() / total_in()` would be the compression ratio.
74
0
    pub fn total_out(&self) -> u64 {
75
0
        self.data.total_out()
76
0
    }
77
78
    /// Returns the number of bytes consumed by the compressor
79
    /// (e.g. the number of bytes read from the underlying stream)
80
0
    pub fn total_in(&self) -> u64 {
81
0
        self.data.total_in()
82
0
    }
83
}
84
85
impl<R: BufRead> Read for BzEncoder<R> {
86
0
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
87
0
        if self.done {
88
0
            return Ok(0);
89
0
        }
90
        loop {
91
            let (read, consumed, eof, ret);
92
            {
93
0
                let input = self.obj.fill_buf()?;
94
0
                eof = input.is_empty();
95
0
                let before_out = self.data.total_out();
96
0
                let before_in = self.data.total_in();
97
0
                let action = if eof { Action::Finish } else { Action::Run };
98
0
                ret = self.data.compress(input, buf, action);
99
0
                read = (self.data.total_out() - before_out) as usize;
100
0
                consumed = (self.data.total_in() - before_in) as usize;
101
            }
102
0
            self.obj.consume(consumed);
103
104
            // we should never get the sequence error that's possible to be
105
            // returned from compression
106
0
            let ret = ret.unwrap();
107
108
            // If we haven't ready any data and we haven't hit EOF yet, then we
109
            // need to keep asking for more data because if we return that 0
110
            // bytes of data have been read then it will be interpreted as EOF.
111
0
            if read == 0 && !eof && buf.len() > 0 {
112
0
                continue;
113
0
            }
114
0
            if ret == Status::StreamEnd {
115
0
                self.done = true;
116
0
            }
117
0
            return Ok(read);
118
        }
119
0
    }
120
}
121
122
#[cfg(feature = "tokio")]
123
impl<R: AsyncRead + BufRead> AsyncRead for BzEncoder<R> {}
124
125
impl<W: Write> Write for BzEncoder<W> {
126
0
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
127
0
        self.get_mut().write(buf)
128
0
    }
129
130
0
    fn flush(&mut self) -> io::Result<()> {
131
0
        self.get_mut().flush()
132
0
    }
133
}
134
135
#[cfg(feature = "tokio")]
136
impl<R: AsyncWrite> AsyncWrite for BzEncoder<R> {
137
    fn shutdown(&mut self) -> Poll<(), io::Error> {
138
        self.get_mut().shutdown()
139
    }
140
}
141
142
impl<R: BufRead> BzDecoder<R> {
143
    /// Creates a new decoder which will decompress data read from the given
144
    /// stream.
145
2.61k
    pub fn new(r: R) -> BzDecoder<R> {
146
2.61k
        BzDecoder {
147
2.61k
            obj: r,
148
2.61k
            data: Decompress::new(false),
149
2.61k
            done: false,
150
2.61k
            multi: false,
151
2.61k
        }
152
2.61k
    }
<bzip2::bufread::BzDecoder<std::io::buffered::bufreader::BufReader<zip::read::CryptoReader>>>::new
Line
Count
Source
145
2.61k
    pub fn new(r: R) -> BzDecoder<R> {
146
2.61k
        BzDecoder {
147
2.61k
            obj: r,
148
2.61k
            data: Decompress::new(false),
149
2.61k
            done: false,
150
2.61k
            multi: false,
151
2.61k
        }
152
2.61k
    }
Unexecuted instantiation: <bzip2::bufread::BzDecoder<_>>::new
153
154
0
    fn multi(mut self, flag: bool) -> BzDecoder<R> {
155
0
        self.multi = flag;
156
0
        self
157
0
    }
158
}
159
160
impl<R> BzDecoder<R> {
161
    /// Acquires a reference to the underlying stream
162
0
    pub fn get_ref(&self) -> &R {
163
0
        &self.obj
164
0
    }
165
166
    /// Acquires a mutable reference to the underlying stream
167
    ///
168
    /// Note that mutation of the stream may result in surprising results if
169
    /// this encoder is continued to be used.
170
0
    pub fn get_mut(&mut self) -> &mut R {
171
0
        &mut self.obj
172
0
    }
173
174
    /// Consumes this decoder, returning the underlying reader.
175
0
    pub fn into_inner(self) -> R {
176
0
        self.obj
177
0
    }
Unexecuted instantiation: <bzip2::bufread::BzDecoder<std::io::buffered::bufreader::BufReader<zip::read::CryptoReader>>>::into_inner
Unexecuted instantiation: <bzip2::bufread::BzDecoder<_>>::into_inner
178
179
    /// Returns the number of bytes that the decompressor has consumed.
180
    ///
181
    /// Note that this will likely be smaller than what the decompressor
182
    /// actually read from the underlying stream due to buffering.
183
0
    pub fn total_in(&self) -> u64 {
184
0
        self.data.total_in()
185
0
    }
186
187
    /// Returns the number of bytes that the decompressor has produced.
188
0
    pub fn total_out(&self) -> u64 {
189
0
        self.data.total_out()
190
0
    }
191
}
192
193
impl<R: BufRead> Read for BzDecoder<R> {
194
4.14k
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
195
        loop {
196
4.23k
            if self.done && !self.multi {
197
1.04k
                return Ok(0);
198
3.19k
            }
199
            let (read, consumed, remaining, ret);
200
            {
201
3.19k
                let input = self.obj.fill_buf()?;
202
3.19k
                if self.done {
203
0
                    assert!(self.multi);
204
0
                    if input.is_empty() {
205
                        // beyond last stream in multi-stream case
206
0
                        return Ok(0);
207
0
                    } else {
208
0
                        // previous stream ended, more data follows => create new decompressor
209
0
                        self.data = Decompress::new(false);
210
0
                        self.done = false;
211
0
                    }
212
3.19k
                }
213
3.19k
                let before_out = self.data.total_out();
214
3.19k
                let before_in = self.data.total_in();
215
3.19k
                ret = self.data.decompress(input, buf);
216
3.19k
                read = (self.data.total_out() - before_out) as usize;
217
3.19k
                consumed = (self.data.total_in() - before_in) as usize;
218
3.19k
                remaining = input.len() - consumed;
219
            }
220
3.19k
            self.obj.consume(consumed);
221
222
3.19k
            let ret = ret.map_err(|e| io::Error::new(io::ErrorKind::InvalidInput, e))?;
<bzip2::bufread::BzDecoder<std::io::buffered::bufreader::BufReader<zip::read::CryptoReader>> as std::io::Read>::read::{closure#0}
Line
Count
Source
222
1.51k
            let ret = ret.map_err(|e| io::Error::new(io::ErrorKind::InvalidInput, e))?;
Unexecuted instantiation: <bzip2::bufread::BzDecoder<_> as std::io::Read>::read::{closure#0}
223
1.67k
            if ret == Status::StreamEnd {
224
1.04k
                self.done = true;
225
1.04k
            } else if consumed == 0 && remaining == 0 && read == 0 {
226
51
                return Err(io::Error::new(
227
51
                    io::ErrorKind::UnexpectedEof,
228
51
                    "decompression not finished but EOF reached",
229
51
                ));
230
577
            }
231
232
1.62k
            if read > 0 || buf.len() == 0 {
233
1.53k
                return Ok(read);
234
87
            }
235
        }
236
4.14k
    }
<bzip2::bufread::BzDecoder<std::io::buffered::bufreader::BufReader<zip::read::CryptoReader>> as std::io::Read>::read
Line
Count
Source
194
4.14k
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
195
        loop {
196
4.23k
            if self.done && !self.multi {
197
1.04k
                return Ok(0);
198
3.19k
            }
199
            let (read, consumed, remaining, ret);
200
            {
201
3.19k
                let input = self.obj.fill_buf()?;
202
3.19k
                if self.done {
203
0
                    assert!(self.multi);
204
0
                    if input.is_empty() {
205
                        // beyond last stream in multi-stream case
206
0
                        return Ok(0);
207
0
                    } else {
208
0
                        // previous stream ended, more data follows => create new decompressor
209
0
                        self.data = Decompress::new(false);
210
0
                        self.done = false;
211
0
                    }
212
3.19k
                }
213
3.19k
                let before_out = self.data.total_out();
214
3.19k
                let before_in = self.data.total_in();
215
3.19k
                ret = self.data.decompress(input, buf);
216
3.19k
                read = (self.data.total_out() - before_out) as usize;
217
3.19k
                consumed = (self.data.total_in() - before_in) as usize;
218
3.19k
                remaining = input.len() - consumed;
219
            }
220
3.19k
            self.obj.consume(consumed);
221
222
3.19k
            let ret = ret.map_err(|e| io::Error::new(io::ErrorKind::InvalidInput, e))?;
223
1.67k
            if ret == Status::StreamEnd {
224
1.04k
                self.done = true;
225
1.04k
            } else if consumed == 0 && remaining == 0 && read == 0 {
226
51
                return Err(io::Error::new(
227
51
                    io::ErrorKind::UnexpectedEof,
228
51
                    "decompression not finished but EOF reached",
229
51
                ));
230
577
            }
231
232
1.62k
            if read > 0 || buf.len() == 0 {
233
1.53k
                return Ok(read);
234
87
            }
235
        }
236
4.14k
    }
Unexecuted instantiation: <bzip2::bufread::BzDecoder<_> as std::io::Read>::read
237
}
238
239
#[cfg(feature = "tokio")]
240
impl<R: AsyncRead + BufRead> AsyncRead for BzDecoder<R> {}
241
242
impl<W: Write> Write for BzDecoder<W> {
243
0
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
244
0
        self.get_mut().write(buf)
245
0
    }
246
247
0
    fn flush(&mut self) -> io::Result<()> {
248
0
        self.get_mut().flush()
249
0
    }
250
}
251
252
#[cfg(feature = "tokio")]
253
impl<R: AsyncWrite> AsyncWrite for BzDecoder<R> {
254
    fn shutdown(&mut self) -> Poll<(), io::Error> {
255
        self.get_mut().shutdown()
256
    }
257
}
258
259
/// A bzip2 streaming decoder that decodes all members of a multistream.
260
///
261
/// Wikipedia, particularly, uses bzip2 multistream for their dumps, and the
262
/// `pbzip2` tool creates such data as well;
263
pub struct MultiBzDecoder<R>(BzDecoder<R>);
264
265
impl<R: BufRead> MultiBzDecoder<R> {
266
    /// Creates a new decoder from the given reader. If the bzip2 stream contains multiple members
267
    /// all will be decoded.
268
0
    pub fn new(r: R) -> MultiBzDecoder<R> {
269
0
        MultiBzDecoder(BzDecoder::new(r).multi(true))
270
0
    }
271
}
272
273
impl<R> MultiBzDecoder<R> {
274
    /// Acquires a reference to the underlying reader.
275
0
    pub fn get_ref(&self) -> &R {
276
0
        self.0.get_ref()
277
0
    }
278
279
    /// Acquires a mutable reference to the underlying stream.
280
    ///
281
    /// Note that mutation of the stream may result in surprising results if
282
    /// this encoder is continued to be used.
283
0
    pub fn get_mut(&mut self) -> &mut R {
284
0
        self.0.get_mut()
285
0
    }
286
287
    /// Consumes this decoder, returning the underlying reader.
288
0
    pub fn into_inner(self) -> R {
289
0
        self.0.into_inner()
290
0
    }
291
}
292
293
impl<R: BufRead> Read for MultiBzDecoder<R> {
294
0
    fn read(&mut self, into: &mut [u8]) -> io::Result<usize> {
295
0
        self.0.read(into)
296
0
    }
297
}
298
299
#[cfg(feature = "tokio")]
300
impl<R: AsyncRead + BufRead> AsyncRead for MultiBzDecoder<R> {}
301
302
impl<R: BufRead + Write> Write for MultiBzDecoder<R> {
303
0
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
304
0
        self.get_mut().write(buf)
305
0
    }
306
307
0
    fn flush(&mut self) -> io::Result<()> {
308
0
        self.get_mut().flush()
309
0
    }
310
}
311
312
#[cfg(feature = "tokio")]
313
impl<R: AsyncWrite + BufRead> AsyncWrite for MultiBzDecoder<R> {
314
    fn shutdown(&mut self) -> Poll<(), io::Error> {
315
        self.get_mut().shutdown()
316
    }
317
}
318
319
#[cfg(test)]
320
mod tests {
321
    use super::MultiBzDecoder;
322
    use std::io::{BufReader, Read};
323
324
    #[test]
325
    fn bug_61() {
326
        let compressed_bytes = include_bytes!("../tests/bug_61.bz2");
327
        let uncompressed_bytes = include_bytes!("../tests/bug_61.raw");
328
        let reader = BufReader::with_capacity(8192, compressed_bytes.as_ref());
329
330
        let mut d = MultiBzDecoder::new(reader);
331
        let mut data = Vec::new();
332
333
        assert_eq!(d.read_to_end(&mut data).unwrap(), uncompressed_bytes.len());
334
        assert_eq!(data, uncompressed_bytes);
335
    }
336
}