Coverage Report

Created: 2025-10-28 06:41

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/h2/src/codec/framed_read.rs
Line
Count
Source
1
use crate::frame::{self, Frame, Kind, Reason};
2
use crate::frame::{
3
    DEFAULT_MAX_FRAME_SIZE, DEFAULT_SETTINGS_HEADER_TABLE_SIZE, MAX_MAX_FRAME_SIZE,
4
};
5
use crate::proto::Error;
6
7
use crate::hpack;
8
9
use futures_core::Stream;
10
11
use bytes::{Buf, BytesMut};
12
13
use std::io;
14
15
use std::pin::Pin;
16
use std::task::{Context, Poll};
17
use tokio::io::AsyncRead;
18
use tokio_util::codec::FramedRead as InnerFramedRead;
19
use tokio_util::codec::{LengthDelimitedCodec, LengthDelimitedCodecError};
20
21
// 16 MB "sane default" taken from golang http2
22
const DEFAULT_SETTINGS_MAX_HEADER_LIST_SIZE: usize = 16 << 20;
23
24
#[derive(Debug)]
25
pub struct FramedRead<T> {
26
    inner: InnerFramedRead<T, LengthDelimitedCodec>,
27
28
    // hpack decoder state
29
    hpack: hpack::Decoder,
30
31
    max_header_list_size: usize,
32
33
    max_continuation_frames: usize,
34
35
    partial: Option<Partial>,
36
}
37
38
/// Partially loaded headers frame
39
#[derive(Debug)]
40
struct Partial {
41
    /// Empty frame
42
    frame: Continuable,
43
44
    /// Partial header payload
45
    buf: BytesMut,
46
47
    continuation_frames_count: usize,
48
}
49
50
#[derive(Debug)]
51
enum Continuable {
52
    Headers(frame::Headers),
53
    PushPromise(frame::PushPromise),
54
}
55
56
impl<T> FramedRead<T> {
57
14.0k
    pub fn new(inner: InnerFramedRead<T, LengthDelimitedCodec>) -> FramedRead<T> {
58
14.0k
        let max_header_list_size = DEFAULT_SETTINGS_MAX_HEADER_LIST_SIZE;
59
14.0k
        let max_continuation_frames =
60
14.0k
            calc_max_continuation_frames(max_header_list_size, inner.decoder().max_frame_length());
61
14.0k
        FramedRead {
62
14.0k
            inner,
63
14.0k
            hpack: hpack::Decoder::new(DEFAULT_SETTINGS_HEADER_TABLE_SIZE),
64
14.0k
            max_header_list_size,
65
14.0k
            max_continuation_frames,
66
14.0k
            partial: None,
67
14.0k
        }
68
14.0k
    }
Unexecuted instantiation: <h2::codec::framed_read::FramedRead<_>>::new
<h2::codec::framed_read::FramedRead<h2::codec::framed_write::FramedWrite<h2_support::mock::Mock, h2::proto::streams::prioritize::Prioritized<bytes::bytes::Bytes>>>>::new
Line
Count
Source
57
634
    pub fn new(inner: InnerFramedRead<T, LengthDelimitedCodec>) -> FramedRead<T> {
58
634
        let max_header_list_size = DEFAULT_SETTINGS_MAX_HEADER_LIST_SIZE;
59
634
        let max_continuation_frames =
60
634
            calc_max_continuation_frames(max_header_list_size, inner.decoder().max_frame_length());
61
634
        FramedRead {
62
634
            inner,
63
634
            hpack: hpack::Decoder::new(DEFAULT_SETTINGS_HEADER_TABLE_SIZE),
64
634
            max_header_list_size,
65
634
            max_continuation_frames,
66
634
            partial: None,
67
634
        }
68
634
    }
<h2::codec::framed_read::FramedRead<h2::codec::framed_write::FramedWrite<h2_support::mock::Pipe, bytes::bytes::Bytes>>>::new
Line
Count
Source
57
634
    pub fn new(inner: InnerFramedRead<T, LengthDelimitedCodec>) -> FramedRead<T> {
58
634
        let max_header_list_size = DEFAULT_SETTINGS_MAX_HEADER_LIST_SIZE;
59
634
        let max_continuation_frames =
60
634
            calc_max_continuation_frames(max_header_list_size, inner.decoder().max_frame_length());
61
634
        FramedRead {
62
634
            inner,
63
634
            hpack: hpack::Decoder::new(DEFAULT_SETTINGS_HEADER_TABLE_SIZE),
64
634
            max_header_list_size,
65
634
            max_continuation_frames,
66
634
            partial: None,
67
634
        }
68
634
    }
<h2::codec::framed_read::FramedRead<h2::codec::framed_write::FramedWrite<fuzz_e2e::MockIo, h2::proto::streams::prioritize::Prioritized<bytes::bytes::Bytes>>>>::new
Line
Count
Source
57
12.7k
    pub fn new(inner: InnerFramedRead<T, LengthDelimitedCodec>) -> FramedRead<T> {
58
12.7k
        let max_header_list_size = DEFAULT_SETTINGS_MAX_HEADER_LIST_SIZE;
59
12.7k
        let max_continuation_frames =
60
12.7k
            calc_max_continuation_frames(max_header_list_size, inner.decoder().max_frame_length());
61
12.7k
        FramedRead {
62
12.7k
            inner,
63
12.7k
            hpack: hpack::Decoder::new(DEFAULT_SETTINGS_HEADER_TABLE_SIZE),
64
12.7k
            max_header_list_size,
65
12.7k
            max_continuation_frames,
66
12.7k
            partial: None,
67
12.7k
        }
68
12.7k
    }
69
70
1.06M
    pub fn get_ref(&self) -> &T {
71
1.06M
        self.inner.get_ref()
72
1.06M
    }
Unexecuted instantiation: <h2::codec::framed_read::FramedRead<_>>::get_ref
<h2::codec::framed_read::FramedRead<h2::codec::framed_write::FramedWrite<fuzz_e2e::MockIo, h2::proto::streams::prioritize::Prioritized<bytes::bytes::Bytes>>>>::get_ref
Line
Count
Source
70
1.06M
    pub fn get_ref(&self) -> &T {
71
1.06M
        self.inner.get_ref()
72
1.06M
    }
73
74
6.40M
    pub fn get_mut(&mut self) -> &mut T {
75
6.40M
        self.inner.get_mut()
76
6.40M
    }
Unexecuted instantiation: <h2::codec::framed_read::FramedRead<_>>::get_mut
<h2::codec::framed_read::FramedRead<h2::codec::framed_write::FramedWrite<h2_support::mock::Mock, h2::proto::streams::prioritize::Prioritized<bytes::bytes::Bytes>>>>::get_mut
Line
Count
Source
74
634
    pub fn get_mut(&mut self) -> &mut T {
75
634
        self.inner.get_mut()
76
634
    }
<h2::codec::framed_read::FramedRead<h2::codec::framed_write::FramedWrite<h2_support::mock::Pipe, bytes::bytes::Bytes>>>::get_mut
Line
Count
Source
74
1.26k
    pub fn get_mut(&mut self) -> &mut T {
75
1.26k
        self.inner.get_mut()
76
1.26k
    }
<h2::codec::framed_read::FramedRead<h2::codec::framed_write::FramedWrite<fuzz_e2e::MockIo, h2::proto::streams::prioritize::Prioritized<bytes::bytes::Bytes>>>>::get_mut
Line
Count
Source
74
6.40M
    pub fn get_mut(&mut self) -> &mut T {
75
6.40M
        self.inner.get_mut()
76
6.40M
    }
77
78
    /// Returns the current max frame size setting
79
    #[inline]
80
0
    pub fn max_frame_size(&self) -> usize {
81
0
        self.inner.decoder().max_frame_length()
82
0
    }
Unexecuted instantiation: <h2::codec::framed_read::FramedRead<_>>::max_frame_size
Unexecuted instantiation: <h2::codec::framed_read::FramedRead<h2::codec::framed_write::FramedWrite<h2_support::mock::Mock, h2::proto::streams::prioritize::Prioritized<bytes::bytes::Bytes>>>>::max_frame_size
Unexecuted instantiation: <h2::codec::framed_read::FramedRead<h2::codec::framed_write::FramedWrite<fuzz_e2e::MockIo, h2::proto::streams::prioritize::Prioritized<bytes::bytes::Bytes>>>>::max_frame_size
83
84
    /// Updates the max frame size setting.
85
    ///
86
    /// Must be within 16,384 and 16,777,215.
87
    #[inline]
88
14.0k
    pub fn set_max_frame_size(&mut self, val: usize) {
89
14.0k
        assert!(DEFAULT_MAX_FRAME_SIZE as usize <= val && val <= MAX_MAX_FRAME_SIZE as usize);
90
14.0k
        self.inner.decoder_mut().set_max_frame_length(val);
91
        // Update max CONTINUATION frames too, since its based on this
92
14.0k
        self.max_continuation_frames = calc_max_continuation_frames(self.max_header_list_size, val);
93
14.0k
    }
Unexecuted instantiation: <h2::codec::framed_read::FramedRead<_>>::set_max_frame_size
<h2::codec::framed_read::FramedRead<h2::codec::framed_write::FramedWrite<h2_support::mock::Mock, h2::proto::streams::prioritize::Prioritized<bytes::bytes::Bytes>>>>::set_max_frame_size
Line
Count
Source
88
634
    pub fn set_max_frame_size(&mut self, val: usize) {
89
634
        assert!(DEFAULT_MAX_FRAME_SIZE as usize <= val && val <= MAX_MAX_FRAME_SIZE as usize);
90
634
        self.inner.decoder_mut().set_max_frame_length(val);
91
        // Update max CONTINUATION frames too, since its based on this
92
634
        self.max_continuation_frames = calc_max_continuation_frames(self.max_header_list_size, val);
93
634
    }
<h2::codec::framed_read::FramedRead<h2::codec::framed_write::FramedWrite<h2_support::mock::Pipe, bytes::bytes::Bytes>>>::set_max_frame_size
Line
Count
Source
88
634
    pub fn set_max_frame_size(&mut self, val: usize) {
89
634
        assert!(DEFAULT_MAX_FRAME_SIZE as usize <= val && val <= MAX_MAX_FRAME_SIZE as usize);
90
634
        self.inner.decoder_mut().set_max_frame_length(val);
91
        // Update max CONTINUATION frames too, since its based on this
92
634
        self.max_continuation_frames = calc_max_continuation_frames(self.max_header_list_size, val);
93
634
    }
<h2::codec::framed_read::FramedRead<h2::codec::framed_write::FramedWrite<fuzz_e2e::MockIo, h2::proto::streams::prioritize::Prioritized<bytes::bytes::Bytes>>>>::set_max_frame_size
Line
Count
Source
88
12.7k
    pub fn set_max_frame_size(&mut self, val: usize) {
89
12.7k
        assert!(DEFAULT_MAX_FRAME_SIZE as usize <= val && val <= MAX_MAX_FRAME_SIZE as usize);
90
12.7k
        self.inner.decoder_mut().set_max_frame_length(val);
91
        // Update max CONTINUATION frames too, since its based on this
92
12.7k
        self.max_continuation_frames = calc_max_continuation_frames(self.max_header_list_size, val);
93
12.7k
    }
94
95
    /// Update the max header list size setting.
96
    #[inline]
97
0
    pub fn set_max_header_list_size(&mut self, val: usize) {
98
0
        self.max_header_list_size = val;
99
        // Update max CONTINUATION frames too, since its based on this
100
0
        self.max_continuation_frames = calc_max_continuation_frames(val, self.max_frame_size());
101
0
    }
Unexecuted instantiation: <h2::codec::framed_read::FramedRead<_>>::set_max_header_list_size
Unexecuted instantiation: <h2::codec::framed_read::FramedRead<h2::codec::framed_write::FramedWrite<h2_support::mock::Mock, h2::proto::streams::prioritize::Prioritized<bytes::bytes::Bytes>>>>::set_max_header_list_size
Unexecuted instantiation: <h2::codec::framed_read::FramedRead<h2::codec::framed_write::FramedWrite<fuzz_e2e::MockIo, h2::proto::streams::prioritize::Prioritized<bytes::bytes::Bytes>>>>::set_max_header_list_size
102
103
    /// Update the header table size setting.
104
    #[inline]
105
0
    pub fn set_header_table_size(&mut self, val: usize) {
106
0
        self.hpack.queue_size_update(val);
107
0
    }
Unexecuted instantiation: <h2::codec::framed_read::FramedRead<_>>::set_header_table_size
Unexecuted instantiation: <h2::codec::framed_read::FramedRead<h2::codec::framed_write::FramedWrite<fuzz_e2e::MockIo, h2::proto::streams::prioritize::Prioritized<bytes::bytes::Bytes>>>>::set_header_table_size
108
}
109
110
28.1k
fn calc_max_continuation_frames(header_max: usize, frame_max: usize) -> usize {
111
    // At least this many frames needed to use max header list size
112
28.1k
    let min_frames_for_list = (header_max / frame_max).max(1);
113
    // Some padding for imperfectly packed frames
114
    // 25% without floats
115
28.1k
    let padding = min_frames_for_list >> 2;
116
28.1k
    min_frames_for_list.saturating_add(padding).max(5)
117
28.1k
}
118
119
/// Decodes a frame.
120
///
121
/// This method is intentionally de-generified and outlined because it is very large.
122
132k
fn decode_frame(
123
132k
    hpack: &mut hpack::Decoder,
124
132k
    max_header_list_size: usize,
125
132k
    max_continuation_frames: usize,
126
132k
    partial_inout: &mut Option<Partial>,
127
132k
    mut bytes: BytesMut,
128
132k
) -> Result<Option<Frame>, Error> {
129
132k
    let span = tracing::trace_span!("FramedRead::decode_frame", offset = bytes.len());
130
132k
    let _e = span.enter();
131
132
132k
    tracing::trace!("decoding frame from {}B", bytes.len());
133
134
    // Parse the head
135
132k
    let head = frame::Head::parse(&bytes);
136
137
132k
    if partial_inout.is_some() && head.kind() != Kind::Continuation {
138
14
        proto_err!(conn: "expected CONTINUATION, got {:?}", head.kind());
139
14
        return Err(Error::library_go_away(Reason::PROTOCOL_ERROR));
140
132k
    }
141
142
132k
    let kind = head.kind();
143
144
132k
    tracing::trace!(frame.kind = ?kind);
145
146
    macro_rules! header_block {
147
        ($frame:ident, $head:ident, $bytes:ident) => ({
148
            // Drop the frame header
149
            $bytes.advance(frame::HEADER_LEN);
150
151
            // Parse the header frame w/o parsing the payload
152
            let (mut frame, mut payload) = match frame::$frame::load($head, $bytes) {
153
                Ok(res) => res,
154
                Err(frame::Error::InvalidDependencyId) => {
155
                    proto_err!(stream: "invalid HEADERS dependency ID");
156
                    // A stream cannot depend on itself. An endpoint MUST
157
                    // treat this as a stream error (Section 5.4.2) of type
158
                    // `PROTOCOL_ERROR`.
159
                    return Err(Error::library_reset($head.stream_id(), Reason::PROTOCOL_ERROR));
160
                },
161
                Err(e) => {
162
                    proto_err!(conn: "failed to load frame; err={:?}", e);
163
                    return Err(Error::library_go_away(Reason::PROTOCOL_ERROR));
164
                }
165
            };
166
167
            let is_end_headers = frame.is_end_headers();
168
169
            // Load the HPACK encoded headers
170
            match frame.load_hpack(&mut payload, max_header_list_size, hpack) {
171
                Ok(_) => {},
172
                Err(frame::Error::Hpack(hpack::DecoderError::NeedMore(_))) if !is_end_headers => {},
173
                Err(frame::Error::MalformedMessage) => {
174
                    let id = $head.stream_id();
175
                    proto_err!(stream: "malformed header block; stream={:?}", id);
176
                    return Err(Error::library_reset(id, Reason::PROTOCOL_ERROR));
177
                },
178
                Err(e) => {
179
                    proto_err!(conn: "failed HPACK decoding; err={:?}", e);
180
                    return Err(Error::library_go_away(Reason::PROTOCOL_ERROR));
181
                }
182
            }
183
184
            if is_end_headers {
185
                frame.into()
186
            } else {
187
                tracing::trace!("loaded partial header block");
188
                // Defer returning the frame
189
                *partial_inout = Some(Partial {
190
                    frame: Continuable::$frame(frame),
191
                    buf: payload,
192
                    continuation_frames_count: 0,
193
                });
194
195
                return Ok(None);
196
            }
197
        });
198
    }
199
200
132k
    let frame = match kind {
201
        Kind::Settings => {
202
7.47k
            let res = frame::Settings::load(head, &bytes[frame::HEADER_LEN..]);
203
204
7.47k
            res.map_err(|e| {
205
161
                proto_err!(conn: "failed to load SETTINGS frame; err={:?}", e);
206
161
                Error::library_go_away(Reason::PROTOCOL_ERROR)
207
161
            })?
208
7.31k
            .into()
209
        }
210
        Kind::Ping => {
211
380
            let res = frame::Ping::load(head, &bytes[frame::HEADER_LEN..]);
212
213
380
            res.map_err(|e| {
214
61
                proto_err!(conn: "failed to load PING frame; err={:?}", e);
215
61
                Error::library_go_away(Reason::PROTOCOL_ERROR)
216
61
            })?
217
319
            .into()
218
        }
219
        Kind::WindowUpdate => {
220
1.49k
            let res = frame::WindowUpdate::load(head, &bytes[frame::HEADER_LEN..]);
221
222
1.49k
            res.map_err(|e| {
223
11
                proto_err!(conn: "failed to load WINDOW_UPDATE frame; err={:?}", e);
224
11
                Error::library_go_away(Reason::PROTOCOL_ERROR)
225
11
            })?
226
1.48k
            .into()
227
        }
228
        Kind::Data => {
229
49.4k
            bytes.advance(frame::HEADER_LEN);
230
49.4k
            let res = frame::Data::load(head, bytes.freeze());
231
232
            // TODO: Should this always be connection level? Probably not...
233
49.4k
            res.map_err(|e| {
234
50
                proto_err!(conn: "failed to load DATA frame; err={:?}", e);
235
50
                Error::library_go_away(Reason::PROTOCOL_ERROR)
236
50
            })?
237
49.3k
            .into()
238
        }
239
45.2k
        Kind::Headers => header_block!(Headers, head, bytes),
240
        Kind::Reset => {
241
1.30k
            let res = frame::Reset::load(head, &bytes[frame::HEADER_LEN..]);
242
1.30k
            res.map_err(|e| {
243
16
                proto_err!(conn: "failed to load RESET frame; err={:?}", e);
244
16
                Error::library_go_away(Reason::PROTOCOL_ERROR)
245
16
            })?
246
1.28k
            .into()
247
        }
248
        Kind::GoAway => {
249
4.23k
            let res = frame::GoAway::load(&bytes[frame::HEADER_LEN..]);
250
4.23k
            res.map_err(|e| {
251
5
                proto_err!(conn: "failed to load GO_AWAY frame; err={:?}", e);
252
5
                Error::library_go_away(Reason::PROTOCOL_ERROR)
253
5
            })?
254
4.22k
            .into()
255
        }
256
2.65k
        Kind::PushPromise => header_block!(PushPromise, head, bytes),
257
        Kind::Priority => {
258
238
            if head.stream_id() == 0 {
259
                // Invalid stream identifier
260
2
                proto_err!(conn: "invalid stream ID 0");
261
2
                return Err(Error::library_go_away(Reason::PROTOCOL_ERROR));
262
236
            }
263
264
236
            match frame::Priority::load(head, &bytes[frame::HEADER_LEN..]) {
265
97
                Ok(frame) => frame.into(),
266
                Err(frame::Error::InvalidDependencyId) => {
267
                    // A stream cannot depend on itself. An endpoint MUST
268
                    // treat this as a stream error (Section 5.4.2) of type
269
                    // `PROTOCOL_ERROR`.
270
96
                    let id = head.stream_id();
271
96
                    proto_err!(stream: "PRIORITY invalid dependency ID; stream={:?}", id);
272
96
                    return Err(Error::library_reset(id, Reason::PROTOCOL_ERROR));
273
                }
274
43
                Err(e) => {
275
43
                    proto_err!(conn: "failed to load PRIORITY frame; err={:?};", e);
276
43
                    return Err(Error::library_go_away(Reason::PROTOCOL_ERROR));
277
                }
278
            }
279
        }
280
        Kind::Continuation => {
281
11.7k
            let is_end_headers = (head.flag() & 0x4) == 0x4;
282
283
11.7k
            let mut partial = match partial_inout.take() {
284
11.7k
                Some(partial) => partial,
285
                None => {
286
1
                    proto_err!(conn: "received unexpected CONTINUATION frame");
287
1
                    return Err(Error::library_go_away(Reason::PROTOCOL_ERROR));
288
                }
289
            };
290
291
            // The stream identifiers must match
292
11.7k
            if partial.frame.stream_id() != head.stream_id() {
293
51
                proto_err!(conn: "CONTINUATION frame stream ID does not match previous frame stream ID");
294
51
                return Err(Error::library_go_away(Reason::PROTOCOL_ERROR));
295
11.7k
            }
296
297
            // Check for CONTINUATION flood
298
11.7k
            if is_end_headers {
299
207
                partial.continuation_frames_count = 0;
300
207
            } else {
301
11.5k
                let cnt = partial.continuation_frames_count + 1;
302
11.5k
                if cnt > max_continuation_frames {
303
0
                    tracing::debug!("too_many_continuations, max = {}", max_continuation_frames);
304
0
                    return Err(Error::library_go_away_data(
305
0
                        Reason::ENHANCE_YOUR_CALM,
306
0
                        "too_many_continuations",
307
0
                    ));
308
11.5k
                } else {
309
11.5k
                    partial.continuation_frames_count = cnt;
310
11.5k
                }
311
            }
312
313
            // Extend the buf
314
11.7k
            if partial.buf.is_empty() {
315
1.99k
                partial.buf = bytes.split_off(frame::HEADER_LEN);
316
1.99k
            } else {
317
9.72k
                if partial.frame.is_over_size() {
318
                    // If there was left over bytes previously, they may be
319
                    // needed to continue decoding, even though we will
320
                    // be ignoring this frame. This is done to keep the HPACK
321
                    // decoder state up-to-date.
322
                    //
323
                    // Still, we need to be careful, because if a malicious
324
                    // attacker were to try to send a gigantic string, such
325
                    // that it fits over multiple header blocks, we could
326
                    // grow memory uncontrollably again, and that'd be a shame.
327
                    //
328
                    // Instead, we use a simple heuristic to determine if
329
                    // we should continue to ignore decoding, or to tell
330
                    // the attacker to go away.
331
0
                    if partial.buf.len() + bytes.len() > max_header_list_size {
332
0
                        proto_err!(conn: "CONTINUATION frame header block size over ignorable limit");
333
0
                        return Err(Error::library_go_away(Reason::COMPRESSION_ERROR));
334
0
                    }
335
9.72k
                }
336
9.72k
                partial.buf.extend_from_slice(&bytes[frame::HEADER_LEN..]);
337
            }
338
339
11.7k
            match partial
340
11.7k
                .frame
341
11.7k
                .load_hpack(&mut partial.buf, max_header_list_size, hpack)
342
            {
343
1.16k
                Ok(_) => {}
344
10.2k
                Err(frame::Error::Hpack(hpack::DecoderError::NeedMore(_))) if !is_end_headers => {}
345
                Err(frame::Error::MalformedMessage) => {
346
207
                    let id = head.stream_id();
347
207
                    proto_err!(stream: "malformed CONTINUATION frame; stream={:?}", id);
348
207
                    return Err(Error::library_reset(id, Reason::PROTOCOL_ERROR));
349
                }
350
57
                Err(e) => {
351
57
                    proto_err!(conn: "failed HPACK decoding; err={:?}", e);
352
57
                    return Err(Error::library_go_away(Reason::PROTOCOL_ERROR));
353
                }
354
            }
355
356
11.4k
            if is_end_headers {
357
146
                partial.frame.into()
358
            } else {
359
11.3k
                *partial_inout = Some(partial);
360
11.3k
                return Ok(None);
361
            }
362
        }
363
        Kind::Unknown => {
364
            // Unknown frames are ignored
365
8.33k
            return Ok(None);
366
        }
367
    };
368
369
73.4k
    Ok(Some(frame))
370
132k
}
371
372
impl<T> Stream for FramedRead<T>
373
where
374
    T: AsyncRead + Unpin,
