Coverage Report

Created: 2026-02-26 06:09

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/h2/src/codec/framed_read.rs
Line
Count
Source
1
use crate::frame::{self, Frame, Kind, Reason};
2
use crate::frame::{
3
    DEFAULT_MAX_FRAME_SIZE, DEFAULT_SETTINGS_HEADER_TABLE_SIZE, MAX_MAX_FRAME_SIZE,
4
};
5
use crate::proto::Error;
6
7
use crate::hpack;
8
9
use futures_core::Stream;
10
11
use bytes::{Buf, BytesMut};
12
13
use std::io;
14
15
use std::pin::Pin;
16
use std::task::{Context, Poll};
17
use tokio::io::AsyncRead;
18
use tokio_util::codec::FramedRead as InnerFramedRead;
19
use tokio_util::codec::{LengthDelimitedCodec, LengthDelimitedCodecError};
20
21
// 16 MB "sane default" taken from golang http2
22
const DEFAULT_SETTINGS_MAX_HEADER_LIST_SIZE: usize = 16 << 20;
23
24
#[derive(Debug)]
25
pub struct FramedRead<T> {
26
    inner: InnerFramedRead<T, LengthDelimitedCodec>,
27
28
    // hpack decoder state
29
    hpack: hpack::Decoder,
30
31
    max_header_list_size: usize,
32
33
    max_continuation_frames: usize,
34
35
    partial: Option<Partial>,
36
}
37
38
/// Partially loaded headers frame
39
#[derive(Debug)]
40
struct Partial {
41
    /// Empty frame
42
    frame: Continuable,
43
44
    /// Partial header payload
45
    buf: BytesMut,
46
47
    continuation_frames_count: usize,
48
}
49
50
#[derive(Debug)]
51
enum Continuable {
52
    Headers(frame::Headers),
53
    PushPromise(frame::PushPromise),
54
}
55
56
impl<T> FramedRead<T> {
57
15.2k
    pub fn new(inner: InnerFramedRead<T, LengthDelimitedCodec>) -> FramedRead<T> {
58
15.2k
        let max_header_list_size = DEFAULT_SETTINGS_MAX_HEADER_LIST_SIZE;
59
15.2k
        let max_continuation_frames =
60
15.2k
            calc_max_continuation_frames(max_header_list_size, inner.decoder().max_frame_length());
61
15.2k
        FramedRead {
62
15.2k
            inner,
63
15.2k
            hpack: hpack::Decoder::new(DEFAULT_SETTINGS_HEADER_TABLE_SIZE),
64
15.2k
            max_header_list_size,
65
15.2k
            max_continuation_frames,
66
15.2k
            partial: None,
67
15.2k
        }
68
15.2k
    }
<h2::codec::framed_read::FramedRead<h2::codec::framed_write::FramedWrite<h2_support::mock::Mock, h2::proto::streams::prioritize::Prioritized<bytes::bytes::Bytes>>>>::new
Line
Count
Source
57
827
    pub fn new(inner: InnerFramedRead<T, LengthDelimitedCodec>) -> FramedRead<T> {
58
827
        let max_header_list_size = DEFAULT_SETTINGS_MAX_HEADER_LIST_SIZE;
59
827
        let max_continuation_frames =
60
827
            calc_max_continuation_frames(max_header_list_size, inner.decoder().max_frame_length());
61
827
        FramedRead {
62
827
            inner,
63
827
            hpack: hpack::Decoder::new(DEFAULT_SETTINGS_HEADER_TABLE_SIZE),
64
827
            max_header_list_size,
65
827
            max_continuation_frames,
66
827
            partial: None,
67
827
        }
68
827
    }
<h2::codec::framed_read::FramedRead<h2::codec::framed_write::FramedWrite<h2_support::mock::Pipe, bytes::bytes::Bytes>>>::new
Line
Count
Source
57
827
    pub fn new(inner: InnerFramedRead<T, LengthDelimitedCodec>) -> FramedRead<T> {
58
827
        let max_header_list_size = DEFAULT_SETTINGS_MAX_HEADER_LIST_SIZE;
59
827
        let max_continuation_frames =
60
827
            calc_max_continuation_frames(max_header_list_size, inner.decoder().max_frame_length());
61
827
        FramedRead {
62
827
            inner,
63
827
            hpack: hpack::Decoder::new(DEFAULT_SETTINGS_HEADER_TABLE_SIZE),
64
827
            max_header_list_size,
65
827
            max_continuation_frames,
66
827
            partial: None,
67
827
        }
68
827
    }
<h2::codec::framed_read::FramedRead<h2::codec::framed_write::FramedWrite<fuzz_e2e::MockIo, h2::proto::streams::prioritize::Prioritized<bytes::bytes::Bytes>>>>::new
Line
Count
Source
57
13.6k
    pub fn new(inner: InnerFramedRead<T, LengthDelimitedCodec>) -> FramedRead<T> {
58
13.6k
        let max_header_list_size = DEFAULT_SETTINGS_MAX_HEADER_LIST_SIZE;
59
13.6k
        let max_continuation_frames =
60
13.6k
            calc_max_continuation_frames(max_header_list_size, inner.decoder().max_frame_length());
61
13.6k
        FramedRead {
62
13.6k
            inner,
63
13.6k
            hpack: hpack::Decoder::new(DEFAULT_SETTINGS_HEADER_TABLE_SIZE),
64
13.6k
            max_header_list_size,
65
13.6k
            max_continuation_frames,
66
13.6k
            partial: None,
67
13.6k
        }
68
13.6k
    }
69
70
536k
    pub fn get_ref(&self) -> &T {
71
536k
        self.inner.get_ref()
72
536k
    }
73
74
4.34M
    pub fn get_mut(&mut self) -> &mut T {
75
4.34M
        self.inner.get_mut()
76
4.34M
    }
<h2::codec::framed_read::FramedRead<h2::codec::framed_write::FramedWrite<h2_support::mock::Mock, h2::proto::streams::prioritize::Prioritized<bytes::bytes::Bytes>>>>::get_mut
Line
Count
Source
74
827
    pub fn get_mut(&mut self) -> &mut T {
75
827
        self.inner.get_mut()
76
827
    }
<h2::codec::framed_read::FramedRead<h2::codec::framed_write::FramedWrite<h2_support::mock::Pipe, bytes::bytes::Bytes>>>::get_mut
Line
Count
Source
74
1.65k
    pub fn get_mut(&mut self) -> &mut T {
75
1.65k
        self.inner.get_mut()
76
1.65k
    }
<h2::codec::framed_read::FramedRead<h2::codec::framed_write::FramedWrite<fuzz_e2e::MockIo, h2::proto::streams::prioritize::Prioritized<bytes::bytes::Bytes>>>>::get_mut
Line
Count
Source
74
4.34M
    pub fn get_mut(&mut self) -> &mut T {
75
4.34M
        self.inner.get_mut()
76
4.34M
    }
77
78
    /// Returns the current max frame size setting
79
    #[inline]
80
0
    pub fn max_frame_size(&self) -> usize {
81
0
        self.inner.decoder().max_frame_length()
82
0
    }
Unexecuted instantiation: <h2::codec::framed_read::FramedRead<h2::codec::framed_write::FramedWrite<h2_support::mock::Mock, h2::proto::streams::prioritize::Prioritized<bytes::bytes::Bytes>>>>::max_frame_size
Unexecuted instantiation: <h2::codec::framed_read::FramedRead<h2::codec::framed_write::FramedWrite<fuzz_e2e::MockIo, h2::proto::streams::prioritize::Prioritized<bytes::bytes::Bytes>>>>::max_frame_size
83
84
    /// Updates the max frame size setting.
85
    ///
86
    /// Must be within 16,384 and 16,777,215.
87
    #[inline]
88
15.2k
    pub fn set_max_frame_size(&mut self, val: usize) {
89
15.2k
        assert!(DEFAULT_MAX_FRAME_SIZE as usize <= val && val <= MAX_MAX_FRAME_SIZE as usize);
90
15.2k
        self.inner.decoder_mut().set_max_frame_length(val);
91
        // Update max CONTINUATION frames too, since its based on this
92
15.2k
        self.max_continuation_frames = calc_max_continuation_frames(self.max_header_list_size, val);
93
15.2k
    }
<h2::codec::framed_read::FramedRead<h2::codec::framed_write::FramedWrite<h2_support::mock::Mock, h2::proto::streams::prioritize::Prioritized<bytes::bytes::Bytes>>>>::set_max_frame_size
Line
Count
Source
88
827
    pub fn set_max_frame_size(&mut self, val: usize) {
89
827
        assert!(DEFAULT_MAX_FRAME_SIZE as usize <= val && val <= MAX_MAX_FRAME_SIZE as usize);
90
827
        self.inner.decoder_mut().set_max_frame_length(val);
91
        // Update max CONTINUATION frames too, since its based on this
92
827
        self.max_continuation_frames = calc_max_continuation_frames(self.max_header_list_size, val);
93
827
    }
<h2::codec::framed_read::FramedRead<h2::codec::framed_write::FramedWrite<h2_support::mock::Pipe, bytes::bytes::Bytes>>>::set_max_frame_size
Line
Count
Source
88
827
    pub fn set_max_frame_size(&mut self, val: usize) {
89
827
        assert!(DEFAULT_MAX_FRAME_SIZE as usize <= val && val <= MAX_MAX_FRAME_SIZE as usize);
90
827
        self.inner.decoder_mut().set_max_frame_length(val);
91
        // Update max CONTINUATION frames too, since its based on this
92
827
        self.max_continuation_frames = calc_max_continuation_frames(self.max_header_list_size, val);
93
827
    }
<h2::codec::framed_read::FramedRead<h2::codec::framed_write::FramedWrite<fuzz_e2e::MockIo, h2::proto::streams::prioritize::Prioritized<bytes::bytes::Bytes>>>>::set_max_frame_size
Line
Count
Source
88
13.6k
    pub fn set_max_frame_size(&mut self, val: usize) {
89
13.6k
        assert!(DEFAULT_MAX_FRAME_SIZE as usize <= val && val <= MAX_MAX_FRAME_SIZE as usize);
90
13.6k
        self.inner.decoder_mut().set_max_frame_length(val);
91
        // Update max CONTINUATION frames too, since its based on this
92
13.6k
        self.max_continuation_frames = calc_max_continuation_frames(self.max_header_list_size, val);
93
13.6k
    }
94
95
    /// Update the max header list size setting.
96
    #[inline]
97
0
    pub fn set_max_header_list_size(&mut self, val: usize) {
98
0
        self.max_header_list_size = val;
99
        // Update max CONTINUATION frames too, since its based on this
100
0
        self.max_continuation_frames = calc_max_continuation_frames(val, self.max_frame_size());
101
0
    }
Unexecuted instantiation: <h2::codec::framed_read::FramedRead<h2::codec::framed_write::FramedWrite<h2_support::mock::Mock, h2::proto::streams::prioritize::Prioritized<bytes::bytes::Bytes>>>>::set_max_header_list_size
Unexecuted instantiation: <h2::codec::framed_read::FramedRead<h2::codec::framed_write::FramedWrite<fuzz_e2e::MockIo, h2::proto::streams::prioritize::Prioritized<bytes::bytes::Bytes>>>>::set_max_header_list_size
102
103
    /// Update the header table size setting.
104
    #[inline]
105
0
    pub fn set_header_table_size(&mut self, val: usize) {
106
0
        self.hpack.queue_size_update(val);
107
0
    }
108
}
109
110
30.5k
fn calc_max_continuation_frames(header_max: usize, frame_max: usize) -> usize {
111
    // At least this many frames needed to use max header list size
112
30.5k
    let min_frames_for_list = (header_max / frame_max).max(1);
113
    // Some padding for imperfectly packed frames
114
    // 25% without floats
115
30.5k
    let padding = min_frames_for_list >> 2;
116
30.5k
    min_frames_for_list.saturating_add(padding).max(5)
117
30.5k
}
118
119
/// Decodes a frame.
120
///
121
/// This method is intentionally de-generified and outlined because it is very large.
122
161k
fn decode_frame(
123
161k
    hpack: &mut hpack::Decoder,
124
161k
    max_header_list_size: usize,
125
161k
    max_continuation_frames: usize,
126
161k
    partial_inout: &mut Option<Partial>,
127
161k
    mut bytes: BytesMut,
128
161k
) -> Result<Option<Frame>, Error> {
129
161k
    let span = tracing::trace_span!("FramedRead::decode_frame", offset = bytes.len());
130
161k
    let _e = span.enter();
131
132
161k
    tracing::trace!("decoding frame from {}B", bytes.len());
133
134
    // Parse the head
135
161k
    let head = frame::Head::parse(&bytes);
136
137
161k
    if partial_inout.is_some() && head.kind() != Kind::Continuation {
138
13
        proto_err!(conn: "expected CONTINUATION, got {:?}", head.kind());
139
13
        return Err(Error::library_go_away(Reason::PROTOCOL_ERROR));
140
161k
    }
141
142
161k
    let kind = head.kind();
143
144
161k
    tracing::trace!(frame.kind = ?kind);
145
146
    macro_rules! header_block {
147
        ($frame:ident, $head:ident, $bytes:ident) => ({
148
            // Drop the frame header
149
            $bytes.advance(frame::HEADER_LEN);
150
151
            // Parse the header frame w/o parsing the payload
152
            let (mut frame, mut payload) = match frame::$frame::load($head, $bytes) {
153
                Ok(res) => res,
154
                Err(frame::Error::InvalidDependencyId) => {
155
                    proto_err!(stream: "invalid HEADERS dependency ID");
156
                    // A stream cannot depend on itself. An endpoint MUST
157
                    // treat this as a stream error (Section 5.4.2) of type
158
                    // `PROTOCOL_ERROR`.
159
                    return Err(Error::library_reset($head.stream_id(), Reason::PROTOCOL_ERROR));
160
                },
161
                Err(e) => {
162
                    proto_err!(conn: "failed to load frame; err={:?}", e);
163
                    return Err(Error::library_go_away(Reason::PROTOCOL_ERROR));
164
                }
165
            };
166
167
            let is_end_headers = frame.is_end_headers();
168
169
            // Load the HPACK encoded headers
170
            match frame.load_hpack(&mut payload, max_header_list_size, hpack) {
171
                Ok(_) => {},
172
                Err(frame::Error::Hpack(hpack::DecoderError::NeedMore(_))) if !is_end_headers => {},
173
                Err(frame::Error::MalformedMessage) => {
174
                    let id = $head.stream_id();
175
                    proto_err!(stream: "malformed header block; stream={:?}", id);
176
                    return Err(Error::library_reset(id, Reason::PROTOCOL_ERROR));
177
                },
178
                Err(e) => {
179
                    proto_err!(conn: "failed HPACK decoding; err={:?}", e);
180
                    return Err(Error::library_go_away(Reason::PROTOCOL_ERROR));
181
                }
182
            }
183
184
            if is_end_headers {
185
                frame.into()
186
            } else {
187
                tracing::trace!("loaded partial header block");
188
                // Defer returning the frame
189
                *partial_inout = Some(Partial {
190
                    frame: Continuable::$frame(frame),
191
                    buf: payload,
192
                    continuation_frames_count: 0,
193
                });
194
195
                return Ok(None);
196
            }
197
        });
198
    }
199
200
161k
    let frame = match kind {
201
        Kind::Settings => {
202
7.42k
            let res = frame::Settings::load(head, &bytes[frame::HEADER_LEN..]);
203
204
7.42k
            res.map_err(|e| {
205
161
                proto_err!(conn: "failed to load SETTINGS frame; err={:?}", e);
206
161
                Error::library_go_away(Reason::PROTOCOL_ERROR)
207
161
            })?
208
7.26k
            .into()
209
        }
210
        Kind::Ping => {
211
649
            let res = frame::Ping::load(head, &bytes[frame::HEADER_LEN..]);
212
213
649
            res.map_err(|e| {
214
51
                proto_err!(conn: "failed to load PING frame; err={:?}", e);
215
51
                Error::library_go_away(Reason::PROTOCOL_ERROR)
216
51
            })?
217
598
            .into()
218
        }
219
        Kind::WindowUpdate => {
220
2.12k
            let res = frame::WindowUpdate::load(head, &bytes[frame::HEADER_LEN..]);
221
222
2.12k
            res.map_err(|e| {
223
10
                proto_err!(conn: "failed to load WINDOW_UPDATE frame; err={:?}", e);
224
10
                Error::library_go_away(Reason::PROTOCOL_ERROR)
225
10
            })?
226
2.11k
            .into()
227
        }
228
        Kind::Data => {
229
67.0k
            bytes.advance(frame::HEADER_LEN);
230
67.0k
            let res = frame::Data::load(head, bytes.freeze());
231
232
            // TODO: Should this always be connection level? Probably not...
233
67.0k
            res.map_err(|e| {
234
90
                proto_err!(conn: "failed to load DATA frame; err={:?}", e);
235
90
                Error::library_go_away(Reason::PROTOCOL_ERROR)
236
90
            })?
237
66.9k
            .into()
238
        }
239
49.3k
        Kind::Headers => header_block!(Headers, head, bytes),
240
        Kind::Reset => {
241
1.60k
            let res = frame::Reset::load(head, &bytes[frame::HEADER_LEN..]);
242
1.60k
            res.map_err(|e| {
243
15
                proto_err!(conn: "failed to load RESET frame; err={:?}", e);
244
15
                Error::library_go_away(Reason::PROTOCOL_ERROR)
245
15
            })?
246
1.58k
            .into()
247
        }
248
        Kind::GoAway => {
249
5.51k
            let res = frame::GoAway::load(&bytes[frame::HEADER_LEN..]);
250
5.51k
            res.map_err(|e| {
251
7
                proto_err!(conn: "failed to load GO_AWAY frame; err={:?}", e);
252
7
                Error::library_go_away(Reason::PROTOCOL_ERROR)
253
7
            })?
254
5.50k
            .into()
255
        }
256
2.64k
        Kind::PushPromise => header_block!(PushPromise, head, bytes),
257
        Kind::Priority => {
258
361
            if head.stream_id() == 0 {
259
                // Invalid stream identifier
260
2
                proto_err!(conn: "invalid stream ID 0");
261
2
                return Err(Error::library_go_away(Reason::PROTOCOL_ERROR));
262
359
            }
263
264
359
            match frame::Priority::load(head, &bytes[frame::HEADER_LEN..]) {
265
123
                Ok(frame) => frame.into(),
266
                Err(frame::Error::InvalidDependencyId) => {
267
                    // A stream cannot depend on itself. An endpoint MUST
268
                    // treat this as a stream error (Section 5.4.2) of type
269
                    // `PROTOCOL_ERROR`.
270
195
                    let id = head.stream_id();
271
195
                    proto_err!(stream: "PRIORITY invalid dependency ID; stream={:?}", id);
272
195
                    return Err(Error::library_reset(id, Reason::PROTOCOL_ERROR));
273
                }
274
41
                Err(e) => {
275
41
                    proto_err!(conn: "failed to load PRIORITY frame; err={:?};", e);
276
41
                    return Err(Error::library_go_away(Reason::PROTOCOL_ERROR));
277
                }
278
            }
279
        }
280
        Kind::Continuation => {
281
13.0k
            let is_end_headers = (head.flag() & 0x4) == 0x4;
282
283
13.0k
            let mut partial = match partial_inout.take() {
284
13.0k
                Some(partial) => partial,
285
                None => {
286
1
                    proto_err!(conn: "received unexpected CONTINUATION frame");
287
1
                    return Err(Error::library_go_away(Reason::PROTOCOL_ERROR));
288
                }
289
            };
290
291
            // The stream identifiers must match
292
13.0k
            if partial.frame.stream_id() != head.stream_id() {
293
52
                proto_err!(conn: "CONTINUATION frame stream ID does not match previous frame stream ID");
294
52
                return Err(Error::library_go_away(Reason::PROTOCOL_ERROR));
295
12.9k
            }
296
297
            // Check for CONTINUATION flood
298
12.9k
            if is_end_headers {
299
223
                partial.continuation_frames_count = 0;
300
223
            } else {
301
12.7k
                let cnt = partial.continuation_frames_count + 1;
302
12.7k
                if cnt > max_continuation_frames {
303
0
                    tracing::debug!("too_many_continuations, max = {}", max_continuation_frames);
304
0
                    return Err(Error::library_go_away_data(
305
0
                        Reason::ENHANCE_YOUR_CALM,
306
0
                        "too_many_continuations",
307
0
                    ));
308
12.7k
                } else {
309
12.7k
                    partial.continuation_frames_count = cnt;
310
12.7k
                }
311
            }
312
313
            // Extend the buf
314
12.9k
            if partial.buf.is_empty() {
315
2.15k
                partial.buf = bytes.split_off(frame::HEADER_LEN);
316
2.15k
            } else {
317
10.7k
                if partial.frame.is_over_size() {
318
                    // If there was left over bytes previously, they may be
319
                    // needed to continue decoding, even though we will
320
                    // be ignoring this frame. This is done to keep the HPACK
321
                    // decoder state up-to-date.
322
                    //
323
                    // Still, we need to be careful, because if a malicious
324
                    // attacker were to try to send a gigantic string, such
325
                    // that it fits over multiple header blocks, we could
326
                    // grow memory uncontrollably again, and that'd be a shame.
327
                    //
328
                    // Instead, we use a simple heuristic to determine if
329
                    // we should continue to ignore decoding, or to tell
330
                    // the attacker to go away.
331
0
                    if partial.buf.len() + bytes.len() > max_header_list_size {
332
0
                        proto_err!(conn: "CONTINUATION frame header block size over ignorable limit");
333
0
                        return Err(Error::library_go_away(Reason::COMPRESSION_ERROR));
334
0
                    }
335
10.7k
                }
336
10.7k
                partial.buf.extend_from_slice(&bytes[frame::HEADER_LEN..]);
337
            }
338
339
12.9k
            match partial
340
12.9k
                .frame
341
12.9k
                .load_hpack(&mut partial.buf, max_header_list_size, hpack)
342
            {
343
1.32k
                Ok(_) => {}
344
11.3k
                Err(frame::Error::Hpack(hpack::DecoderError::NeedMore(_))) if !is_end_headers => {}
345
                Err(frame::Error::MalformedMessage) => {
346
237
                    let id = head.stream_id();
347
237
                    proto_err!(stream: "malformed CONTINUATION frame; stream={:?}", id);
348
237
                    return Err(Error::library_reset(id, Reason::PROTOCOL_ERROR));
349
                }
350
67
                Err(e) => {
351
67
                    proto_err!(conn: "failed HPACK decoding; err={:?}", e);
352
67
                    return Err(Error::library_go_away(Reason::PROTOCOL_ERROR));
353
                }
354
            }
355
356
12.6k
            if is_end_headers {
357
150
                partial.frame.into()
358
            } else {
359
12.4k
                *partial_inout = Some(partial);
360
12.4k
                return Ok(None);
361
            }
362
        }
363
        Kind::Unknown => {
364
            // Unknown frames are ignored
365
11.3k
            return Ok(None);
366
        }
367
    };
368
369
95.3k
    Ok(Some(frame))
370
161k
}
371
372
impl<T> Stream for FramedRead<T>
373
where
374
    T: AsyncRead + Unpin,
