Coverage Report

Created: 2025-08-26 07:09

/src/h2/src/proto/streams/recv.rs
Line
Count
Source (jump to first uncovered line)
1
use super::*;
2
use crate::codec::UserError;
3
use crate::frame::{PushPromiseHeaderError, Reason, DEFAULT_INITIAL_WINDOW_SIZE};
4
use crate::proto;
5
6
use http::{HeaderMap, Request, Response};
7
8
use std::cmp::Ordering;
9
use std::io;
10
use std::task::{Context, Poll, Waker};
11
use std::time::Instant;
12
13
#[derive(Debug)]
14
pub(super) struct Recv {
15
    /// Initial window size of remote initiated streams
16
    init_window_sz: WindowSize,
17
18
    /// Connection level flow control governing received data
19
    flow: FlowControl,
20
21
    /// Amount of connection window capacity currently used by outstanding streams.
22
    in_flight_data: WindowSize,
23
24
    /// The lowest stream ID that is still idle
25
    next_stream_id: Result<StreamId, StreamIdOverflow>,
26
27
    /// The stream ID of the last processed stream
28
    last_processed_id: StreamId,
29
30
    /// Any streams with a higher ID are ignored.
31
    ///
32
    /// This starts as MAX, but is lowered when a GOAWAY is received.
33
    ///
34
    /// > After sending a GOAWAY frame, the sender can discard frames for
35
    /// > streams initiated by the receiver with identifiers higher than
36
    /// > the identified last stream.
37
    max_stream_id: StreamId,
38
39
    /// Streams that have pending window updates
40
    pending_window_updates: store::Queue<stream::NextWindowUpdate>,
41
42
    /// New streams to be accepted
43
    pending_accept: store::Queue<stream::NextAccept>,
44
45
    /// Locally reset streams that should be reaped when they expire
46
    pending_reset_expired: store::Queue<stream::NextResetExpire>,
47
48
    /// How long locally reset streams should ignore received frames
49
    reset_duration: Duration,
50
51
    /// Holds frames that are waiting to be read
52
    buffer: Buffer<Event>,
53
54
    /// Refused StreamId, this represents a frame that must be sent out.
55
    refused: Option<StreamId>,
56
57
    /// If push promises are allowed to be received.
58
    is_push_enabled: bool,
59
60
    /// If extended connect protocol is enabled.
61
    is_extended_connect_protocol_enabled: bool,
62
}
63
64
#[derive(Debug)]
65
pub(super) enum Event {
66
    Headers(peer::PollMessage),
67
    Data(Bytes),
68
    Trailers(HeaderMap),
69
}
70
71
#[derive(Debug)]
72
pub(super) enum RecvHeaderBlockError<T> {
73
    Oversize(T),
74
    State(Error),
75
}
76
77
#[derive(Debug)]
78
pub(crate) enum Open {
79
    PushPromise,
80
    Headers,
81
}
82
83
impl Recv {
84
12.7k
    pub fn new(peer: peer::Dyn, config: &Config) -> Self {
85
12.7k
        let next_stream_id = if peer.is_server() { 1 } else { 2 };
86
87
12.7k
        let mut flow = FlowControl::new();
88
12.7k
89
12.7k
        // connections always have the default window size, regardless of
90
12.7k
        // settings
91
12.7k
        flow.inc_window(DEFAULT_INITIAL_WINDOW_SIZE)
92
12.7k
            .expect("invalid initial remote window size");
93
12.7k
        flow.assign_capacity(DEFAULT_INITIAL_WINDOW_SIZE).unwrap();
94
12.7k
95
12.7k
        Recv {
96
12.7k
            init_window_sz: DEFAULT_INITIAL_WINDOW_SIZE,
97
12.7k
            flow,
98
12.7k
            in_flight_data: 0 as WindowSize,
99
12.7k
            next_stream_id: Ok(next_stream_id.into()),
100
12.7k
            pending_window_updates: store::Queue::new(),
101
12.7k
            last_processed_id: StreamId::ZERO,
102
12.7k
            max_stream_id: StreamId::MAX,
103
12.7k
            pending_accept: store::Queue::new(),
104
12.7k
            pending_reset_expired: store::Queue::new(),
105
12.7k
            reset_duration: config.local_reset_duration,
106
12.7k
            buffer: Buffer::new(),
107
12.7k
            refused: None,
108
12.7k
            is_push_enabled: config.local_push_enabled,
109
12.7k
            is_extended_connect_protocol_enabled: config.extended_connect_protocol_enabled,
110
12.7k
        }
111
12.7k
    }
112
113
    /// Returns the initial receive window size
114
427k
    pub fn init_window_sz(&self) -> WindowSize {
115
427k
        self.init_window_sz
116
427k
    }
117
118
    /// Returns the ID of the last processed stream
119
12.9k
    pub fn last_processed_id(&self) -> StreamId {
120
12.9k
        self.last_processed_id
121
12.9k
    }
122
123
    /// Update state reflecting a new, remotely opened stream
124
    ///
125
    /// Returns the stream state if successful. `None` if refused
126
1.36k
    pub fn open(
127
1.36k
        &mut self,
128
1.36k
        id: StreamId,
129
1.36k
        mode: Open,
130
1.36k
        counts: &mut Counts,
131
1.36k
    ) -> Result<Option<StreamId>, Error> {
132
1.36k
        assert!(self.refused.is_none());
133
134
1.36k
        counts.peer().ensure_can_open(id, mode)?;
135
136
1.05k
        let next_id = self.next_stream_id()?;
137
1.05k
        if id < next_id {
138
3
            proto_err!(conn: "id ({:?}) < next_id ({:?})", id, next_id);
139
3
            return Err(Error::library_go_away(Reason::PROTOCOL_ERROR));
140
1.04k
        }
141
1.04k
142
1.04k
        self.next_stream_id = id.next_id();
143
1.04k
144
1.04k
        if !counts.can_inc_num_recv_streams() {
145
0
            self.refused = Some(id);
146
0
            return Ok(None);
147
1.04k
        }
148
1.04k
149
1.04k
        Ok(Some(id))
150
1.36k
    }
151
152
    /// Transition the stream state based on receiving headers
153
    ///
154
    /// The caller ensures that the frame represents headers and not trailers.
155
1.56k
    pub fn recv_headers(
156
1.56k
        &mut self,
157
1.56k
        frame: frame::Headers,
158
1.56k
        stream: &mut store::Ptr,
159
1.56k
        counts: &mut Counts,
160
1.56k
    ) -> Result<(), RecvHeaderBlockError<Option<frame::Headers>>> {
161
1.56k
        tracing::trace!("opening stream; init_window={}", self.init_window_sz);
162
1.56k
        let is_initial = stream.state.recv_open(&frame)?;
163
164
1.56k
        if is_initial {
165
            // TODO: be smarter about this logic
166
3
            if frame.stream_id() > self.last_processed_id {
167
3
                self.last_processed_id = frame.stream_id();
168
3
            }
169
170
            // Increment the number of concurrent streams
171
3
            counts.inc_num_recv_streams(stream);
172
1.56k
        }
173
174
1.56k
        if !stream.content_length.is_head() {
175
            use super::stream::ContentLength;
176
            use http::header;
177
178
1.56k
            if let Some(content_length) = frame.fields().get(header::CONTENT_LENGTH) {
179
184
                let content_length = match frame::parse_u64(content_length.as_bytes()) {
180
160
                    Ok(v) => v,
181
                    Err(_) => {
182
24
                        proto_err!(stream: "could not parse content-length; stream={:?}", stream.id);
183
24
                        return Err(Error::library_reset(stream.id, Reason::PROTOCOL_ERROR).into());
184
                    }
185
                };
186
187
160
                stream.content_length = ContentLength::Remaining(content_length);
188
160
                // END_STREAM on headers frame with non-zero content-length is malformed.
189
160
                // https://datatracker.ietf.org/doc/html/rfc9113#section-8.1.1
190
160
                if frame.is_end_stream()
191
29
                    && content_length > 0
192
18
                    && frame
193
18
                        .pseudo()
194
18
                        .status
195
18
                        .map_or(true, |status| status != 204 && status != 304)
196
                {
197
18
                    proto_err!(stream: "recv_headers with END_STREAM: content-length is not zero; stream={:?};", stream.id);
198
18
                    return Err(Error::library_reset(stream.id, Reason::PROTOCOL_ERROR).into());
199
142
                }
200
1.38k
            }
201
0
        }
202
203
1.52k
        if frame.is_over_size() {
204
            // A frame is over size if the decoded header block was bigger than
205
            // SETTINGS_MAX_HEADER_LIST_SIZE.
206
            //
207
            // > A server that receives a larger header block than it is willing
208
            // > to handle can send an HTTP 431 (Request Header Fields Too
209
            // > Large) status code [RFC6585]. A client can discard responses
210
            // > that it cannot process.
211
            //
212
            // So, if peer is a server, we'll send a 431. In either case,
213
            // an error is recorded, which will send a REFUSED_STREAM,
214
            // since we don't want any of the data frames either.
215
0
            tracing::debug!(
216
0
                "stream error REQUEST_HEADER_FIELDS_TOO_LARGE -- \
217
0
                 recv_headers: frame is over size; stream={:?}",
218
0
                stream.id
219
            );
220
0
            return if counts.peer().is_server() && is_initial {
221
0
                let mut res = frame::Headers::new(
222
0
                    stream.id,
223
0
                    frame::Pseudo::response(::http::StatusCode::REQUEST_HEADER_FIELDS_TOO_LARGE),
224
0
                    HeaderMap::new(),
225
0
                );
226
0
                res.set_end_stream();
227
0
                Err(RecvHeaderBlockError::Oversize(Some(res)))
228
            } else {
229
0
                Err(RecvHeaderBlockError::Oversize(None))
230
            };
231
1.52k
        }
232
1.52k
233
1.52k
        let stream_id = frame.stream_id();
234
1.52k
        let (pseudo, fields) = frame.into_parts();
235
1.52k
236
1.52k
        if pseudo.protocol.is_some()
237
103
            && counts.peer().is_server()
238
0
            && !self.is_extended_connect_protocol_enabled
239
        {
240
0
            proto_err!(stream: "cannot use :protocol if extended connect protocol is disabled; stream={:?}", stream.id);
241
0
            return Err(Error::library_reset(stream.id, Reason::PROTOCOL_ERROR).into());
242
1.52k
        }
243
1.52k
244
1.52k
        if pseudo.status.is_some() && counts.peer().is_server() {
245
0
            proto_err!(stream: "cannot use :status header for requests; stream={:?}", stream.id);
246
0
            return Err(Error::library_reset(stream.id, Reason::PROTOCOL_ERROR).into());
247
1.52k
        }
248
1.52k
249
1.52k
        if !pseudo.is_informational() {
250
1.47k
            let message = counts
251
1.47k
                .peer()
252
1.47k
                .convert_poll_message(pseudo, fields, stream_id)?;
253
254
            // Push the frame onto the stream's recv buffer
255
1.47k
            stream
256
1.47k
                .pending_recv
257
1.47k
                .push_back(&mut self.buffer, Event::Headers(message));
258
1.47k
            stream.notify_recv();
259
1.47k
260
1.47k
            // Only servers can receive a headers frame that initiates the stream.
261
1.47k
            // This is verified in `Streams` before calling this function.
262
1.47k
            if counts.peer().is_server() {
263
0
                // Correctness: never push a stream to `pending_accept` without having the
264
0
                // corresponding headers frame pushed to `stream.pending_recv`.
265
0
                self.pending_accept.push(stream);
266
1.47k
            }
267
52
        }
268
269
1.52k
        Ok(())
270
1.56k
    }
271
272
    /// Called by the server to get the request
273
    ///
274
    /// # Panics
275
    ///
276
    /// Panics if `stream.pending_recv` has no `Event::Headers` queued.
277
    ///
278
0
    pub fn take_request(&mut self, stream: &mut store::Ptr) -> Request<()> {
279
        use super::peer::PollMessage::*;
280
281
0
        match stream.pending_recv.pop_front(&mut self.buffer) {
282
0
            Some(Event::Headers(Server(request))) => request,
283
0
            _ => unreachable!("server stream queue must start with Headers"),
284
        }
285
0
    }
286
287
    /// Called by the client to get pushed response
288
0
    pub fn poll_pushed(
289
0
        &mut self,
290
0
        cx: &Context,
291
0
        stream: &mut store::Ptr,
292
0
    ) -> Poll<Option<Result<(Request<()>, store::Key), proto::Error>>> {
293
        use super::peer::PollMessage::*;
294
295
0
        let mut ppp = stream.pending_push_promises.take();
296
0
        let pushed = ppp.pop(stream.store_mut()).map(|mut pushed| {
297
0
            match pushed.pending_recv.pop_front(&mut self.buffer) {
298
0
                Some(Event::Headers(Server(headers))) => (headers, pushed.key()),
299
                // When frames are pushed into the queue, it is verified that
300
                // the first frame is a HEADERS frame.
301
0
                _ => panic!("Headers not set on pushed stream"),
302
            }
303
0
        });
304
0
        stream.pending_push_promises = ppp;
305
0
        if let Some(p) = pushed {
306
0
            Poll::Ready(Some(Ok(p)))
307
        } else {
308
0
            let is_open = stream.state.ensure_recv_open()?;
309
310
0
            if is_open {
311
0
                stream.push_task = Some(cx.waker().clone());
312
0
                Poll::Pending
313
            } else {
314
0
                Poll::Ready(None)
315
            }
316
        }
317
0
    }
318
319
    /// Called by the client to get the response
320
453k
    pub fn poll_response(
321
453k
        &mut self,
322
453k
        cx: &Context,
323
453k
        stream: &mut store::Ptr,
324
453k
    ) -> Poll<Result<Response<()>, proto::Error>> {
325
        use super::peer::PollMessage::*;
326
327
        // If the buffer is not empty, then the first frame must be a HEADERS
328
        // frame or the user violated the contract.
329
453k
        match stream.pending_recv.pop_front(&mut self.buffer) {
330
839
            Some(Event::Headers(Client(response))) => Poll::Ready(Ok(response)),
331
0
            Some(_) => panic!("poll_response called after response returned"),
332
            None => {
333
452k
                if !stream.state.ensure_recv_open()? {
334
6
                    proto_err!(stream: "poll_response: stream={:?} is not opened;",  stream.id);
335
6
                    return Poll::Ready(Err(Error::library_reset(
336
6
                        stream.id,
337
6
                        Reason::PROTOCOL_ERROR,
338
6
                    )));
339
424k
                }
340
424k
341
424k
                stream.recv_task = Some(cx.waker().clone());
342
424k
                Poll::Pending
343
            }
344
        }
345
453k
    }
346
347
    /// Transition the stream based on receiving trailers
348
66
    pub fn recv_trailers(
349
66
        &mut self,
350
66
        frame: frame::Headers,
351
66
        stream: &mut store::Ptr,
352
66
    ) -> Result<(), Error> {
353
66
        // Transition the state
354
66
        stream.state.recv_close()?;
355
356
62
        if stream.ensure_content_length_zero().is_err() {
357
19
            proto_err!(stream: "recv_trailers: content-length is not zero; stream={:?};",  stream.id);
358
19
            return Err(Error::library_reset(stream.id, Reason::PROTOCOL_ERROR));
359
43
        }
360
43
361
43
        let trailers = frame.into_fields();
362
43
363
43
        // Push the frame onto the stream's recv buffer
364
43
        stream
365
43
            .pending_recv
366
43
            .push_back(&mut self.buffer, Event::Trailers(trailers));
367
43
        stream.notify_recv();
368
43
369
43
        Ok(())
370
66
    }
371
372
    /// Releases capacity of the connection
373
35.0k
    pub fn release_connection_capacity(&mut self, capacity: WindowSize, task: &mut Option<Waker>) {
374
35.0k
        tracing::trace!(
375
0
            "release_connection_capacity; size={}, connection in_flight_data={}",
376
            capacity,
377
            self.in_flight_data,
378
        );
379
380
        // Decrement in-flight data
381
35.0k
        self.in_flight_data -= capacity;
382
35.0k
383
35.0k
        // Assign capacity to connection
384
35.0k
        // TODO: proper error handling
385
35.0k
        let _res = self.flow.assign_capacity(capacity);
386
35.0k
        debug_assert!(_res.is_ok());
387
388
35.0k
        if self.flow.unclaimed_capacity().is_some() {
389
352
            if let Some(task) = task.take() {
390
0
                task.wake();
391
352
            }
392
34.7k
        }
393
35.0k
    }
394
395
    /// Releases capacity back to the connection & stream
396
0
    pub fn release_capacity(
397
0
        &mut self,
398
0
        capacity: WindowSize,
399
0
        stream: &mut store::Ptr,
400
0
        task: &mut Option<Waker>,
401
0
    ) -> Result<(), UserError> {
402
0
        tracing::trace!("release_capacity; size={}", capacity);
403
404
0
        if capacity > stream.in_flight_recv_data {
405
0
            return Err(UserError::ReleaseCapacityTooBig);
406
0
        }
407
0
408
0
        self.release_connection_capacity(capacity, task);
409
0
410
0
        // Decrement in-flight data
411
0
        stream.in_flight_recv_data -= capacity;
412
0
413
0
        // Assign capacity to stream
414
0
        // TODO: proper error handling
415
0
        let _res = stream.recv_flow.assign_capacity(capacity);
416
0
        debug_assert!(_res.is_ok());
417
418
0
        if stream.recv_flow.unclaimed_capacity().is_some() {
419
            // Queue the stream for sending the WINDOW_UPDATE frame.
420
0
            self.pending_window_updates.push(stream);
421
422
0
            if let Some(task) = task.take() {
423
0
                task.wake();
424
0
            }
425
0
        }
426
427
0
        Ok(())
428
0
    }
429
430
    /// Release any unclaimed capacity for a closed stream.
431
426k
    pub fn release_closed_capacity(&mut self, stream: &mut store::Ptr, task: &mut Option<Waker>) {
432
426k
        debug_assert_eq!(stream.ref_count, 0);
433
434
426k
        if stream.in_flight_recv_data == 0 {
435
426k
            return;
436
64
        }
437
64
438
64
        tracing::trace!(
439
0
            "auto-release closed stream ({:?}) capacity: {:?}",
440
0
            stream.id,
441
0
            stream.in_flight_recv_data,
442
        );
443
444
64
        self.release_connection_capacity(stream.in_flight_recv_data, task);
445
64
        stream.in_flight_recv_data = 0;
446
64
447
64
        self.clear_recv_buffer(stream);
448
426k
    }
449
450
    /// Set the "target" connection window size.
451
    ///
452
    /// By default, all new connections start with 64kb of window size. As
453
    /// streams used and release capacity, we will send WINDOW_UPDATEs for the
454
    /// connection to bring it back up to the initial "target".
455
    ///
456
    /// Setting a target means that we will try to tell the peer about
457
    /// WINDOW_UPDATEs so the peer knows it has about `target` window to use
458
    /// for the whole connection.
459
    ///
460
    /// The `task` is an optional parked task for the `Connection` that might
461
    /// be blocked on needing more window capacity.
462
0
    pub fn set_target_connection_window(
463
0
        &mut self,
464
0
        target: WindowSize,
465
0
        task: &mut Option<Waker>,
466
0
    ) -> Result<(), Reason> {
467
0
        tracing::trace!(
468
0
            "set_target_connection_window; target={}; available={}, reserved={}",
469
0
            target,
470
0
            self.flow.available(),
471
            self.in_flight_data,
472
        );
473
474
        // The current target connection window is our `available` plus any
475
        // in-flight data reserved by streams.
476
        //
477
        // Update the flow controller with the difference between the new
478
        // target and the current target.
479
0
        let current = self
480
0
            .flow
481
0
            .available()
482
0
            .add(self.in_flight_data)?
483
0
            .checked_size();
484
0
        if target > current {
485
0
            self.flow.assign_capacity(target - current)?;
486
        } else {
487
0
            self.flow.claim_capacity(current - target)?;
488
        }
489
490
        // If changing the target capacity means we gained a bunch of capacity,
491
        // enough that we went over the update threshold, then schedule sending
492
        // a connection WINDOW_UPDATE.
493
0
        if self.flow.unclaimed_capacity().is_some() {
494
0
            if let Some(task) = task.take() {
495
0
                task.wake();
496
0
            }
497
0
        }
498
0
        Ok(())
499
0
    }
500
501
6
    pub(crate) fn apply_local_settings(
502
6
        &mut self,
503
6
        settings: &frame::Settings,
504
6
        store: &mut Store,
505
6
    ) -> Result<(), proto::Error> {
506
6
        if let Some(val) = settings.is_extended_connect_protocol_enabled() {
507
0
            self.is_extended_connect_protocol_enabled = val;
508
6
        }
509
510
6
        if let Some(target) = settings.initial_window_size() {
511
0
            let old_sz = self.init_window_sz;
512
0
            self.init_window_sz = target;
513
0
514
0
            tracing::trace!("update_initial_window_size; new={}; old={}", target, old_sz,);
515
516
            // Per RFC 7540 §6.9.2:
517
            //
518
            // In addition to changing the flow-control window for streams that are
519
            // not yet active, a SETTINGS frame can alter the initial flow-control
520
            // window size for streams with active flow-control windows (that is,
521
            // streams in the "open" or "half-closed (remote)" state). When the
522
            // value of SETTINGS_INITIAL_WINDOW_SIZE changes, a receiver MUST adjust
523
            // the size of all stream flow-control windows that it maintains by the
524
            // difference between the new value and the old value.
525
            //
526
            // A change to `SETTINGS_INITIAL_WINDOW_SIZE` can cause the available
527
            // space in a flow-control window to become negative. A sender MUST
528
            // track the negative flow-control window and MUST NOT send new
529
            // flow-controlled frames until it receives WINDOW_UPDATE frames that
530
            // cause the flow-control window to become positive.
531
532
0
            match target.cmp(&old_sz) {
533
                Ordering::Less => {
534
                    // We must decrease the (local) window on every open stream.
535
0
                    let dec = old_sz - target;
536
0
                    tracing::trace!("decrementing all windows; dec={}", dec);
537
538
0
                    store.try_for_each(|mut stream| {
539
0
                        stream
540
0
                            .recv_flow
541
0
                            .dec_recv_window(dec)
542
0
                            .map_err(proto::Error::library_go_away)?;
543
0
                        Ok::<_, proto::Error>(())
544
0
                    })?;
545
                }
546
                Ordering::Greater => {
547
                    // We must increase the (local) window on every open stream.
548
0
                    let inc = target - old_sz;
549
0
                    tracing::trace!("incrementing all windows; inc={}", inc);
550
0
                    store.try_for_each(|mut stream| {
551
0
                        // XXX: Shouldn't the peer have already noticed our
552
0
                        // overflow and sent us a GOAWAY?
553
0
                        stream
554
0
                            .recv_flow
555
0
                            .inc_window(inc)
556
0
                            .map_err(proto::Error::library_go_away)?;
557
0
                        stream
558
0
                            .recv_flow
559
0
                            .assign_capacity(inc)
560
0
                            .map_err(proto::Error::library_go_away)?;
561
0
                        Ok::<_, proto::Error>(())
562
0
                    })?;
563
                }
564
0
                Ordering::Equal => (),
565
            }
566
6
        }
567
568
6
        Ok(())
569
6
    }
570
571
0
    pub fn is_end_stream(&self, stream: &store::Ptr) -> bool {
572
0
        if !stream.state.is_recv_end_stream() {
573
0
            return false;
574
0
        }
575
0
576
0
        stream.pending_recv.is_empty()
577
0
    }
578
579
24.1k
    pub fn recv_data(&mut self, frame: frame::Data, stream: &mut store::Ptr) -> Result<(), Error> {
580
24.1k
        let sz = frame.payload().len();
581
24.1k
582
24.1k
        // This should have been enforced at the codec::FramedRead layer, so
583
24.1k
        // this is just a sanity check.
584
24.1k
        assert!(sz <= MAX_WINDOW_SIZE as usize);
585
586
24.1k
        let sz = sz as WindowSize;
587
24.1k
588
24.1k
        let is_ignoring_frame = stream.state.is_local_error();
589
24.1k
590
24.1k
        if !is_ignoring_frame && !stream.state.is_recv_streaming() {
591
            // TODO: There are cases where this can be a stream error of
592
            // STREAM_CLOSED instead...
593
594
            // Receiving a DATA frame when not expecting one is a protocol
595
            // error.
596
20
            proto_err!(conn: "unexpected DATA frame; stream={:?}", stream.id);
597
20
            return Err(Error::library_go_away(Reason::PROTOCOL_ERROR));
598
24.1k
        }
599
24.1k
600
24.1k
        tracing::trace!(
601
0
            "recv_data; size={}; connection={}; stream={}",
602
0
            sz,
603
0
            self.flow.window_size(),
604
0
            stream.recv_flow.window_size()
605
        );
606
607
24.1k
        if is_ignoring_frame {
608
21.5k
            tracing::trace!(
609
0
                "recv_data; frame ignored on locally reset {:?} for some time",
610
0
                stream.id,
611
            );
612
21.5k
            return self.ignore_data(sz);
613
2.57k
        }
614
2.57k
615
2.57k
        // Ensure that there is enough capacity on the connection before acting
616
2.57k
        // on the stream.
617
2.57k
        self.consume_connection_window(sz)?;
618
619
2.57k
        if stream.recv_flow.window_size() < sz {
620
            // http://httpwg.org/specs/rfc7540.html#WINDOW_UPDATE
621
            // > A receiver MAY respond with a stream error (Section 5.4.2) or
622
            // > connection error (Section 5.4.1) of type FLOW_CONTROL_ERROR if
623
            // > it is unable to accept a frame.
624
            //
625
            // So, for violating the **stream** window, we can send either a
626
            // stream or connection error. We've opted to send a stream
627
            // error.
628
0
            return Err(Error::library_reset(stream.id, Reason::FLOW_CONTROL_ERROR));
629
2.57k
        }
630
2.57k
631
2.57k
        if stream.dec_content_length(frame.payload().len()).is_err() {
632
2
            proto_err!(stream:
633
                "recv_data: content-length overflow; stream={:?}; len={:?}",
634
0
                stream.id,
635
0
                frame.payload().len(),
636
            );
637
2
            return Err(Error::library_reset(stream.id, Reason::PROTOCOL_ERROR));
638
2.57k
        }
639
2.57k
640
2.57k
        if frame.is_end_stream() {
641
47
            if stream.ensure_content_length_zero().is_err() {
642
21
                proto_err!(stream:
643
                    "recv_data: content-length underflow; stream={:?}; len={:?}",
644
0
                    stream.id,
645
0
                    frame.payload().len(),
646
                );
647
21
                return Err(Error::library_reset(stream.id, Reason::PROTOCOL_ERROR));
648
26
            }
649
26
650
26
            if stream.state.recv_close().is_err() {
651
0
                proto_err!(conn: "recv_data: failed to transition to closed state; stream={:?}", stream.id);
652
0
                return Err(Error::library_go_away(Reason::PROTOCOL_ERROR));
653
26
            }
654
2.52k
        }
655
656
        // Received a frame, but no one cared about it. fix issue#648
657
2.55k
        if !stream.is_recv {
658
34
            tracing::trace!(
659
0
                "recv_data; frame ignored on stream release {:?} for some time",
660
0
                stream.id,
661
            );
662
34
            self.release_connection_capacity(sz, &mut None);
663
34
            return Ok(());
664
2.51k
        }
665
2.51k
666
2.51k
        // Update stream level flow control
667
2.51k
        stream
668
2.51k
            .recv_flow
669
2.51k
            .send_data(sz)
670
2.51k
            .map_err(proto::Error::library_go_away)?;
671
672
        // Track the data as in-flight
673
2.51k
        stream.in_flight_recv_data += sz;
674
2.51k
675
2.51k
        let event = Event::Data(frame.into_payload());
676
2.51k
677
2.51k
        // Push the frame onto the recv buffer
678
2.51k
        stream.pending_recv.push_back(&mut self.buffer, event);
679
2.51k
        stream.notify_recv();
680
2.51k
681
2.51k
        Ok(())
682
24.1k
    }
683
684
34.9k
    pub fn ignore_data(&mut self, sz: WindowSize) -> Result<(), Error> {
685
34.9k
        // Ensure that there is enough capacity on the connection...
686
34.9k
        self.consume_connection_window(sz)?;
687
688
        // Since we are ignoring this frame,
689
        // we aren't returning the frame to the user. That means they
690
        // have no way to release the capacity back to the connection. So
691
        // we have to release it automatically.
692
        //
693
        // This call doesn't send a WINDOW_UPDATE immediately, just marks
694
        // the capacity as available to be reclaimed. When the available
695
        // capacity meets a threshold, a WINDOW_UPDATE is then sent.
696
34.9k
        self.release_connection_capacity(sz, &mut None);
697
34.9k
        Ok(())
698
34.9k
    }
699
700
37.5k
    pub fn consume_connection_window(&mut self, sz: WindowSize) -> Result<(), Error> {
701
37.5k
        if self.flow.window_size() < sz {
702
2
            tracing::debug!(
703
0
                "connection error FLOW_CONTROL_ERROR -- window_size ({:?}) < sz ({:?});",
704
0
                self.flow.window_size(),
705
                sz,
706
            );
707
2
            return Err(Error::library_go_away(Reason::FLOW_CONTROL_ERROR));
708
37.5k
        }
709
37.5k
710
37.5k
        // Update connection level flow control
711
37.5k
        self.flow.send_data(sz).map_err(Error::library_go_away)?;
712
713
        // Track the data as in-flight
714
37.5k
        self.in_flight_data += sz;
715
37.5k
        Ok(())
716
37.5k
    }
717
718
1.04k
    /// Handle a received PUSH_PROMISE frame for `stream`.
    ///
    /// Reserves the remote-initiated promised stream, validates the promised
    /// request, and queues it as a headers event on `stream`'s receive buffer.
    ///
    /// # Errors
    ///
    /// Returns a stream-level `PROTOCOL_ERROR` reset (against the promised
    /// stream ID) when the header block exceeded the configured max header
    /// list size or the promised request fails validation; propagates errors
    /// from `reserve_remote` and from converting the header block into a
    /// request.
    pub fn recv_push_promise(
        &mut self,
        frame: frame::PushPromise,
        stream: &mut store::Ptr,
    ) -> Result<(), Error> {
        // Transition the promised stream into the reserved-remote state.
        stream.state.reserve_remote()?;
        if frame.is_over_size() {
            // A frame is over size if the decoded header block was bigger than
            // SETTINGS_MAX_HEADER_LIST_SIZE.
            //
            // > A server that receives a larger header block than it is willing
            // > to handle can send an HTTP 431 (Request Header Fields Too
            // > Large) status code [RFC6585]. A client can discard responses
            // > that it cannot process.
            //
            // So, if peer is a server, we'll send a 431. In either case,
            // an error is recorded, which will send a PROTOCOL_ERROR,
            // since we don't want any of the data frames either.
            tracing::debug!(
                "stream error PROTOCOL_ERROR -- recv_push_promise: \
                 headers frame is over size; promised_id={:?};",
                frame.promised_id(),
            );
            return Err(Error::library_reset(
                frame.promised_id(),
                Reason::PROTOCOL_ERROR,
            ));
        }

        let promised_id = frame.promised_id();
        let (pseudo, fields) = frame.into_parts();
        // Push promises always carry request-shaped headers, so convert with
        // the server-peer logic regardless of our own role.
        let req = crate::server::Peer::convert_poll_message(pseudo, fields, promised_id)?;

        if let Err(e) = frame::PushPromise::validate_request(&req) {
            use PushPromiseHeaderError::*;
            // Log the specific validation failure, then reset the promised
            // stream with PROTOCOL_ERROR in either case.
            match e {
                NotSafeAndCacheable => proto_err!(
                    stream:
                    "recv_push_promise: method {} is not safe and cacheable; promised_id={:?}",
                    req.method(),
                    promised_id,
                ),
                InvalidContentLength(e) => proto_err!(
                    stream:
                    "recv_push_promise; promised request has invalid content-length {:?}; promised_id={:?}",
                    e,
                    promised_id,
                ),
            }
            return Err(Error::library_reset(promised_id, Reason::PROTOCOL_ERROR));
        }

        use super::peer::PollMessage::*;
        // Queue the promised request for the user and wake any tasks waiting
        // on receive or push events for this stream.
        stream
            .pending_recv
            .push_back(&mut self.buffer, Event::Headers(Server(req)));
        stream.notify_recv();
        stream.notify_push();
        Ok(())
    }
778
779
    /// Ensures that `id` is not in the `Idle` state.
780
650
    pub fn ensure_not_idle(&self, id: StreamId) -> Result<(), Reason> {
781
650
        if let Ok(next) = self.next_stream_id {
782
454
            if id >= next {
783
40
                tracing::debug!(
784
0
                    "stream ID implicitly closed, PROTOCOL_ERROR; stream={:?}",
785
                    id
786
                );
787
40
                return Err(Reason::PROTOCOL_ERROR);
788
414
            }
789
196
        }
790
        // if next_stream_id is overflowed, that's ok.
791
792
610
        Ok(())
793
650
    }
794
795
    /// Handle remote sending an explicit RST_STREAM.
796
616
    pub fn recv_reset(
797
616
        &mut self,
798
616
        frame: frame::Reset,
799
616
        stream: &mut Stream,
800
616
        counts: &mut Counts,
801
616
    ) -> Result<(), Error> {
802
616
        // Reseting a stream that the user hasn't accepted is possible,
803
616
        // but should be done with care. These streams will continue
804
616
        // to take up memory in the accept queue, but will no longer be
805
616
        // counted as "concurrent" streams.
806
616
        //
807
616
        // So, we have a separate limit for these.
808
616
        //
809
616
        // See https://github.com/hyperium/hyper/issues/2877
810
616
        if stream.is_pending_accept {
811
0
            if counts.can_inc_num_remote_reset_streams() {
812
0
                counts.inc_num_remote_reset_streams();
813
0
            } else {
814
0
                tracing::warn!(
815
0
                    "recv_reset; remotely-reset pending-accept streams reached limit ({:?})",
816
0
                    counts.max_remote_reset_streams(),
817
                );
818
0
                return Err(Error::library_go_away_data(
819
0
                    Reason::ENHANCE_YOUR_CALM,
820
0
                    "too_many_resets",
821
0
                ));
822
            }
823
616
        }
824
825
        // Notify the stream
826
616
        stream.state.recv_reset(frame, stream.is_pending_send);
827
616
828
616
        stream.notify_send();
829
616
        stream.notify_recv();
830
616
        stream.notify_push();
831
616
832
616
        Ok(())
833
616
    }
834
835
    /// Handle a connection-level error
836
283k
    pub fn handle_error(&mut self, err: &proto::Error, stream: &mut Stream) {
837
283k
        // Receive an error
838
283k
        stream.state.handle_error(err);
839
283k
840
283k
        // If a receiver is waiting, notify it
841
283k
        stream.notify_send();
842
283k
        stream.notify_recv();
843
283k
        stream.notify_push();
844
283k
    }
845
846
0
    pub fn go_away(&mut self, last_processed_id: StreamId) {
847
0
        assert!(self.max_stream_id >= last_processed_id);
848
0
        self.max_stream_id = last_processed_id;
849
0
    }
850
851
268k
    /// Handle the transport reaching EOF for `stream`.
    ///
    /// Marks the stream state as having received EOF and wakes every task
    /// parked on the stream so it can observe the shutdown.
    pub fn recv_eof(&mut self, stream: &mut Stream) {
        stream.state.recv_eof();

        // Wake all potential waiters.
        stream.notify_send();
        stream.notify_recv();
        stream.notify_push();
    }
857
858
903
    pub(super) fn clear_recv_buffer(&mut self, stream: &mut Stream) {
859
3.21k
        while stream.pending_recv.pop_front(&mut self.buffer).is_some() {
860
2.31k
            // drop it
861
2.31k
        }
862
903
    }
863
864
    /// Get the max ID of streams we can receive.
865
    ///
866
    /// This gets lowered if we send a GOAWAY frame.
867
25.7k
    pub fn max_stream_id(&self) -> StreamId {
868
25.7k
        self.max_stream_id
869
25.7k
    }
870
871
1.05k
    pub fn next_stream_id(&self) -> Result<StreamId, Error> {
872
1.05k
        if let Ok(id) = self.next_stream_id {
873
1.05k
            Ok(id)
874
        } else {
875
1
            Err(Error::library_go_away(Reason::PROTOCOL_ERROR))
876
        }
877
1.05k
    }
878
879
7.82k
    pub fn may_have_created_stream(&self, id: StreamId) -> bool {
880
7.82k
        if let Ok(next_id) = self.next_stream_id {
881
            // Peer::is_local_init should have been called beforehand
882
4.38k
            debug_assert_eq!(id.is_server_initiated(), next_id.is_server_initiated(),);
883
4.38k
            id < next_id
884
        } else {
885
3.44k
            true
886
        }
887
7.82k
    }
888
889
11.3k
    /// Bump `next_stream_id` past `id` if `id` is at or beyond it.
    ///
    /// No-op once the ID counter has overflowed.
    pub(super) fn maybe_reset_next_stream_id(&mut self, id: StreamId) {
        if let Ok(next_id) = self.next_stream_id {
            // !Peer::is_local_init should have been called beforehand
            debug_assert_eq!(id.is_server_initiated(), next_id.is_server_initiated());
            if next_id <= id {
                self.next_stream_id = id.next_id();
            }
        }
    }
898
899
    /// Returns true if the remote peer can reserve a stream with the given ID.
900
1.05k
    pub fn ensure_can_reserve(&self) -> Result<(), Error> {
901
1.05k
        if !self.is_push_enabled {
902
0
            proto_err!(conn: "recv_push_promise: push is disabled");
903
0
            return Err(Error::library_go_away(Reason::PROTOCOL_ERROR));
904
1.05k
        }
905
1.05k
906
1.05k
        Ok(())
907
1.05k
    }
908
909
    /// Add a locally reset stream to queue to be eventually reaped.
910
127k
    pub fn enqueue_reset_expiration(&mut self, stream: &mut store::Ptr, counts: &mut Counts) {
911
127k
        if !stream.state.is_local_error() || stream.is_pending_reset_expiration() {
912
20.2k
            return;
913
107k
        }
914
107k
915
107k
        if counts.can_inc_num_reset_streams() {
916
48.8k
            counts.inc_num_reset_streams();
917
48.8k
            tracing::trace!("enqueue_reset_expiration; added {:?}", stream.id);
918
48.8k
            self.pending_reset_expired.push(stream);
919
        } else {
920
58.6k
            tracing::trace!(
921
0
                "enqueue_reset_expiration; dropped {:?}, over max_concurrent_reset_streams",
922
0
                stream.id
923
            );
924
        }
925
127k
    }
926
927
    /// Send any pending refusals.
928
1.61M
    pub fn send_pending_refusal<T, B>(
929
1.61M
        &mut self,
930
1.61M
        cx: &mut Context,
931
1.61M
        dst: &mut Codec<T, Prioritized<B>>,
932
1.61M
    ) -> Poll<io::Result<()>>
933
1.61M
    where
934
1.61M
        T: AsyncWrite + Unpin,
935
1.61M
        B: Buf,
936
1.61M
    {
937
1.61M
        if let Some(stream_id) = self.refused {
938
0
            ready!(dst.poll_ready(cx))?;
939
940
            // Create the RST_STREAM frame
941
0
            let frame = frame::Reset::new(stream_id, Reason::REFUSED_STREAM);
942
0
943
0
            // Buffer the frame
944
0
            dst.buffer(frame.into()).expect("invalid RST_STREAM frame");
945
1.61M
        }
946
947
1.61M
        self.refused = None;
948
1.61M
949
1.61M
        Poll::Ready(Ok(()))
950
1.61M
    }
Unexecuted instantiation: <h2::proto::streams::recv::Recv>::send_pending_refusal::<_, _>
<h2::proto::streams::recv::Recv>::send_pending_refusal::<fuzz_e2e::MockIo, bytes::bytes::Bytes>
Line
Count
Source
928
1.61M
    pub fn send_pending_refusal<T, B>(
929
1.61M
        &mut self,
930
1.61M
        cx: &mut Context,
931
1.61M
        dst: &mut Codec<T, Prioritized<B>>,
932
1.61M
    ) -> Poll<io::Result<()>>
933
1.61M
    where
934
1.61M
        T: AsyncWrite + Unpin,
935
1.61M
        B: Buf,
936
1.61M
    {
937
1.61M
        if let Some(stream_id) = self.refused {
938
0
            ready!(dst.poll_ready(cx))?;
939
940
            // Create the RST_STREAM frame
941
0
            let frame = frame::Reset::new(stream_id, Reason::REFUSED_STREAM);
942
0
943
0
            // Buffer the frame
944
0
            dst.buffer(frame.into()).expect("invalid RST_STREAM frame");
945
1.61M
        }
946
947
1.61M
        self.refused = None;
948
1.61M
949
1.61M
        Poll::Ready(Ok(()))
950
1.61M
    }
951
952
1.67M
    pub fn clear_expired_reset_streams(&mut self, store: &mut Store, counts: &mut Counts) {
953
1.67M
        if !self.pending_reset_expired.is_empty() {
954
987k
            let now = Instant::now();
955
987k
            let reset_duration = self.reset_duration;
956
987k
            while let Some(stream) = self.pending_reset_expired.pop_if(store, |stream| {
957
987k
                let reset_at = stream.reset_at.expect("reset_at must be set if in queue");
958
987k
                // rust-lang/rust#86470 tracks a bug in the standard library where `Instant`
959
987k
                // subtraction can panic (because, on some platforms, `Instant` isn't actually
960
987k
                // monotonic). We use a saturating operation to avoid this panic here.
961
987k
                now.saturating_duration_since(reset_at) > reset_duration
962
987k
            }) {
963
150
                counts.transition_after(stream, true);
964
150
            }
965
683k
        }
966
1.67M
    }
967
968
17.4k
    pub fn clear_queues(
969
17.4k
        &mut self,
970
17.4k
        clear_pending_accept: bool,
971
17.4k
        store: &mut Store,
972
17.4k
        counts: &mut Counts,
973
17.4k
    ) {
974
17.4k
        self.clear_stream_window_update_queue(store, counts);
975
17.4k
        self.clear_all_reset_streams(store, counts);
976
17.4k
977
17.4k
        if clear_pending_accept {
978
12.7k
            self.clear_all_pending_accept(store, counts);
979
12.7k
        }
980
17.4k
    }
981
982
17.4k
    fn clear_stream_window_update_queue(&mut self, store: &mut Store, counts: &mut Counts) {
983
17.4k
        while let Some(stream) = self.pending_window_updates.pop(store) {
984
0
            counts.transition(stream, |_, stream| {
985
0
                tracing::trace!("clear_stream_window_update_queue; stream={:?}", stream.id);
986
0
            })
987
        }
988
17.4k
    }
989
990
    /// Called on EOF
991
17.4k
    fn clear_all_reset_streams(&mut self, store: &mut Store, counts: &mut Counts) {
992
66.2k
        while let Some(stream) = self.pending_reset_expired.pop(store) {
993
48.7k
            counts.transition_after(stream, true);
994
48.7k
        }
995
17.4k
    }
996
997
12.7k
    fn clear_all_pending_accept(&mut self, store: &mut Store, counts: &mut Counts) {
998
12.7k
        while let Some(stream) = self.pending_accept.pop(store) {
999
0
            counts.transition_after(stream, false);
1000
0
        }
1001
12.7k
    }
1002
1003
1.60M
    pub fn poll_complete<T, B>(
1004
1.60M
        &mut self,
1005
1.60M
        cx: &mut Context,
1006
1.60M
        store: &mut Store,
1007
1.60M
        counts: &mut Counts,
1008
1.60M
        dst: &mut Codec<T, Prioritized<B>>,
1009
1.60M
    ) -> Poll<io::Result<()>>
1010
1.60M
    where
1011
1.60M
        T: AsyncWrite + Unpin,
1012
1.60M
        B: Buf,
1013
1.60M
    {
1014
        // Send any pending connection level window updates
1015
1.60M
        ready!(self.send_connection_window_update(cx, dst))?;
1016
1017
        // Send any pending stream level window updates
1018
1.60M
        ready!(self.send_stream_window_updates(cx, store, counts, dst))?;
1019
1020
381k
        Poll::Ready(Ok(()))
1021
1.60M
    }
Unexecuted instantiation: <h2::proto::streams::recv::Recv>::poll_complete::<_, _>
<h2::proto::streams::recv::Recv>::poll_complete::<fuzz_e2e::MockIo, bytes::bytes::Bytes>
Line
Count
Source
1003
1.60M
    pub fn poll_complete<T, B>(
1004
1.60M
        &mut self,
1005
1.60M
        cx: &mut Context,
1006
1.60M
        store: &mut Store,
1007
1.60M
        counts: &mut Counts,
1008
1.60M
        dst: &mut Codec<T, Prioritized<B>>,
1009
1.60M
    ) -> Poll<io::Result<()>>
1010
1.60M
    where
1011
1.60M
        T: AsyncWrite + Unpin,
1012
1.60M
        B: Buf,
1013
1.60M
    {
1014
        // Send any pending connection level window updates
1015
1.60M
        ready!(self.send_connection_window_update(cx, dst))?;
1016
1017
        // Send any pending stream level window updates
1018
1.60M
        ready!(self.send_stream_window_updates(cx, store, counts, dst))?;
1019
1020
381k
        Poll::Ready(Ok(()))
1021
1.60M
    }
1022
1023
    /// Send connection level window update
1024
1.60M
    fn send_connection_window_update<T, B>(
1025
1.60M
        &mut self,
1026
1.60M
        cx: &mut Context,
1027
1.60M
        dst: &mut Codec<T, Prioritized<B>>,
1028
1.60M
    ) -> Poll<io::Result<()>>
1029
1.60M
    where
1030
1.60M
        T: AsyncWrite + Unpin,
1031
1.60M
        B: Buf,
1032
1.60M
    {
1033
1.60M
        if let Some(incr) = self.flow.unclaimed_capacity() {
1034
1.36k
            let frame = frame::WindowUpdate::new(StreamId::zero(), incr);
1035
1036
            // Ensure the codec has capacity
1037
1.36k
            ready!(dst.poll_ready(cx))?;
1038
1039
            // Buffer the WINDOW_UPDATE frame
1040
39
            dst.buffer(frame.into())
1041
39
                .expect("invalid WINDOW_UPDATE frame");
1042
39
1043
39
            // Update flow control
1044
39
            self.flow
1045
39
                .inc_window(incr)
1046
39
                .expect("unexpected flow control state");
1047
1.60M
        }
1048
1049
1.60M
        Poll::Ready(Ok(()))
1050
1.60M
    }
Unexecuted instantiation: <h2::proto::streams::recv::Recv>::send_connection_window_update::<_, _>
<h2::proto::streams::recv::Recv>::send_connection_window_update::<fuzz_e2e::MockIo, bytes::bytes::Bytes>
Line
Count
Source
1024
1.60M
    fn send_connection_window_update<T, B>(
1025
1.60M
        &mut self,
1026
1.60M
        cx: &mut Context,
1027
1.60M
        dst: &mut Codec<T, Prioritized<B>>,
1028
1.60M
    ) -> Poll<io::Result<()>>
1029
1.60M
    where
1030
1.60M
        T: AsyncWrite + Unpin,
1031
1.60M
        B: Buf,
1032
1.60M
    {
1033
1.60M
        if let Some(incr) = self.flow.unclaimed_capacity() {
1034
1.36k
            let frame = frame::WindowUpdate::new(StreamId::zero(), incr);
1035
1036
            // Ensure the codec has capacity
1037
1.36k
            ready!(dst.poll_ready(cx))?;
1038
1039
            // Buffer the WINDOW_UPDATE frame
1040
39
            dst.buffer(frame.into())
1041
39
                .expect("invalid WINDOW_UPDATE frame");
1042
39
1043
39
            // Update flow control
1044
39
            self.flow
1045
39
                .inc_window(incr)
1046
39
                .expect("unexpected flow control state");
1047
1.60M
        }
1048
1049
1.60M
        Poll::Ready(Ok(()))
1050
1.60M
    }
1051
1052
    /// Send stream level window update
    ///
    /// Drains `pending_window_updates`, buffering a WINDOW_UPDATE frame for
    /// each queued stream that is still actively receiving data and has
    /// unclaimed flow-control capacity. Returns `Pending` when the codec
    /// cannot accept another frame; streams popped after that point stay
    /// drained only once the codec is ready again.
    pub fn send_stream_window_updates<T, B>(
        &mut self,
        cx: &mut Context,
        store: &mut Store,
        counts: &mut Counts,
        dst: &mut Codec<T, Prioritized<B>>,
    ) -> Poll<io::Result<()>>
    where
        T: AsyncWrite + Unpin,
        B: Buf,
    {
        loop {
            // Ensure the codec has capacity
            ready!(dst.poll_ready(cx))?;

            // Get the next stream
            let stream = match self.pending_window_updates.pop(store) {
                Some(stream) => stream,
                None => return Poll::Ready(Ok(())),
            };

            // Run the per-stream work inside `transition` so counts
            // bookkeeping stays consistent with any state change.
            counts.transition(stream, |_, stream| {
                tracing::trace!("pending_window_updates -- pop; stream={:?}", stream.id);
                debug_assert!(!stream.is_pending_window_update);

                if !stream.state.is_recv_streaming() {
                    // No need to send window updates on the stream if the stream is
                    // no longer receiving data.
                    //
                    // TODO: is this correct? We could possibly send a window
                    // update on a ReservedRemote stream if we already know
                    // we want to stream the data faster...
                    return;
                }

                // TODO: de-dup
                if let Some(incr) = stream.recv_flow.unclaimed_capacity() {
                    // Create the WINDOW_UPDATE frame
                    let frame = frame::WindowUpdate::new(stream.id, incr);

                    // Buffer it
                    dst.buffer(frame.into())
                        .expect("invalid WINDOW_UPDATE frame");

                    // Update flow control
                    stream
                        .recv_flow
                        .inc_window(incr)
                        .expect("unexpected flow control state");
                }
            })
        }
    }
Unexecuted instantiation: <h2::proto::streams::recv::Recv>::send_stream_window_updates::<_, _>
<h2::proto::streams::recv::Recv>::send_stream_window_updates::<fuzz_e2e::MockIo, bytes::bytes::Bytes>
Line
Count
Source
1053
1.60M
    pub fn send_stream_window_updates<T, B>(
1054
1.60M
        &mut self,
1055
1.60M
        cx: &mut Context,
1056
1.60M
        store: &mut Store,
1057
1.60M
        counts: &mut Counts,
1058
1.60M
        dst: &mut Codec<T, Prioritized<B>>,
1059
1.60M
    ) -> Poll<io::Result<()>>
1060
1.60M
    where
1061
1.60M
        T: AsyncWrite + Unpin,
1062
1.60M
        B: Buf,
1063
1.60M
    {
1064
        loop {
1065
            // Ensure the codec has capacity
1066
1.60M
            ready!(dst.poll_ready(cx))?;
1067
1068
            // Get the next stream
1069
381k
            let stream = match self.pending_window_updates.pop(store) {
1070
0
                Some(stream) => stream,
1071
381k
                None => return Poll::Ready(Ok(())),
1072
            };
1073
1074
0
            counts.transition(stream, |_, stream| {
1075
                tracing::trace!("pending_window_updates -- pop; stream={:?}", stream.id);
1076
                debug_assert!(!stream.is_pending_window_update);
1077
1078
                if !stream.state.is_recv_streaming() {
1079
                    // No need to send window updates on the stream if the stream is
1080
                    // no longer receiving data.
1081
                    //
1082
                    // TODO: is this correct? We could possibly send a window
1083
                    // update on a ReservedRemote stream if we already know
1084
                    // we want to stream the data faster...
1085
                    return;
1086
                }
1087
1088
                // TODO: de-dup
1089
                if let Some(incr) = stream.recv_flow.unclaimed_capacity() {
1090
                    // Create the WINDOW_UPDATE frame
1091
                    let frame = frame::WindowUpdate::new(stream.id, incr);
1092
1093
                    // Buffer it
1094
                    dst.buffer(frame.into())
1095
                        .expect("invalid WINDOW_UPDATE frame");
1096
1097
                    // Update flow control
1098
                    stream
1099
                        .recv_flow
1100
                        .inc_window(incr)
1101
                        .expect("unexpected flow control state");
1102
                }
1103
0
            })
1104
        }
1105
1.60M
    }
1106
1107
0
    /// Pop the next stream waiting to be accepted, returning its store key.
    pub fn next_incoming(&mut self, store: &mut Store) -> Option<store::Key> {
        let ptr = self.pending_accept.pop(store)?;
        Some(ptr.key())
    }
1110
1111
0
    /// Poll the next DATA payload queued for `stream`.
    ///
    /// Returns `Ready(Some(Ok(bytes)))` when a data event is at the head of
    /// the stream's receive queue; `Ready(None)` when the next event is not
    /// data (the event is pushed back so `poll_trailers` can see it) or when
    /// the stream can receive no more frames; `Pending` when nothing is
    /// queued but more frames may still arrive (the waker is registered).
    pub fn poll_data(
        &mut self,
        cx: &Context,
        stream: &mut Stream,
    ) -> Poll<Option<Result<Bytes, proto::Error>>> {
        match stream.pending_recv.pop_front(&mut self.buffer) {
            Some(Event::Data(payload)) => Poll::Ready(Some(Ok(payload))),
            Some(event) => {
                // Frame is trailer
                stream.pending_recv.push_front(&mut self.buffer, event);

                // Notify the recv task. This is done just in case
                // `poll_trailers` was called.
                //
                // It is very likely that `notify_recv` will just be a no-op (as
                // the task will be None), so this isn't really much of a
                // performance concern. It also means we don't have to track
                // state to see if `poll_trailers` was called before `poll_data`
                // returned `None`.
                stream.notify_recv();

                // No more data frames
                Poll::Ready(None)
            }
            None => self.schedule_recv(cx, stream),
        }
    }
1138
1139
0
    pub fn poll_trailers(
1140
0
        &mut self,
1141
0
        cx: &Context,
1142
0
        stream: &mut Stream,
1143
0
    ) -> Poll<Option<Result<HeaderMap, proto::Error>>> {
1144
0
        match stream.pending_recv.pop_front(&mut self.buffer) {
1145
0
            Some(Event::Trailers(trailers)) => Poll::Ready(Some(Ok(trailers))),
1146
0
            Some(event) => {
1147
0
                // Frame is not trailers.. not ready to poll trailers yet.
1148
0
                stream.pending_recv.push_front(&mut self.buffer, event);
1149
0
1150
0
                Poll::Pending
1151
            }
1152
0
            None => self.schedule_recv(cx, stream),
1153
        }
1154
0
    }
1155
1156
0
    fn schedule_recv<T>(
1157
0
        &mut self,
1158
0
        cx: &Context,
1159
0
        stream: &mut Stream,
1160
0
    ) -> Poll<Option<Result<T, proto::Error>>> {
1161
0
        if stream.state.ensure_recv_open()? {
1162
            // Request to get notified once more frames arrive
1163
0
            stream.recv_task = Some(cx.waker().clone());
1164
0
            Poll::Pending
1165
        } else {
1166
            // No more frames will be received
1167
0
            Poll::Ready(None)
1168
        }
1169
0
    }
Unexecuted instantiation: <h2::proto::streams::recv::Recv>::schedule_recv::<bytes::bytes::Bytes>
Unexecuted instantiation: <h2::proto::streams::recv::Recv>::schedule_recv::<http::header::map::HeaderMap>
1170
}
1171
1172
// ===== impl Open =====
1173
1174
impl Open {
1175
1.36k
    pub fn is_push_promise(&self) -> bool {
1176
1.36k
        matches!(*self, Self::PushPromise)
1177
1.36k
    }
1178
}
1179
1180
// ===== impl RecvHeaderBlockError =====
1181
1182
impl<T> From<Error> for RecvHeaderBlockError<T> {
1183
42
    fn from(err: Error) -> Self {
1184
42
        RecvHeaderBlockError::State(err)
1185
42
    }
1186
}