375
{
376
    type Item = Result<Frame, Error>;
377
378
1.59M
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
379
1.59M
        let span = tracing::trace_span!("FramedRead::poll_next");
380
1.59M
        let _e = span.enter();
381
        loop {
382
1.61M
            tracing::trace!("poll");
383
1.61M
            let bytes = match ready!(Pin::new(&mut self.inner).poll_next(cx)) {
384
132k
                Some(Ok(bytes)) => bytes,
385
1.52k
                Some(Err(e)) => return Poll::Ready(Some(Err(map_err(e)))),
386
4.72k
                None => return Poll::Ready(None),
387
            };
388
389
132k
            tracing::trace!(read.bytes = bytes.len());
390
            let Self {
391
132k
                ref mut hpack,
392
132k
                max_header_list_size,
393
132k
                ref mut partial,
394
132k
                max_continuation_frames,
395
                ..
396
132k
            } = *self;
397
132k
            if let Some(frame) = decode_frame(
398
132k
                hpack,
399
132k
                max_header_list_size,
400
132k
                max_continuation_frames,
401
132k
                partial,
402
132k
                bytes,
403
36.7k
            )? {
404
73.4k
                tracing::debug!(?frame, "received");
405
73.4k
                return Poll::Ready(Some(Ok(frame)));
406
22.2k
            }
407
        }
408
1.59M
    }
