Coverage Report

Created: 2025-11-16 07:09

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/rust/registry/src/index.crates.io-1949cf8c6b5b557f/lzma-rs-0.2.0/src/decode/stream.rs
Line
Count
Source
1
use crate::decode::lzbuffer::{LzBuffer, LzCircularBuffer};
2
use crate::decode::lzma::{new_circular, new_circular_with_memlimit, DecoderState, LzmaParams};
3
use crate::decode::rangecoder::RangeDecoder;
4
use crate::decompress::Options;
5
use crate::error::Error;
6
use std::fmt::Debug;
7
use std::io::{self, BufRead, Cursor, Read, Write};
8
9
/// Minimum header length to be read.
10
/// - props: u8 (1 byte)
11
/// - dict_size: u32 (4 bytes)
12
const MIN_HEADER_LEN: usize = 5;
13
14
/// Max header length to be read.
15
/// - unpacked_size: u64 (8 bytes)
16
const MAX_HEADER_LEN: usize = MIN_HEADER_LEN + 8;
17
18
/// Required bytes after the header.
19
/// - ignore: u8 (1 byte)
20
/// - code: u32 (4 bytes)
21
const START_BYTES: usize = 5;
22
23
/// Maximum number of bytes to buffer while reading the header.
24
const MAX_TMP_LEN: usize = MAX_HEADER_LEN + START_BYTES;
25
26
/// Internal state of this streaming decoder. This is needed because we have to
27
/// initialize the stream before processing any data.
28
#[derive(Debug)]
29
enum State<W>
30
where
31
    W: Write,
32
{
33
    /// Stream is initialized but header values have not yet been read.
34
    Header(W),
35
    /// Header values have been read and the stream is ready to process more data.
36
    Data(RunState<W>),
37
}
38
39
/// Structures needed while decoding data.
40
struct RunState<W>
41
where
42
    W: Write,
43
{
44
    decoder: DecoderState<W, LzCircularBuffer<W>>,
45
    range: u32,
46
    code: u32,
47
}
48
49
impl<W> Debug for RunState<W>
50
where
51
    W: Write,
52
{
53
0
    fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result {
54
0
        fmt.debug_struct("RunState")
55
0
            .field("range", &self.range)
56
0
            .field("code", &self.code)
57
0
            .finish()
58
0
    }
59
}
60
61
/// Lzma decompressor that can process multiple chunks of data using the
62
/// `io::Write` interface.
63
pub struct Stream<W>
64
where
65
    W: Write,
66
{
67
    /// Temporary buffer to hold data while the header is being read.
68
    tmp: Cursor<[u8; MAX_TMP_LEN]>,
69
    /// Whether the stream is initialized and ready to process data.
70
    /// An `Option` is used to avoid interior mutability when updating the state.
71
    state: Option<State<W>>,
72
    /// Options given when a stream is created.
73
    options: Options,
74
}
75
76
impl<W> Stream<W>
77
where
78
    W: Write,