375
{
376
    type Item = Result<Frame, Error>;
377
378
1.60M
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
379
1.60M
        let span = tracing::trace_span!("FramedRead::poll_next");
380
1.60M
        let _e = span.enter();
381
        loop {
382
1.62M
            tracing::trace!("poll");
383
1.62M
            let bytes = match ready!(Pin::new(&mut self.inner).poll_next(cx)) {
384
161k
                Some(Ok(bytes)) => bytes,
385
1.65k
                Some(Err(e)) => return Poll::Ready(Some(Err(map_err(e)))),
386
5.03k
                None => return Poll::Ready(None),
387
            };
388
389
161k
            tracing::trace!(read.bytes = bytes.len());
390
            let Self {
391
161k
                ref mut hpack,
392
161k
                max_header_list_size,
393
161k
                ref mut partial,
394
161k
                max_continuation_frames,
395
                ..
396
161k
            } = *self;
397
161k
            if let Some(frame) = decode_frame(
398
161k
                hpack,
399
161k
                max_header_list_size,
400
161k
                max_continuation_frames,
401
161k
                partial,
402
161k
                bytes,
403
39.0k
            )? {
404
95.3k
                tracing::debug!(?frame, "received");
405
95.3k
                return Poll::Ready(Some(Ok(frame)));
406
26.6k
            }
407
        }
408
1.60M
    }