Unexecuted instantiation: <h2::codec::framed_read::FramedRead<_> as futures_core::stream::Stream>::poll_next
Unexecuted instantiation: <h2::codec::framed_read::FramedRead<h2::codec::framed_write::FramedWrite<h2_support::mock::Pipe, bytes::bytes::Bytes>> as futures_core::stream::Stream>::poll_next
<h2::codec::framed_read::FramedRead<h2::codec::framed_write::FramedWrite<fuzz_e2e::MockIo, h2::proto::streams::prioritize::Prioritized<bytes::bytes::Bytes>>> as futures_core::stream::Stream>::poll_next
Line
Count
Source
378
1.59M
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
379
1.59M
        let span = tracing::trace_span!("FramedRead::poll_next");
380
1.59M
        let _e = span.enter();
381
        loop {
382
1.61M
            tracing::trace!("poll");
383
1.61M
            let bytes = match ready!(Pin::new(&mut self.inner).poll_next(cx)) {
384
132k
                Some(Ok(bytes)) => bytes,
385
1.52k
                Some(Err(e)) => return Poll::Ready(Some(Err(map_err(e)))),
386
4.72k
                None => return Poll::Ready(None),
387
            };
388
389
132k
            tracing::trace!(read.bytes = bytes.len());
390
            let Self {
391
132k
                ref mut hpack,
392
132k
                max_header_list_size,
393
132k
                ref mut partial,
394
132k
                max_continuation_frames,
395
                ..
396
132k
            } = *self;
397
132k
            if let Some(frame) = decode_frame(
398
132k
                hpack,
399
132k
                max_header_list_size,
400
132k
                max_continuation_frames,
401
132k
                partial,
402
132k
                bytes,
403
36.7k
            )? {
404
73.4k
                tracing::debug!(?frame, "received");
405
73.4k
                return Poll::Ready(Some(Ok(frame)));
406
22.2k
            }
407
        }