79
{
80
    /// Initialize the stream. This will consume the `output` which is the sink
81
    /// implementing `io::Write` that will receive decompressed bytes.
82
0
    pub fn new(output: W) -> Self {
83
0
        Self::new_with_options(&Options::default(), output)
84
0
    }
85
86
    /// Initialize the stream with the given `options`. This will consume the
87
    /// `output` which is the sink implementing `io::Write` that will
88
    /// receive decompressed bytes.
89
65.3k
    pub fn new_with_options(options: &Options, output: W) -> Self {
90
65.3k
        Self {
91
65.3k
            tmp: Cursor::new([0; MAX_TMP_LEN]),
92
65.3k
            state: Some(State::Header(output)),
93
65.3k
            options: *options,
94
65.3k
        }
95
65.3k
    }
Unexecuted instantiation: <lzma_rs::decode::stream::Stream<std::io::cursor::Cursor<&mut [u8]>>>::new_with_options
Unexecuted instantiation: <lzma_rs::decode::stream::Stream<_>>::new_with_options
<lzma_rs::decode::stream::Stream<suricata_htp::decompressors::BlockingCursor>>::new_with_options
Line
Count
Source
89
65.3k
    pub fn new_with_options(options: &Options, output: W) -> Self {
90
65.3k
        Self {
91
65.3k
            tmp: Cursor::new([0; MAX_TMP_LEN]),
92
65.3k
            state: Some(State::Header(output)),
93
65.3k
            options: *options,
94
65.3k
        }
95
65.3k
    }
Unexecuted instantiation: <lzma_rs::decode::stream::Stream<std::io::cursor::Cursor<&mut [u8]>>>::new_with_options
96
97
    /// Get a reference to the output sink
98
0
    pub fn get_output(&self) -> Option<&W> {
99
0
        self.state.as_ref().map(|state| match state {
100
0
            State::Header(output) => &output,
101
0
            State::Data(state) => state.decoder.output.get_output(),
102
0
        })
103
0
    }
104
105
    /// Get a mutable reference to the output sink
106
231k
    pub fn get_output_mut(&mut self) -> Option<&mut W> {
107
231k
        self.state.as_mut().map(|state| match state {
108
129k
            State::Header(output) => output,
109
54.9k
            State::Data(state) => state.decoder.output.get_output_mut(),
110
184k
        })
Unexecuted instantiation: <lzma_rs::decode::stream::Stream<_>>::get_output_mut::{closure#0}
<lzma_rs::decode::stream::Stream<suricata_htp::decompressors::BlockingCursor>>::get_output_mut::{closure#0}
Line
Count
Source
107
184k
        self.state.as_mut().map(|state| match state {
108
129k
            State::Header(output) => output,
109
54.9k
            State::Data(state) => state.decoder.output.get_output_mut(),
110
184k
        })
111
231k
    }
Unexecuted instantiation: <lzma_rs::decode::stream::Stream<_>>::get_output_mut
<lzma_rs::decode::stream::Stream<suricata_htp::decompressors::BlockingCursor>>::get_output_mut
Line
Count
Source
106
231k
    pub fn get_output_mut(&mut self) -> Option<&mut W> {
107
231k
        self.state.as_mut().map(|state| match state {
108
            State::Header(output) => output,
109
            State::Data(state) => state.decoder.output.get_output_mut(),
110
        })
111
231k
    }
112
113
    /// Consumes the stream and returns the output sink. This also makes sure
114
    /// we have properly reached the end of the stream.
115
3.10k
    pub fn finish(mut self) -> crate::error::Result<W> {
116
3.10k
        if let Some(state) = self.state.take() {
117
3.10k
            match state {
118
1.58k
                State::Header(output) => {
119
1.58k
                    if self.tmp.position() > 0 {
120
857
                        Err(Error::LzmaError("failed to read header".to_string()))
121
                    } else {
122
732
                        Ok(output)
123
                    }
124
                }
125
1.51k
                State::Data(mut state) => {
126
1.51k
                    if !self.options.allow_incomplete {
127
                        // Process one last time with empty input to force end of
128
                        // stream checks
129
1.51k
                        let mut stream =
130
1.51k
                            Cursor::new(&self.tmp.get_ref()[0..self.tmp.position() as usize]);
131
1.51k
                        let mut range_decoder =
132
1.51k
                            RangeDecoder::from_parts(&mut stream, state.range, state.code);
133
1.51k
                        state.decoder.process(&mut range_decoder)?;
134
0
                    }
135
344
                    let output = state.decoder.output.finish()?;
136
344
                    Ok(output)
137
                }
138
            }
139
        } else {
140
            // this will occur if a call to `write()` fails
141
0
            Err(Error::LzmaError(
142
0
                "can't finish stream because of previous write error".to_string(),
143
0
            ))
144
        }
145
3.10k
    }
Unexecuted instantiation: <lzma_rs::decode::stream::Stream<std::io::cursor::Cursor<&mut [u8]>>>::finish
<lzma_rs::decode::stream::Stream<suricata_htp::decompressors::BlockingCursor>>::finish
Line
Count
Source
115
3.10k
    pub fn finish(mut self) -> crate::error::Result<W> {
116
3.10k
        if let Some(state) = self.state.take() {
117
3.10k
            match state {
118
1.58k
                State::Header(output) => {
119
1.58k
                    if self.tmp.position() > 0 {
120
857
                        Err(Error::LzmaError("failed to read header".to_string()))
121
                    } else {
122
732
                        Ok(output)
123
                    }
124
                }
125
1.51k
                State::Data(mut state) => {
126
1.51k
                    if !self.options.allow_incomplete {
127
                        // Process one last time with empty input to force end of
128
                        // stream checks
129
1.51k
                        let mut stream =
130
1.51k
                            Cursor::new(&self.tmp.get_ref()[0..self.tmp.position() as usize]);
131
1.51k
                        let mut range_decoder =
132
1.51k
                            RangeDecoder::from_parts(&mut stream, state.range, state.code);
133
1.51k
                        state.decoder.process(&mut range_decoder)?;
134
0
                    }
135
344
                    let output = state.decoder.output.finish()?;
136
344
                    Ok(output)
137
                }
138
            }
139
        } else {
140
            // this will occur if a call to `write()` fails
141
0
            Err(Error::LzmaError(
142
0
                "can't finish stream because of previous write error".to_string(),
143
0
            ))
144
        }
145
3.10k
    }
Unexecuted instantiation: <lzma_rs::decode::stream::Stream<_>>::finish
Unexecuted instantiation: <lzma_rs::decode::stream::Stream<std::io::cursor::Cursor<&mut [u8]>>>::finish
146
147
    /// Attempts to read the header and transition into a running state.
148
    ///
149
    /// This function will consume the state, returning the next state on both
150
    /// error and success.
151
107k
    fn read_header<R: BufRead>(
152
107k
        output: W,
153
107k
        mut input: &mut R,
154
107k
        options: &Options,
155
107k
    ) -> crate::error::Result<State<W>> {
156
107k
        match LzmaParams::read_header(&mut input, options) {
157
61.0k
            Ok(params) => {
158
61.0k
                let decoder = if let Some(memlimit) = options.memlimit {
159
61.0k
                    new_circular_with_memlimit(output, params, memlimit)
160
                } else {
161
0
                    new_circular(output, params)
162
0
                }?;
163
164
                // The RangeDecoder is only kept temporarily as we are processing
165
                // chunks of data.
166
61.0k
                if let Ok(rangecoder) = RangeDecoder::new(&mut input) {
167
39.3k
                    Ok(State::Data(RunState {
168
39.3k
                        decoder,
169
39.3k
                        range: rangecoder.range,
170
39.3k
                        code: rangecoder.code,
171
39.3k
                    }))
172
                } else {
173
                    // Failed to create a RangeDecoder because we need more data,
174
                    // try again later.
175
21.7k
                    Ok(State::Header(decoder.output.into_output()))
176
                }
177
            }
178
            // Failed to read_header() because we need more data, try again later.
179
34.3k
            Err(Error::HeaderTooShort(_)) => Ok(State::Header(output)),
180
            // Fatal error. Don't retry.
181
12.2k
            Err(e) => Err(e),
182
        }
183
107k
    }
Unexecuted instantiation: <lzma_rs::decode::stream::Stream<std::io::cursor::Cursor<&mut [u8]>>>::read_header::<std::io::cursor::Cursor<&[u8]>>
<lzma_rs::decode::stream::Stream<suricata_htp::decompressors::BlockingCursor>>::read_header::<std::io::cursor::Cursor<&[u8]>>
Line
Count
Source
151
107k
    fn read_header<R: BufRead>(
152
107k
        output: W,
153
107k
        mut input: &mut R,
154
107k
        options: &Options,
155
107k
    ) -> crate::error::Result<State<W>> {
156
107k
        match LzmaParams::read_header(&mut input, options) {
157
61.0k
            Ok(params) => {
158
61.0k
                let decoder = if let Some(memlimit) = options.memlimit {
159
61.0k
                    new_circular_with_memlimit(output, params, memlimit)
160
                } else {
161
0
                    new_circular(output, params)
162
0
                }?;
163
164
                // The RangeDecoder is only kept temporarily as we are processing
165
                // chunks of data.
166
61.0k
                if let Ok(rangecoder) = RangeDecoder::new(&mut input) {
167
39.3k
                    Ok(State::Data(RunState {
168
39.3k
                        decoder,
169
39.3k
                        range: rangecoder.range,
170
39.3k
                        code: rangecoder.code,
171
39.3k
                    }))
172
                } else {
173
                    // Failed to create a RangeDecoder because we need more data,
174
                    // try again later.
175
21.7k
                    Ok(State::Header(decoder.output.into_output()))
176
                }
177
            }
178
            // Failed to read_header() because we need more data, try again later.
179
34.3k
            Err(Error::HeaderTooShort(_)) => Ok(State::Header(output)),
180
            // Fatal error. Don't retry.
181
12.2k
            Err(e) => Err(e),
182
        }
183
107k
    }
Unexecuted instantiation: <lzma_rs::decode::stream::Stream<_>>::read_header::<_>
Unexecuted instantiation: <lzma_rs::decode::stream::Stream<std::io::cursor::Cursor<&mut [u8]>>>::read_header::<std::io::cursor::Cursor<&[u8]>>
184
185
    /// Process compressed data
186
84.0k
    fn read_data<R: BufRead>(mut state: RunState<W>, mut input: &mut R) -> io::Result<RunState<W>> {
187
        // Construct our RangeDecoder from the previous range and code
188
        // values.
189
84.0k
        let mut rangecoder = RangeDecoder::from_parts(&mut input, state.range, state.code);
190
191
        // Try to process all bytes of data.
192
84.0k
        state
193
84.0k
            .decoder
194
84.0k
            .process_stream(&mut rangecoder)
195
84.0k
            .map_err(|e| -> io::Error { e.into() })?;
Unexecuted instantiation: <lzma_rs::decode::stream::Stream<std::io::cursor::Cursor<&mut [u8]>>>::read_data::<std::io::cursor::Cursor<&[u8]>>::{closure#0}
<lzma_rs::decode::stream::Stream<suricata_htp::decompressors::BlockingCursor>>::read_data::<std::io::cursor::Cursor<&[u8]>>::{closure#0}
Line
Count
Source
195
35.0k
            .map_err(|e| -> io::Error { e.into() })?;
Unexecuted instantiation: <lzma_rs::decode::stream::Stream<_>>::read_data::<_>::{closure#0}
Unexecuted instantiation: <lzma_rs::decode::stream::Stream<std::io::cursor::Cursor<&mut [u8]>>>::read_data::<std::io::cursor::Cursor<&[u8]>>::{closure#0}
196
197
48.9k
        Ok(RunState {
198
48.9k
            decoder: state.decoder,
199
48.9k
            range: rangecoder.range,
200
48.9k
            code: rangecoder.code,
201
48.9k
        })
202
84.0k
    }
Unexecuted instantiation: <lzma_rs::decode::stream::Stream<std::io::cursor::Cursor<&mut [u8]>>>::read_data::<std::io::cursor::Cursor<&[u8]>>
<lzma_rs::decode::stream::Stream<suricata_htp::decompressors::BlockingCursor>>::read_data::<std::io::cursor::Cursor<&[u8]>>
Line
Count
Source
186
84.0k
    fn read_data<R: BufRead>(mut state: RunState<W>, mut input: &mut R) -> io::Result<RunState<W>> {
187
        // Construct our RangeDecoder from the previous range and code
188
        // values.
189
84.0k
        let mut rangecoder = RangeDecoder::from_parts(&mut input, state.range, state.code);
190
191
        // Try to process all bytes of data.
192
84.0k
        state
193
84.0k
            .decoder
194
84.0k
            .process_stream(&mut rangecoder)
195
84.0k
            .map_err(|e| -> io::Error { e.into() })?;
196
197
48.9k
        Ok(RunState {
198
48.9k
            decoder: state.decoder,
199
48.9k
            range: rangecoder.range,
200
48.9k
            code: rangecoder.code,
201
48.9k
        })
202
84.0k
    }
Unexecuted instantiation: <lzma_rs::decode::stream::Stream<_>>::read_data::<_>
Unexecuted instantiation: <lzma_rs::decode::stream::Stream<std::io::cursor::Cursor<&mut [u8]>>>::read_data::<std::io::cursor::Cursor<&[u8]>>
203
}
204
205
impl<W> Debug for Stream<W>
206
where
207
    W: Write + Debug,
208
{
209
0
    fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result {
210
0
        fmt.debug_struct("Stream")
211
0
            .field("tmp", &self.tmp.position())
212
0
            .field("state", &self.state)
213
0
            .field("options", &self.options)
214
0
            .finish()
215
0
    }
216
}
217
218
impl<W> Write for Stream<W>
219
where
220
    W: Write,
221
{
222
191k
    fn write(&mut self, data: &[u8]) -> io::Result<usize> {
223
191k
        let mut input = Cursor::new(data);
224
225
191k
        if let Some(state) = self.state.take() {
226
191k
            let state = match state {
227
                // Read the header values and transition into a running state.
228
107k
                State::Header(state) => {
229
107k
                    let res = if self.tmp.position() > 0 {
230
                        // attempt to fill the tmp buffer
231
50.6k
                        let position = self.tmp.position();
232
50.6k
                        let bytes_read =
233
50.6k
                            input.read(&mut self.tmp.get_mut()[position as usize..])?;
234
50.6k
                        let bytes_read = if bytes_read < std::u64::MAX as usize {
235
50.6k
                            bytes_read as u64
236
                        } else {
237
0
                            return Err(io::Error::new(
238
0
                                io::ErrorKind::Other,
239
0
                                "Failed to convert integer to u64.",
240
0
                            ));
241
                        };
242
50.6k
                        self.tmp.set_position(position + bytes_read);
243
244
                        // attempt to read the header from our tmp buffer
245
50.6k
                        let (position, res) = {
246
50.6k
                            let mut tmp_input =
247
50.6k
                                Cursor::new(&self.tmp.get_ref()[0..self.tmp.position() as usize]);
248
50.6k
                            let res = Stream::read_header(state, &mut tmp_input, &self.options);
249
50.6k
                            (tmp_input.position(), res)
250
50.6k
                        };
251
252
                        // discard all bytes up to position if reading the header
253
                        // was successful
254
50.6k
                        if let Ok(State::Data(_)) = &res {
255
20.4k
                            let tmp = *self.tmp.get_ref();
256
20.4k
                            let end = self.tmp.position();
257
20.4k
                            let new_len = end - position;
258
20.4k
                            (&mut self.tmp.get_mut()[0..new_len as usize])
259
20.4k
                                .copy_from_slice(&tmp[position as usize..end as usize]);
260
20.4k
                            self.tmp.set_position(new_len);
261
30.1k
                        }
262
50.6k
                        res
263
                    } else {
264
57.0k
                        Stream::read_header(state, &mut input, &self.options)
265
                    };
266
267
95.4k
                    match res {
268
                        // occurs when not enough input bytes were provided to
269
                        // read the entire header
270
56.1k
                        Ok(State::Header(val)) => {
271
56.1k
                            if self.tmp.position() == 0 {
272
                                // reset the cursor because we may have partial reads
273
25.9k
                                input.set_position(0);
274
25.9k
                                let bytes_read = input.read(&mut self.tmp.get_mut()[..])?;
275
25.9k
                                let bytes_read = if bytes_read < std::u64::MAX as usize {
276
25.9k
                                    bytes_read as u64
277
                                } else {
278
0
                                    return Err(io::Error::new(
279
0
                                        io::ErrorKind::Other,
280
0
                                        "Failed to convert integer to u64.",
281
0
                                    ));
282
                                };
283
25.9k
                                self.tmp.set_position(bytes_read);
284
30.1k
                            }
285
56.1k
                            State::Header(val)
286
                        }
287
288
                        // occurs when the header was successfully read and we
289
                        // move on to the next state
290
39.3k
                        Ok(State::Data(val)) => State::Data(val),
291
292
                        // occurs when the output was consumed due to a
293
                        // non-recoverable error
294
12.2k
                        Err(e) => {
295
12.2k
                            return Err(match e {
296
0
                                Error::IoError(e) | Error::HeaderTooShort(e) => e,
297
12.2k
                                Error::LzmaError(e) | Error::XzError(e) => {
298
12.2k
                                    io::Error::new(io::ErrorKind::Other, e)
299
                                }
300
                            });
301
                        }
302
                    }
303
                }
304
305
                // Process another chunk of data.
306
84.0k
                State::Data(state) => {
307
84.0k
                    let state = if self.tmp.position() > 0 {
308
0
                        let mut tmp_input =
309
0
                            Cursor::new(&self.tmp.get_ref()[0..self.tmp.position() as usize]);
310
0
                        let res = Stream::read_data(state, &mut tmp_input)?;
311
0
                        self.tmp.set_position(0);
312
0
                        res
313
                    } else {
314
84.0k
                        state
315
                    };
316
84.0k
                    State::Data(Stream::read_data(state, &mut input)?)
317
                }
318
            };
319
144k
            self.state.replace(state);
320
0
        }
321
144k
        Ok(input.position() as usize)
322
191k
    }
Unexecuted instantiation: <lzma_rs::decode::stream::Stream<std::io::cursor::Cursor<&mut [u8]>> as std::io::Write>::write
<lzma_rs::decode::stream::Stream<suricata_htp::decompressors::BlockingCursor> as std::io::Write>::write
Line
Count
Source
222
191k
    fn write(&mut self, data: &[u8]) -> io::Result<usize> {
223
191k
        let mut input = Cursor::new(data);
224
225
191k
        if let Some(state) = self.state.take() {
226
191k
            let state = match state {
227
                // Read the header values and transition into a running state.
228
107k
                State::Header(state) => {
229
107k
                    let res = if self.tmp.position() > 0 {
230
                        // attempt to fill the tmp buffer
231
50.6k
                        let position = self.tmp.position();
232
50.6k
                        let bytes_read =
233
50.6k
                            input.read(&mut self.tmp.get_mut()[position as usize..])?;
234
50.6k
                        let bytes_read = if bytes_read < std::u64::MAX as usize {
235
50.6k
                            bytes_read as u64
236
                        } else {
237
0
                            return Err(io::Error::new(
238
0
                                io::ErrorKind::Other,
239
0
                                "Failed to convert integer to u64.",
240
0
                            ));
241
                        };
242
50.6k
                        self.tmp.set_position(position + bytes_read);
243
244
                        // attempt to read the header from our tmp buffer
245
50.6k
                        let (position, res) = {
246
50.6k
                            let mut tmp_input =
247
50.6k
                                Cursor::new(&self.tmp.get_ref()[0..self.tmp.position() as usize]);
248
50.6k
                            let res = Stream::read_header(state, &mut tmp_input, &self.options);
249
50.6k
                            (tmp_input.position(), res)
250
50.6k
                        };
251
252
                        // discard all bytes up to position if reading the header
253
                        // was successful
254
50.6k
                        if let Ok(State::Data(_)) = &res {
255
20.4k
                            let tmp = *self.tmp.get_ref();
256
20.4k
                            let end = self.tmp.position();
257
20.4k
                            let new_len = end - position;
258
20.4k
                            (&mut self.tmp.get_mut()[0..new_len as usize])
259
20.4k
                                .copy_from_slice(&tmp[position as usize..end as usize]);
260
20.4k
                            self.tmp.set_position(new_len);
261
30.1k
                        }
262
50.6k
                        res
263
                    } else {
264
57.0k
                        Stream::read_header(state, &mut input, &self.options)
265
                    };
266
267
95.4k
                    match res {
268
                        // occurs when not enough input bytes were provided to
269
                        // read the entire header
270
56.1k
                        Ok(State::Header(val)) => {
271
56.1k
                            if self.tmp.position() == 0 {
272
                                // reset the cursor because we may have partial reads
273
25.9k
                                input.set_position(0);
274
25.9k
                                let bytes_read = input.read(&mut self.tmp.get_mut()[..])?;
275
25.9k
                                let bytes_read = if bytes_read < std::u64::MAX as usize {
276
25.9k
                                    bytes_read as u64
277
                                } else {
278
0
                                    return Err(io::Error::new(
279
0
                                        io::ErrorKind::Other,
280
0
                                        "Failed to convert integer to u64.",
281
0
                                    ));
282
                                };
283
25.9k
                                self.tmp.set_position(bytes_read);
284
30.1k
                            }
285
56.1k
                            State::Header(val)
286
                        }
287
288
                        // occurs when the header was successfully read and we
289
                        // move on to the next state
290
39.3k
                        Ok(State::Data(val)) => State::Data(val),
291
292
                        // occurs when the output was consumed due to a
293
                        // non-recoverable error
294
12.2k
                        Err(e) => {
295
12.2k
                            return Err(match e {
296
0
                                Error::IoError(e) | Error::HeaderTooShort(e) => e,
297
12.2k
                                Error::LzmaError(e) | Error::XzError(e) => {
298
12.2k
                                    io::Error::new(io::ErrorKind::Other, e)
299
                                }
300
                            });
301
                        }
302
                    }
303
                }
304
305
                // Process another chunk of data.
306
84.0k
                State::Data(state) => {
307
84.0k
                    let state = if self.tmp.position() > 0 {
308
0
                        let mut tmp_input =
309
0
                            Cursor::new(&self.tmp.get_ref()[0..self.tmp.position() as usize]);
310
0
                        let res = Stream::read_data(state, &mut tmp_input)?;
311
0
                        self.tmp.set_position(0);
312
0
                        res
313
                    } else {
314
84.0k
                        state
315
                    };
316
84.0k
                    State::Data(Stream::read_data(state, &mut input)?)
317
                }
318
            };
319
144k
            self.state.replace(state);
320
0
        }
321
144k
        Ok(input.position() as usize)
322
191k
    }
Unexecuted instantiation: <lzma_rs::decode::stream::Stream<_> as std::io::Write>::write
Unexecuted instantiation: <lzma_rs::decode::stream::Stream<std::io::cursor::Cursor<&mut [u8]>> as std::io::Write>::write
323
324
    /// Flushes the output sink. The internal buffer isn't flushed to avoid
325
    /// corrupting the internal state. Instead, call `finish()` to finalize the
326
    /// stream and flush all remaining internal data.
327
184k
    fn flush(&mut self) -> io::Result<()> {
328
184k
        if let Some(ref mut state) = self.state {
329
184k
            match state {
330
129k
                State::Header(_) => Ok(()),
331
54.9k
                State::Data(state) => state.decoder.output.get_output_mut().flush(),
332
            }
333
        } else {
334
0
            Ok(())
335
        }
336
184k
    }
Unexecuted instantiation: <lzma_rs::decode::stream::Stream<_> as std::io::Write>::flush
<lzma_rs::decode::stream::Stream<suricata_htp::decompressors::BlockingCursor> as std::io::Write>::flush
Line
Count
Source
327
184k
    fn flush(&mut self) -> io::Result<()> {
328
184k
        if let Some(ref mut state) = self.state {
329
184k
            match state {
330
129k
                State::Header(_) => Ok(()),
331
54.9k
                State::Data(state) => state.decoder.output.get_output_mut().flush(),
332
            }
333
        } else {
334
0
            Ok(())
335
        }
336
184k
    }
337
}
338
339
impl From<Error> for io::Error {
340
35.0k
    fn from(error: Error) -> io::Error {
341
35.0k
        io::Error::new(io::ErrorKind::Other, format!("{:?}", error))
342
35.0k
    }
343
}
344
345
#[cfg(test)]
346
mod test {
347
    use super::*;
348
349
    /// Test an empty stream
350
    #[test]
351
    fn test_stream_noop() {
352
        let stream = Stream::new(Vec::new());
353
        assert!(stream.get_output().unwrap().is_empty());
354
355
        let output = stream.finish().unwrap();
356
        assert!(output.is_empty());
357
    }
358
359
    /// Test writing an empty slice
360
    #[test]
361
    fn test_stream_zero() {
362
        let mut stream = Stream::new(Vec::new());
363
364
        stream.write_all(&[]).unwrap();
365
        stream.write_all(&[]).unwrap();
366
367
        let output = stream.finish().unwrap();
368
369
        assert!(output.is_empty());
370
    }
371
372
    /// Test a bad header value
373
    #[test]
374
    #[should_panic(expected = "LZMA header invalid properties: 255 must be < 225")]
375
    fn test_bad_header() {
376
        let input = [255u8; 32];
377
378
        let mut stream = Stream::new(Vec::new());
379
380
        stream.write_all(&input[..]).unwrap();
381
382
        let output = stream.finish().unwrap();
383
384
        assert!(output.is_empty());
385
    }
386
387
    /// Test processing only partial data
388
    #[test]
389
    fn test_stream_incomplete() {
390
        let input = b"\x5d\x00\x00\x80\x00\xff\xff\xff\xff\xff\xff\xff\xff\x00\x83\xff\
391
                      \xfb\xff\xff\xc0\x00\x00\x00";
392
        // Process until this index is reached.
393
        let mut end = 1u64;
394
395
        // Test when we fail to provide the minimum number of bytes required to
396
        // read the header. Header size is 13 bytes but we also read the first 5
397
        // bytes of data.
398
        while end < (MAX_HEADER_LEN + START_BYTES) as u64 {
399
            let mut stream = Stream::new(Vec::new());
400
            stream.write_all(&input[..end as usize]).unwrap();
401
            assert_eq!(stream.tmp.position(), end);
402
403
            let err = stream.finish().unwrap_err();
404
            assert!(
405
                err.to_string().contains("failed to read header"),
406
                "error was: {}",
407
                err
408
            );
409
410
            end += 1;
411
        }
412
413
        // Test when we fail to provide enough bytes to terminate the stream. A
414
        // properly terminated stream will have a code value of 0.
415
        while end < input.len() as u64 {
416
            let mut stream = Stream::new(Vec::new());
417
            stream.write_all(&input[..end as usize]).unwrap();
418
419
            // Header bytes will be buffered until there are enough to read
420
            if end < (MAX_HEADER_LEN + START_BYTES) as u64 {
421
                assert_eq!(stream.tmp.position(), end);
422
            }
423
424
            let err = stream.finish().unwrap_err();
425
            assert!(err.to_string().contains("failed to fill whole buffer"));
426
427
            end += 1;
428
        }
429
    }
430
431
    /// Test processing all chunk sizes
432
    #[test]
433
    fn test_stream_chunked() {
434
        let small_input = include_bytes!("../../tests/files/small.txt");
435
436
        let mut reader = io::Cursor::new(&small_input[..]);
437
        let mut small_input_compressed = Vec::new();
438
        crate::lzma_compress(&mut reader, &mut small_input_compressed).unwrap();
439
440
        let input : Vec<(&[u8], &[u8])> = vec![
441
            (b"\x5d\x00\x00\x80\x00\xff\xff\xff\xff\xff\xff\xff\xff\x00\x83\xff\xfb\xff\xff\xc0\x00\x00\x00", b""),
442
            (&small_input_compressed[..], small_input)];
443
        for (input, expected) in input {
444
            for chunk in 1..input.len() {
445
                let mut consumed = 0;
446
                let mut stream = Stream::new(Vec::new());
447
                while consumed < input.len() {
448
                    let end = std::cmp::min(consumed + chunk, input.len());
449
                    stream.write_all(&input[consumed..end]).unwrap();
450
                    consumed = end;
451
                }
452
                let output = stream.finish().unwrap();
453
                assert_eq!(expected, &output[..]);
454
            }
455
        }
456
    }
457
458
    #[test]
459
    fn test_stream_corrupted() {
460
        let mut stream = Stream::new(Vec::new());
461
        let err = stream
462
            .write_all(b"corrupted bytes here corrupted bytes here")
463
            .unwrap_err();
464
        assert!(err.to_string().contains("beyond output size"));
465
        let err = stream.finish().unwrap_err();
466
        assert!(err
467
            .to_string()
468
            .contains("can\'t finish stream because of previous write error"));
469
    }
470
471
    #[test]
472
    fn test_allow_incomplete() {
473
        let input = include_bytes!("../../tests/files/small.txt");
474
475
        let mut reader = io::Cursor::new(&input[..]);
476
        let mut compressed = Vec::new();
477
        crate::lzma_compress(&mut reader, &mut compressed).unwrap();
478
        let compressed = &compressed[..compressed.len() / 2];
479
480
        // Should fail to finish() without the allow_incomplete option.
481
        let mut stream = Stream::new(Vec::new());
482
        stream.write_all(&compressed[..]).unwrap();
483
        stream.finish().unwrap_err();
484
485
        // Should succeed with the allow_incomplete option.
486
        let mut stream = Stream::new_with_options(
487
            &Options {
488
                allow_incomplete: true,
489
                ..Default::default()
490
            },
491
            Vec::new(),
492
        );
493
        stream.write_all(&compressed[..]).unwrap();
494
        let output = stream.finish().unwrap();
495
        assert_eq!(output, &input[..26]);
496
    }
497
}