Unexecuted instantiation: <h2::codec::framed_read::FramedRead<h2::codec::framed_write::FramedWrite<h2_support::mock::Pipe, bytes::bytes::Bytes>> as futures_core::stream::Stream>::poll_next
<h2::codec::framed_read::FramedRead<h2::codec::framed_write::FramedWrite<fuzz_e2e::MockIo, h2::proto::streams::prioritize::Prioritized<bytes::bytes::Bytes>>> as futures_core::stream::Stream>::poll_next
Line
Count
Source
378
1.60M
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
379
1.60M
        let span = tracing::trace_span!("FramedRead::poll_next");
380
1.60M
        let _e = span.enter();
381
        loop {
382
1.62M
            tracing::trace!("poll");
383
1.62M
            let bytes = match ready!(Pin::new(&mut self.inner).poll_next(cx)) {
384
161k
                Some(Ok(bytes)) => bytes,
385
1.65k
                Some(Err(e)) => return Poll::Ready(Some(Err(map_err(e)))),
386
5.03k
                None => return Poll::Ready(None),
387
            };
388
389
161k
            tracing::trace!(read.bytes = bytes.len());
390
            let Self {
391
161k
                ref mut hpack,
392
161k
                max_header_list_size,
393
161k
                ref mut partial,
394
161k
                max_continuation_frames,
395
                ..
396
161k
            } = *self;
397
161k
            if let Some(frame) = decode_frame(
398
161k
                hpack,
399
161k
                max_header_list_size,
400
161k
                max_continuation_frames,
401
161k
                partial,
402
161k
                bytes,
403
39.0k
            )? {
404
95.3k
                tracing::debug!(?frame, "received");
405
95.3k
                return Poll::Ready(Some(Ok(frame)));
406
26.6k
            }
407
        }