408
1.59M
    }
409
}
410
411
1.52k
fn map_err(err: io::Error) -> Error {
412
1.52k
    if let io::ErrorKind::InvalidData = err.kind() {
413
596
        if let Some(custom) = err.get_ref() {
414
596
            if custom.is::<LengthDelimitedCodecError>() {
415
596
                return Error::library_go_away(Reason::FRAME_SIZE_ERROR);
416
0
            }
417
0
        }
418
924
    }
419
924
    err.into()
420
1.52k
}
421
422
// ===== impl Continuable =====
423
424
impl Continuable {
425
11.7k
    fn stream_id(&self) -> frame::StreamId {
426
11.7k
        match *self {
427
11.7k
            Continuable::Headers(ref h) => h.stream_id(),
428
1
            Continuable::PushPromise(ref p) => p.stream_id(),
429
        }
430
11.7k
    }
431
432
9.72k
    fn is_over_size(&self) -> bool {
433
9.72k
        match *self {
434
9.72k
            Continuable::Headers(ref h) => h.is_over_size(),
435
0
            Continuable::PushPromise(ref p) => p.is_over_size(),
436
        }
437
9.72k
    }
438
439
11.7k
    fn load_hpack(
440
11.7k
        &mut self,
441
11.7k
        src: &mut BytesMut,
442
11.7k
        max_header_list_size: usize,
443
11.7k
        decoder: &mut hpack::Decoder,
444
11.7k
    ) -> Result<(), frame::Error> {
445
11.7k
        match *self {
446
11.7k
            Continuable::Headers(ref mut h) => h.load_hpack(src, max_header_list_size, decoder),
447
0
            Continuable::PushPromise(ref mut p) => p.load_hpack(src, max_header_list_size, decoder),
448
        }
449
11.7k
    }
450
}
451
452
impl<T> From<Continuable> for Frame<T> {
453
146
    fn from(cont: Continuable) -> Self {
454
146
        match cont {
455
146
            Continuable::Headers(mut headers) => {
456
146
                headers.set_end_headers();
457
146
                headers.into()
458
            }
459
0
            Continuable::PushPromise(mut push) => {
460
0
                push.set_end_headers();
461
0
                push.into()
462
            }
463
        }
464
146
    }
465
}