408
1.60M
    }
409
}
410
411
1.65k
fn map_err(err: io::Error) -> Error {
412
1.65k
    if let io::ErrorKind::InvalidData = err.kind() {
413
681
        if let Some(custom) = err.get_ref() {
414
681
            if custom.is::<LengthDelimitedCodecError>() {
415
681
                return Error::library_go_away(Reason::FRAME_SIZE_ERROR);
416
0
            }
417
0
        }
418
973
    }
419
973
    err.into()
420
1.65k
}
421
422
// ===== impl Continuable =====
423
424
impl Continuable {
425
13.0k
    fn stream_id(&self) -> frame::StreamId {
426
13.0k
        match *self {
427
13.0k
            Continuable::Headers(ref h) => h.stream_id(),
428
1
            Continuable::PushPromise(ref p) => p.stream_id(),
429
        }
430
13.0k
    }
431
432
10.7k
    fn is_over_size(&self) -> bool {
433
10.7k
        match *self {
434
10.7k
            Continuable::Headers(ref h) => h.is_over_size(),
435
0
            Continuable::PushPromise(ref p) => p.is_over_size(),
436
        }
437
10.7k
    }
438
439
12.9k
    fn load_hpack(
440
12.9k
        &mut self,
441
12.9k
        src: &mut BytesMut,
442
12.9k
        max_header_list_size: usize,
443
12.9k
        decoder: &mut hpack::Decoder,
444
12.9k
    ) -> Result<(), frame::Error> {
445
12.9k
        match *self {
446
12.9k
            Continuable::Headers(ref mut h) => h.load_hpack(src, max_header_list_size, decoder),
447
0
            Continuable::PushPromise(ref mut p) => p.load_hpack(src, max_header_list_size, decoder),
448
        }
449
12.9k
    }
450
}
451
452
impl<T> From<Continuable> for Frame<T> {
453
150
    fn from(cont: Continuable) -> Self {
454
150
        match cont {
455
150
            Continuable::Headers(mut headers) => {
456
150
                headers.set_end_headers();
457
150
                headers.into()
458
            }
459
0
            Continuable::PushPromise(mut push) => {
460
0
                push.set_end_headers();
461
0
                push.into()
462
            }
463
        }
464
150
    }
465
}