Coverage Report

Created: 2026-05-30 06:25

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/h2/src/proto/streams/recv.rs
Line
Count
Source
1
use super::*;
2
use crate::codec::UserError;
3
use crate::frame::{PushPromiseHeaderError, Reason, DEFAULT_INITIAL_WINDOW_SIZE};
4
use crate::proto;
5
6
use http::{HeaderMap, Request, Response};
7
8
use std::cmp::Ordering;
9
use std::io;
10
use std::task::{Context, Poll, Waker};
11
use std::time::Instant;
12
13
#[derive(Debug)]
14
pub(super) struct Recv {
15
    /// Initial window size of remote initiated streams
16
    init_window_sz: WindowSize,
17
18
    /// Connection level flow control governing received data
19
    flow: FlowControl,
20
21
    /// Amount of connection window capacity currently used by outstanding streams.
22
    in_flight_data: WindowSize,
23
24
    /// The lowest stream ID that is still idle
25
    next_stream_id: Result<StreamId, StreamIdOverflow>,
26
27
    /// The stream ID of the last processed stream
28
    last_processed_id: StreamId,
29
30
    /// Any streams with a higher ID are ignored.
31
    ///
32
    /// This starts as MAX, but is lowered when a GOAWAY is received.
33
    ///
34
    /// > After sending a GOAWAY frame, the sender can discard frames for
35
    /// > streams initiated by the receiver with identifiers higher than
36
    /// > the identified last stream.
37
    max_stream_id: StreamId,
38
39
    /// Streams that have pending window updates
40
    pending_window_updates: store::Queue<stream::NextWindowUpdate>,
41
42
    /// New streams to be accepted
43
    pending_accept: store::Queue<stream::NextAccept>,
44
45
    /// Locally reset streams that should be reaped when they expire
46
    pending_reset_expired: store::Queue<stream::NextResetExpire>,
47
48
    /// How long locally reset streams should ignore received frames
49
    reset_duration: Duration,
50
51
    /// Holds frames that are waiting to be read
52
    buffer: Buffer<Event>,
53
54
    /// Refused StreamId, this represents a frame that must be sent out.
55
    refused: Option<StreamId>,
56
57
    /// If push promises are allowed to be received.
58
    is_push_enabled: bool,
59
60
    /// If extended connect protocol is enabled.
61
    is_extended_connect_protocol_enabled: bool,
62
}
63
64
#[derive(Debug)]
65
pub(super) enum Event {
66
    Headers(peer::PollMessage),
67
    Data(Bytes),
68
    Trailers(HeaderMap),
69
    InformationalHeaders(peer::PollMessage),
70
}
71
72
#[derive(Debug)]
73
pub(super) enum RecvHeaderBlockError<T> {
74
    Oversize(T),
75
    State(Error),
76
}
77
78
#[derive(Debug)]
79
pub(crate) enum Open {
80
    PushPromise,
81
    Headers,
82
}
83
84
impl Recv {
85
14.5k
    pub fn new(peer: peer::Dyn, config: &Config) -> Self {
86
14.5k
        let next_stream_id = if peer.is_server() { 1 } else { 2 };
87
88
14.5k
        let mut flow = FlowControl::new();
89
90
        // connections always have the default window size, regardless of
91
        // settings
92
14.5k
        flow.inc_window(DEFAULT_INITIAL_WINDOW_SIZE)
93
14.5k
            .expect("invalid initial remote window size");
94
14.5k
        flow.assign_capacity(DEFAULT_INITIAL_WINDOW_SIZE).unwrap();
95
96
14.5k
        Recv {
97
14.5k
            init_window_sz: DEFAULT_INITIAL_WINDOW_SIZE,
98
14.5k
            flow,
99
14.5k
            in_flight_data: 0 as WindowSize,
100
14.5k
            next_stream_id: Ok(next_stream_id.into()),
101
14.5k
            pending_window_updates: store::Queue::new(),
102
14.5k
            last_processed_id: StreamId::ZERO,
103
14.5k
            max_stream_id: StreamId::MAX,
104
14.5k
            pending_accept: store::Queue::new(),
105
14.5k
            pending_reset_expired: store::Queue::new(),
106
14.5k
            reset_duration: config.local_reset_duration,
107
14.5k
            buffer: Buffer::new(),
108
14.5k
            refused: None,
109
14.5k
            is_push_enabled: config.local_push_enabled,
110
14.5k
            is_extended_connect_protocol_enabled: config.extended_connect_protocol_enabled,
111
14.5k
        }
112
14.5k
    }
113
114
    /// Returns the initial receive window size
115
488k
    pub fn init_window_sz(&self) -> WindowSize {
116
488k
        self.init_window_sz
117
488k
    }
118
119
    /// Returns the ID of the last processed stream
120
15.9k
    pub fn last_processed_id(&self) -> StreamId {
121
15.9k
        self.last_processed_id
122
15.9k
    }
123
124
    /// Update state reflecting a new, remotely opened stream
125
    ///
126
    /// Returns the stream state if successful. `None` if refused
127
1.46k
    pub fn open(
128
1.46k
        &mut self,
129
1.46k
        id: StreamId,
130
1.46k
        mode: Open,
131
1.46k
        counts: &mut Counts,
132
1.46k
    ) -> Result<Option<StreamId>, Error> {
133
1.46k
        assert!(self.refused.is_none());
134
135
1.46k
        counts.peer().ensure_can_open(id, mode)?;
136
137
1.10k
        let next_id = self.next_stream_id()?;
138
1.10k
        if id < next_id {
139
2
            proto_err!(conn: "id ({:?}) < next_id ({:?})", id, next_id);
140
2
            return Err(Error::library_go_away(Reason::PROTOCOL_ERROR));
141
1.09k
        }
142
143
1.09k
        self.next_stream_id = id.next_id();
144
145
1.09k
        if !counts.can_inc_num_recv_streams() {
146
0
            self.refused = Some(id);
147
0
            return Ok(None);
148
1.09k
        }
149
150
1.09k
        Ok(Some(id))
151
1.46k
    }
152
153
    /// Transition the stream state based on receiving headers
154
    ///
155
    /// The caller ensures that the frame represents headers and not trailers.
156
282
    pub fn recv_headers(
157
282
        &mut self,
158
282
        frame: frame::Headers,
159
282
        stream: &mut store::Ptr,
160
282
        counts: &mut Counts,
161
282
    ) -> Result<(), RecvHeaderBlockError<Option<frame::Headers>>> {
162
282
        tracing::trace!("opening stream; init_window={}", self.init_window_sz);
163
282
        let is_initial = stream.state.recv_open(&frame)?;
164
165
282
        if is_initial {
166
            // TODO: be smarter about this logic
167
2
            if frame.stream_id() > self.last_processed_id {
168
2
                self.last_processed_id = frame.stream_id();
169
2
            }
170
171
            // Increment the number of concurrent streams
172
2
            counts.inc_num_recv_streams(stream);
173
280
        }
174
175
282
        if !stream.content_length.is_head() {
176
            use super::stream::ContentLength;
177
            use http::header;
178
179
282
            if let Some(content_length) = frame.fields().get(header::CONTENT_LENGTH) {
180
2
                let content_length = match frame::parse_u64(content_length.as_bytes()) {
181
1
                    Ok(v) => v,
182
                    Err(_) => {
183
1
                        proto_err!(stream: "could not parse content-length; stream={:?}", stream.id);
184
1
                        return Err(Error::library_reset(stream.id, Reason::PROTOCOL_ERROR).into());
185
                    }
186
                };
187
188
1
                stream.content_length = ContentLength::Remaining(content_length);
189
                // END_STREAM on headers frame with non-zero content-length is malformed.
190
                // https://datatracker.ietf.org/doc/html/rfc9113#section-8.1.1
191
1
                if frame.is_end_stream()
192
0
                    && content_length > 0
193
0
                    && frame
194
0
                        .pseudo()
195
0
                        .status
196
0
                        .map_or(true, |status| status != 204 && status != 304)
197
                {
198
0
                    proto_err!(stream: "recv_headers with END_STREAM: content-length is not zero; stream={:?};", stream.id);
199
0
                    return Err(Error::library_reset(stream.id, Reason::PROTOCOL_ERROR).into());
200
1
                }
201
280
            }
202
0
        }
203
204
281
        if frame.is_over_size() {
205
            // A frame is over size if the decoded header block was bigger than
206
            // SETTINGS_MAX_HEADER_LIST_SIZE.
207
            //
208
            // > A server that receives a larger header block than it is willing
209
            // > to handle can send an HTTP 431 (Request Header Fields Too
210
            // > Large) status code [RFC6585]. A client can discard responses
211
            // > that it cannot process.
212
            //
213
            // So, if peer is a server, we'll send a 431. In either case,
214
            // an error is recorded, which will send a REFUSED_STREAM,
215
            // since we don't want any of the data frames either.
216
0
            tracing::debug!(
217
0
                "stream error REQUEST_HEADER_FIELDS_TOO_LARGE -- \
218
0
                 recv_headers: frame is over size; stream={:?}",
219
0
                stream.id
220
            );
221
0
            return if counts.peer().is_server() && is_initial {
222
0
                let mut res = frame::Headers::new(
223
0
                    stream.id,
224
0
                    frame::Pseudo::response(::http::StatusCode::REQUEST_HEADER_FIELDS_TOO_LARGE),
225
0
                    HeaderMap::new(),
226
                );
227
0
                res.set_end_stream();
228
0
                Err(RecvHeaderBlockError::Oversize(Some(res)))
229
            } else {
230
0
                Err(RecvHeaderBlockError::Oversize(None))
231
            };
232
281
        }
233
234
281
        let stream_id = frame.stream_id();
235
281
        let (pseudo, fields) = frame.into_parts();
236
237
281
        if pseudo.protocol.is_some()
238
0
            && counts.peer().is_server()
239
0
            && !self.is_extended_connect_protocol_enabled
240
        {
241
0
            proto_err!(stream: "cannot use :protocol if extended connect protocol is disabled; stream={:?}", stream.id);
242
0
            return Err(Error::library_reset(stream.id, Reason::PROTOCOL_ERROR).into());
243
281
        }
244
245
281
        if pseudo.status.is_some() && counts.peer().is_server() {
246
0
            proto_err!(stream: "cannot use :status header for requests; stream={:?}", stream.id);
247
0
            return Err(Error::library_reset(stream.id, Reason::PROTOCOL_ERROR).into());
248
281
        }
249
250
281
        if !pseudo.is_informational() {
251
262
            let message = counts
252
262
                .peer()
253
262
                .convert_poll_message(pseudo, fields, stream_id)?;
254
255
            // Push the frame onto the stream's recv buffer
256
262
            stream
257
262
                .pending_recv
258
262
                .push_back(&mut self.buffer, Event::Headers(message));
259
262
            stream.notify_recv();
260
261
            // Only servers can receive a headers frame that initiates the stream.
262
            // This is verified in `Streams` before calling this function.
263
262
            if counts.peer().is_server() {
264
0
                // Correctness: never push a stream to `pending_accept` without having the
265
0
                // corresponding headers frame pushed to `stream.pending_recv`.
266
0
                self.pending_accept.push(stream);
267
262
            }
268
        } else {
269
            // This is an informational response (1xx status code)
270
            // Convert to response and store it for polling
271
19
            let message = counts
272
19
                .peer()
273
19
                .convert_poll_message(pseudo, fields, stream_id)?;
274
275
19
            tracing::trace!("Received informational response: stream_id={:?}", stream_id);
276
277
            // Push the informational response onto the stream's recv buffer
278
            // with a special event type so it can be polled separately
279
19
            stream
280
19
                .pending_recv
281
19
                .push_back(&mut self.buffer, Event::InformationalHeaders(message));
282
19
            stream.notify_recv();
283
        }
284
285
281
        Ok(())
286
282
    }
287
288
    /// Called by the server to get the request
289
    ///
290
    /// # Panics
291
    ///
292
    /// Panics if `stream.pending_recv` has no `Event::Headers` queued.
293
    ///
294
0
    pub fn take_request(&mut self, stream: &mut store::Ptr) -> Request<()> {
295
        use super::peer::PollMessage::*;
296
297
0
        match stream.pending_recv.pop_front(&mut self.buffer) {
298
0
            Some(Event::Headers(Server(request))) => request,
299
0
            _ => unreachable!("server stream queue must start with Headers"),
300
        }
301
0
    }
302
303
    /// Called by the client to get pushed response
304
0
    pub fn poll_pushed(
305
0
        &mut self,
306
0
        cx: &Context,
307
0
        stream: &mut store::Ptr,
308
0
    ) -> Poll<Option<Result<(Request<()>, store::Key), proto::Error>>> {
309
        use super::peer::PollMessage::*;
310
311
0
        let mut ppp = stream.pending_push_promises.take();
312
0
        let pushed = ppp.pop(stream.store_mut()).map(|mut pushed| {
313
0
            match pushed.pending_recv.pop_front(&mut self.buffer) {
314
0
                Some(Event::Headers(Server(headers))) => (headers, pushed.key()),
315
                // When frames are pushed into the queue, it is verified that
316
                // the first frame is a HEADERS frame.
317
0
                _ => panic!("Headers not set on pushed stream"),
318
            }
319
0
        });
320
0
        stream.pending_push_promises = ppp;
321
0
        if let Some(p) = pushed {
322
0
            Poll::Ready(Some(Ok(p)))
323
        } else {
324
0
            let is_open = stream.state.ensure_recv_open()?;
325
326
0
            if is_open {
327
0
                stream.push_task = Some(cx.waker().clone());
328
0
                Poll::Pending
329
            } else {
330
0
                Poll::Ready(None)
331
            }
332
        }
333
0
    }
334
335
    /// Called by the client to get the response
336
567k
    pub fn poll_response(
337
567k
        &mut self,
338
567k
        cx: &Context,
339
567k
        stream: &mut store::Ptr,
340
567k
    ) -> Poll<Result<Response<()>, proto::Error>> {
341
        use super::peer::PollMessage::*;
342
343
        // Skip over any interim informational headers to find the main response
344
        loop {
345
567k
            match stream.pending_recv.pop_front(&mut self.buffer) {
346
201
                Some(Event::Headers(Client(response))) => return Poll::Ready(Ok(response)),
347
                Some(Event::InformationalHeaders(_)) => {
348
16
                    tracing::trace!("Skipping informational response in poll_response - should be consumed via poll_informational; stream_id={:?}", stream.id);
349
16
                    continue;
350
                }
351
0
                Some(_) => panic!("poll_response called after response returned"),
352
                None => {
353
566k
                    if !stream.state.ensure_recv_open()? {
354
8
                        proto_err!(stream: "poll_response: stream={:?} is not opened;",  stream.id);
355
8
                        return Poll::Ready(Err(Error::library_reset(
356
8
                            stream.id,
357
8
                            Reason::PROTOCOL_ERROR,
358
8
                        )));
359
485k
                    }
360
361
485k
                    stream.recv_task = Some(cx.waker().clone());
362
485k
                    return Poll::Pending;
363
                }
364
            }
365
        }
366
567k
    }
367
368
    /// Called by the client to get informational responses (1xx status codes)
369
0
    pub fn poll_informational(
370
0
        &mut self,
371
0
        cx: &Context,
372
0
        stream: &mut store::Ptr,
373
0
    ) -> Poll<Option<Result<Response<()>, proto::Error>>> {
374
        use super::peer::PollMessage::*;
375
376
        // Try to pop the front event and check if it's an informational response
377
        // If it's not, we put it back
378
0
        if let Some(event) = stream.pending_recv.pop_front(&mut self.buffer) {
379
0
            match event {
380
0
                Event::Headers(Client(response)) => {
381
                    // Final response
382
0
                    stream
383
0
                        .pending_recv
384
0
                        .push_front(&mut self.buffer, Event::Headers(Client(response)));
385
0
                    return Poll::Ready(None);
386
                }
387
0
                Event::InformationalHeaders(Client(response)) => {
388
                    // Found an informational response, return it
389
0
                    return Poll::Ready(Some(Ok(response)));
390
                }
391
0
                other => {
392
0
                    // Not an informational response, put it back at the front
393
0
                    stream.pending_recv.push_front(&mut self.buffer, other);
394
0
                }
395
            }
396
0
        }
397
398
        // No informational response available at the front
399
0
        if stream.state.ensure_recv_open()? {
400
            // Request to get notified once more frames arrive
401
0
            stream.recv_task = Some(cx.waker().clone());
402
0
            Poll::Pending
403
        } else {
404
            // No more frames will be received
405
0
            Poll::Ready(None)
406
        }
407
0
    }
408
409
    /// Transition the stream based on receiving trailers
410
4
    pub fn recv_trailers(
411
4
        &mut self,
412
4
        frame: frame::Headers,
413
4
        stream: &mut store::Ptr,
414
4
    ) -> Result<(), Error> {
415
        // Transition the state
416
4
        stream.state.recv_close()?;
417
418
2
        if stream.ensure_content_length_zero().is_err() {
419
0
            proto_err!(stream: "recv_trailers: content-length is not zero; stream={:?};",  stream.id);
420
0
            return Err(Error::library_reset(stream.id, Reason::PROTOCOL_ERROR));
421
2
        }
422
423
2
        let trailers = frame.into_fields();
424
425
        // Push the frame onto the stream's recv buffer
426
2
        stream
427
2
            .pending_recv
428
2
            .push_back(&mut self.buffer, Event::Trailers(trailers));
429
2
        stream.notify_recv();
430
431
2
        Ok(())
432
4
    }
433
434
    /// Releases capacity of the connection
435
56.3k
    pub fn release_connection_capacity(&mut self, capacity: WindowSize, task: &mut Option<Waker>) {
436
56.3k
        tracing::trace!(
437
0
            "release_connection_capacity; size={}, connection in_flight_data={}",
438
            capacity,
439
            self.in_flight_data,
440
        );
441
442
        // Decrement in-flight data
443
56.3k
        self.in_flight_data -= capacity;
444
445
        // Assign capacity to connection
446
        // TODO: proper error handling
447
56.3k
        let _res = self.flow.assign_capacity(capacity);
448
56.3k
        debug_assert!(_res.is_ok());
449
450
56.3k
        if self.flow.unclaimed_capacity().is_some() {
451
1.75k
            if let Some(task) = task.take() {
452
0
                task.wake();
453
1.75k
            }
454
54.5k
        }
455
56.3k
    }
456
457
    /// Releases capacity back to the connection & stream
458
7
    pub fn release_capacity(
459
7
        &mut self,
460
7
        capacity: WindowSize,
461
7
        stream: &mut store::Ptr,
462
7
        task: &mut Option<Waker>,
463
7
    ) -> Result<(), UserError> {
464
7
        tracing::trace!("release_capacity; size={}", capacity);
465
466
7
        if capacity > stream.in_flight_recv_data {
467
0
            return Err(UserError::ReleaseCapacityTooBig);
468
7
        }
469
470
7
        self.release_connection_capacity(capacity, task);
471
472
        // Decrement in-flight data
473
7
        stream.in_flight_recv_data -= capacity;
474
475
        // Assign capacity to stream
476
        // TODO: proper error handling
477
7
        let _res = stream.recv_flow.assign_capacity(capacity);
478
7
        debug_assert!(_res.is_ok());
479
480
7
        if stream.recv_flow.unclaimed_capacity().is_some() {
481
            // Queue the stream for sending the WINDOW_UPDATE frame.
482
0
            self.pending_window_updates.push(stream);
483
484
0
            if let Some(task) = task.take() {
485
0
                task.wake();
486
0
            }
487
7
        }
488
489
7
        Ok(())
490
7
    }
491
492
    /// Release any unclaimed capacity for a closed stream.
493
487k
    pub fn release_closed_capacity(&mut self, stream: &mut store::Ptr, task: &mut Option<Waker>) {
494
487k
        debug_assert_eq!(stream.ref_count, 0);
495
496
487k
        if stream.in_flight_recv_data == 0 {
497
487k
            return;
498
7
        }
499
500
7
        tracing::trace!(
501
0
            "auto-release closed stream ({:?}) capacity: {:?}",
502
0
            stream.id,
503
0
            stream.in_flight_recv_data,
504
        );
505
506
7
        self.release_connection_capacity(stream.in_flight_recv_data, task);
507
7
        stream.in_flight_recv_data = 0;
508
509
7
        self.clear_recv_buffer(stream);
510
487k
    }
511
512
    /// Set the "target" connection window size.
513
    ///
514
    /// By default, all new connections start with 64kb of window size. As
515
    /// streams used and release capacity, we will send WINDOW_UPDATEs for the
516
    /// connection to bring it back up to the initial "target".
517
    ///
518
    /// Setting a target means that we will try to tell the peer about
519
    /// WINDOW_UPDATEs so the peer knows it has about `target` window to use
520
    /// for the whole connection.
521
    ///
522
    /// The `task` is an optional parked task for the `Connection` that might
523
    /// be blocked on needing more window capacity.
524
0
    pub fn set_target_connection_window(
525
0
        &mut self,
526
0
        target: WindowSize,
527
0
        task: &mut Option<Waker>,
528
0
    ) -> Result<(), Reason> {
529
0
        tracing::trace!(
530
0
            "set_target_connection_window; target={}; available={}, reserved={}",
531
            target,
532
0
            self.flow.available(),
533
            self.in_flight_data,
534
        );
535
536
        // The current target connection window is our `available` plus any
537
        // in-flight data reserved by streams.
538
        //
539
        // Update the flow controller with the difference between the new
540
        // target and the current target.
541
0
        let current = self
542
0
            .flow
543
0
            .available()
544
0
            .add(self.in_flight_data)?
545
0
            .checked_size();
546
0
        if target > current {
547
0
            self.flow.assign_capacity(target - current)?;
548
        } else {
549
0
            self.flow.claim_capacity(current - target)?;
550
        }
551
552
        // If changing the target capacity means we gained a bunch of capacity,
553
        // enough that we went over the update threshold, then schedule sending
554
        // a connection WINDOW_UPDATE.
555
0
        if self.flow.unclaimed_capacity().is_some() {
556
0
            if let Some(task) = task.take() {
557
0
                task.wake();
558
0
            }
559
0
        }
560
0
        Ok(())
561
0
    }
562
563
14
    pub(crate) fn apply_local_settings(
564
14
        &mut self,
565
14
        settings: &frame::Settings,
566
14
        store: &mut Store,
567
14
    ) -> Result<(), proto::Error> {
568
14
        if let Some(val) = settings.is_extended_connect_protocol_enabled() {
569
0
            self.is_extended_connect_protocol_enabled = val;
570
14
        }
571
572
14
        if let Some(target) = settings.initial_window_size() {
573
0
            let old_sz = self.init_window_sz;
574
0
            self.init_window_sz = target;
575
576
0
            tracing::trace!("update_initial_window_size; new={}; old={}", target, old_sz,);
577
578
            // Per RFC 7540 §6.9.2:
579
            //
580
            // In addition to changing the flow-control window for streams that are
581
            // not yet active, a SETTINGS frame can alter the initial flow-control
582
            // window size for streams with active flow-control windows (that is,
583
            // streams in the "open" or "half-closed (remote)" state). When the
584
            // value of SETTINGS_INITIAL_WINDOW_SIZE changes, a receiver MUST adjust
585
            // the size of all stream flow-control windows that it maintains by the
586
            // difference between the new value and the old value.
587
            //
588
            // A change to `SETTINGS_INITIAL_WINDOW_SIZE` can cause the available
589
            // space in a flow-control window to become negative. A sender MUST
590
            // track the negative flow-control window and MUST NOT send new
591
            // flow-controlled frames until it receives WINDOW_UPDATE frames that
592
            // cause the flow-control window to become positive.
593
594
0
            match target.cmp(&old_sz) {
595
                Ordering::Less => {
596
                    // We must decrease the (local) window on every open stream.
597
0
                    let dec = old_sz - target;
598
0
                    tracing::trace!("decrementing all windows; dec={}", dec);
599
600
0
                    store.try_for_each(|mut stream| {
601
0
                        stream
602
0
                            .recv_flow
603
0
                            .dec_recv_window(dec)
604
0
                            .map_err(proto::Error::library_go_away)?;
605
0
                        Ok::<_, proto::Error>(())
606
0
                    })?;
607
                }
608
                Ordering::Greater => {
609
                    // We must increase the (local) window on every open stream.
610
0
                    let inc = target - old_sz;
611
0
                    tracing::trace!("incrementing all windows; inc={}", inc);
612
0
                    store.try_for_each(|mut stream| {
613
                        // XXX: Shouldn't the peer have already noticed our
614
                        // overflow and sent us a GOAWAY?
615
0
                        stream
616
0
                            .recv_flow
617
0
                            .inc_window(inc)
618
0
                            .map_err(proto::Error::library_go_away)?;
619
0
                        stream
620
0
                            .recv_flow
621
0
                            .assign_capacity(inc)
622
0
                            .map_err(proto::Error::library_go_away)?;
623
0
                        Ok::<_, proto::Error>(())
624
0
                    })?;
625
                }
626
0
                Ordering::Equal => (),
627
            }
628
14
        }
629
630
14
        Ok(())
631
14
    }
632
633
0
    pub fn is_end_stream(&self, stream: &store::Ptr) -> bool {
634
0
        if !stream.state.is_recv_end_stream() {
635
0
            return false;
636
0
        }
637
638
0
        stream.pending_recv.is_empty()
639
0
    }
640
641
41.2k
    pub fn recv_data(&mut self, frame: frame::Data, stream: &mut store::Ptr) -> Result<(), Error> {
642
        // could include padding
643
41.2k
        let sz = frame.flow_controlled_len();
644
645
        // This should have been enforced at the codec::FramedRead layer, so
646
        // this is just a sanity check.
647
41.2k
        assert!(sz <= MAX_WINDOW_SIZE as usize);
648
649
41.2k
        let sz = sz as WindowSize;
650
651
41.2k
        let is_ignoring_frame = stream.state.is_local_error();
652
653
41.2k
        if !is_ignoring_frame && !stream.state.is_recv_streaming() {
654
            // TODO: There are cases where this can be a stream error of
655
            // STREAM_CLOSED instead...
656
657
            // Receiving a DATA frame when not expecting one is a protocol
658
            // error.
659
22
            proto_err!(conn: "unexpected DATA frame; stream={:?}", stream.id);
660
22
            return Err(Error::library_go_away(Reason::PROTOCOL_ERROR));
661
41.1k
        }
662
663
41.1k
        tracing::trace!(
664
0
            "recv_data; size={}; connection={}; stream={}",
665
            sz,
666
0
            self.flow.window_size(),
667
0
            stream.recv_flow.window_size()
668
        );
669
670
41.1k
        if is_ignoring_frame {
671
41.1k
            tracing::trace!(
672
0
                "recv_data; frame ignored on locally reset {:?} for some time",
673
0
                stream.id,
674
            );
675
41.1k
            return self.ignore_data(sz);
676
7
        }
677
678
        // Ensure that there is enough capacity on the connection before acting
679
        // on the stream.
680
7
        self.consume_connection_window(sz)?;
681
682
7
        if stream.recv_flow.window_size() < sz {
683
            // http://httpwg.org/specs/rfc7540.html#WINDOW_UPDATE
684
            // > A receiver MAY respond with a stream error (Section 5.4.2) or
685
            // > connection error (Section 5.4.1) of type FLOW_CONTROL_ERROR if
686
            // > it is unable to accept a frame.
687
            //
688
            // So, for violating the **stream** window, we can send either a
689
            // stream or connection error. We've opted to send a stream
690
            // error.
691
0
            return Err(Error::library_reset(stream.id, Reason::FLOW_CONTROL_ERROR));
692
7
        }
693
694
        // use payload len, padding doesn't count for content-length
695
7
        if stream.dec_content_length(frame.payload().len()).is_err() {
696
0
            proto_err!(stream:
697
                "recv_data: content-length overflow; stream={:?}; len={:?}",
698
0
                stream.id,
699
0
                frame.payload().len(),
700
            );
701
0
            return Err(Error::library_reset(stream.id, Reason::PROTOCOL_ERROR));
702
7
        }
703
704
7
        if frame.is_end_stream() {
705
7
            if stream.ensure_content_length_zero().is_err() {
706
0
                proto_err!(stream:
707
                    "recv_data: content-length underflow; stream={:?}; len={:?}",
708
0
                    stream.id,
709
0
                    frame.payload().len(),
710
                );
711
0
                return Err(Error::library_reset(stream.id, Reason::PROTOCOL_ERROR));
712
7
            }
713
714
7
            if stream.state.recv_close().is_err() {
715
0
                proto_err!(conn: "recv_data: failed to transition to closed state; stream={:?}", stream.id);
716
0
                return Err(Error::library_go_away(Reason::PROTOCOL_ERROR));
717
7
            }
718
0
        }
719
720
        // Received a frame, but no one cared about it. fix issue#648
721
7
        if !stream.is_recv {
722
0
            tracing::trace!(
723
0
                "recv_data; frame ignored on stream release {:?} for some time",
724
0
                stream.id,
725
            );
726
0
            self.release_connection_capacity(sz, &mut None);
727
0
            return Ok(());
728
7
        }
729
730
        // Update stream level flow control
731
7
        stream
732
7
            .recv_flow
733
7
            .send_data(sz)
734
7
            .map_err(proto::Error::library_go_away)?;
735
736
        // Track the data as in-flight
737
7
        stream.in_flight_recv_data += sz;
738
739
        // We auto-release the padded length, since the user cannot.
740
7
        if let Some(padded_len) = frame.padded_len() {
741
7
            tracing::trace!(
742
0
                "recv_data; auto-releasing padded length of {:?} for {:?}",
743
                padded_len,
744
0
                stream.id,
745
            );
746
7
            let _res = self.release_capacity(padded_len.into(), stream, &mut None);
747
            // cannot fail, we JUST added more in_flight data above.
748
7
            debug_assert!(_res.is_ok());
749
0
        }
750
751
7
        let event = Event::Data(frame.into_payload());
752
753
        // Push the frame onto the recv buffer
754
7
        stream.pending_recv.push_back(&mut self.buffer, event);
755
7
        stream.notify_recv();
756
757
7
        Ok(())
758
41.2k
    }
759
760
56.2k
    /// Accounts for `sz` bytes of DATA that will not be handed to the user.
    ///
    /// The bytes are first charged against the connection window and then
    /// marked as releasable again in the same call.
    ///
    /// # Errors
    ///
    /// Propagates the error from `consume_connection_window` when the
    /// connection window cannot cover `sz`.
    pub fn ignore_data(&mut self, sz: WindowSize) -> Result<(), Error> {
        // Charge the payload against the connection window first; this is
        // where a window violation is detected.
        self.consume_connection_window(sz)?;

        // The frame is never surfaced to the user, so nobody else could ever
        // give this capacity back. Release it on their behalf.
        //
        // Releasing only marks the capacity as reclaimable; a WINDOW_UPDATE
        // goes out later, once the reclaimable amount reaches a threshold.
        let mut no_waker = None;
        self.release_connection_capacity(sz, &mut no_waker);

        Ok(())
    }
775
776
56.2k
    pub fn consume_connection_window(&mut self, sz: WindowSize) -> Result<(), Error> {
777
56.2k
        if self.flow.window_size() < sz {
778
2
            tracing::debug!(
779
0
                "connection error FLOW_CONTROL_ERROR -- window_size ({:?}) < sz ({:?});",
780
0
                self.flow.window_size(),
781
                sz,
782
            );
783
2
            return Err(Error::library_go_away(Reason::FLOW_CONTROL_ERROR));
784
56.2k
        }
785
786
        // Update connection level flow control
787
56.2k
        self.flow.send_data(sz).map_err(Error::library_go_away)?;
788
789
        // Track the data as in-flight
790
56.2k
        self.in_flight_data += sz;
791
56.2k
        Ok(())
792
56.2k
    }
793
794
1.09k
    pub fn recv_push_promise(
795
1.09k
        &mut self,
796
1.09k
        frame: frame::PushPromise,
797
1.09k
        stream: &mut store::Ptr,
798
1.09k
    ) -> Result<(), Error> {
799
1.09k
        stream.state.reserve_remote()?;
800
1.09k
        if frame.is_over_size() {
801
            // A frame is over size if the decoded header block was bigger than
802
            // SETTINGS_MAX_HEADER_LIST_SIZE.
803
            //
804
            // > A server that receives a larger header block than it is willing
805
            // > to handle can send an HTTP 431 (Request Header Fields Too
806
            // > Large) status code [RFC6585]. A client can discard responses
807
            // > that it cannot process.
808
            //
809
            // So, if peer is a server, we'll send a 431. In either case,
810
            // an error is recorded, which will send a PROTOCOL_ERROR,
811
            // since we don't want any of the data frames either.
812
0
            tracing::debug!(
813
0
                "stream error PROTOCOL_ERROR -- recv_push_promise: \
814
0
                 headers frame is over size; promised_id={:?};",
815
0
                frame.promised_id(),
816
            );
817
0
            return Err(Error::library_reset(
818
0
                frame.promised_id(),
819
0
                Reason::PROTOCOL_ERROR,
820
0
            ));
821
1.09k
        }
822
823
1.09k
        let promised_id = frame.promised_id();
824
1.09k
        let (pseudo, fields) = frame.into_parts();
825
1.09k
        let req = crate::server::Peer::convert_poll_message(pseudo, fields, promised_id)?;
826
827
265
        if let Err(e) = frame::PushPromise::validate_request(&req) {
828
            use PushPromiseHeaderError::*;
829
96
            match e {
830
72
                NotSafeAndCacheable => proto_err!(
831
                    stream:
832
                    "recv_push_promise: method {} is not safe and cacheable; promised_id={:?}",
833
0
                    req.method(),
834
                    promised_id,
835
                ),
836
24
                InvalidContentLength(e) => proto_err!(
837
                    stream:
838
                    "recv_push_promise; promised request has invalid content-length {:?}; promised_id={:?}",
839
                    e,
840
                    promised_id,
841
                ),
842
            }
843
96
            return Err(Error::library_reset(promised_id, Reason::PROTOCOL_ERROR));
844
169
        }
845
846
        use super::peer::PollMessage::*;
847
169
        stream
848
169
            .pending_recv
849
169
            .push_back(&mut self.buffer, Event::Headers(Server(req)));
850
169
        stream.notify_recv();
851
169
        stream.notify_push();
852
169
        Ok(())
853
1.09k
    }
854
855
    /// Ensures that `id` is not in the `Idle` state.
856
543
    pub fn ensure_not_idle(&self, id: StreamId) -> Result<(), Reason> {
857
543
        if let Ok(next) = self.next_stream_id {
858
269
            if id >= next {
859
54
                tracing::debug!(
860
0
                    "stream ID implicitly closed, PROTOCOL_ERROR; stream={:?}",
861
                    id
862
                );
863
54
                return Err(Reason::PROTOCOL_ERROR);
864
215
            }
865
274
        }
866
        // if next_stream_id is overflowed, that's ok.
867
868
489
        Ok(())
869
543
    }
870
871
    /// Handle remote sending an explicit RST_STREAM.
872
66
    pub fn recv_reset(
873
66
        &mut self,
874
66
        frame: frame::Reset,
875
66
        stream: &mut Stream,
876
66
        counts: &mut Counts,
877
66
    ) -> Result<(), Error> {
878
        // Reseting a stream that the user hasn't accepted is possible,
879
        // but should be done with care. These streams will continue
880
        // to take up memory in the accept queue, but will no longer be
881
        // counted as "concurrent" streams.
882
        //
883
        // So, we have a separate limit for these.
884
        //
885
        // See https://github.com/hyperium/hyper/issues/2877
886
66
        if stream.is_pending_accept {
887
0
            if counts.can_inc_num_remote_reset_streams() {
888
0
                counts.inc_num_remote_reset_streams();
889
0
            } else {
890
0
                tracing::warn!(
891
0
                    "recv_reset; remotely-reset pending-accept streams reached limit ({:?})",
892
0
                    counts.max_remote_reset_streams(),
893
                );
894
0
                return Err(Error::library_go_away_data(
895
0
                    Reason::ENHANCE_YOUR_CALM,
896
0
                    "too_many_resets",
897
0
                ));
898
            }
899
66
        }
900
901
        // Notify the stream
902
66
        stream.state.recv_reset(frame, stream.is_pending_send);
903
904
66
        stream.notify_send();
905
66
        stream.notify_recv();
906
66
        stream.notify_push();
907
908
66
        Ok(())
909
66
    }
910
911
    /// Handle a connection-level error
912
413k
    pub fn handle_error(&mut self, err: &proto::Error, stream: &mut Stream) {
913
        // Receive an error
914
413k
        stream.state.handle_error(err);
915
916
        // If a receiver is waiting, notify it
917
413k
        stream.notify_send();
918
413k
        stream.notify_recv();
919
413k
        stream.notify_push();
920
413k
    }
921
922
0
    pub fn go_away(&mut self, last_processed_id: StreamId) {
923
0
        assert!(self.max_stream_id >= last_processed_id);
924
0
        self.max_stream_id = last_processed_id;
925
0
    }
926
927
223k
    /// Signals EOF to `stream`.
    ///
    /// The stream state is updated and the stream's send, recv and push
    /// waiters are notified.
    pub fn recv_eof(&mut self, stream: &mut Stream) {
        stream.state.recv_eof();

        // Wake any task that might be waiting on this stream.
        stream.notify_send();
        stream.notify_recv();
        stream.notify_push();
    }
933
934
208
    pub(super) fn clear_recv_buffer(&mut self, stream: &mut Stream) {
935
222
        while stream.pending_recv.pop_front(&mut self.buffer).is_some() {
936
14
            // drop it
937
14
        }
938
208
    }
939
940
    /// Get the max ID of streams we can receive.
941
    ///
942
    /// This gets lowered if we send a GOAWAY frame.
943
22.3k
    pub fn max_stream_id(&self) -> StreamId {
944
22.3k
        self.max_stream_id
945
22.3k
    }
946
947
1.10k
    pub fn next_stream_id(&self) -> Result<StreamId, Error> {
948
1.10k
        if let Ok(id) = self.next_stream_id {
949
1.10k
            Ok(id)
950
        } else {
951
1
            Err(Error::library_go_away(Reason::PROTOCOL_ERROR))
952
        }
953
1.10k
    }
954
955
10.3k
    pub fn may_have_created_stream(&self, id: StreamId) -> bool {
956
10.3k
        if let Ok(next_id) = self.next_stream_id {
957
            // Peer::is_local_init should have been called beforehand
958
9.06k
            debug_assert_eq!(id.is_server_initiated(), next_id.is_server_initiated(),);
959
9.06k
            id < next_id
960
        } else {
961
1.33k
            true
962
        }
963
10.3k
    }
964
965
14.7k
    /// Advances `next_stream_id` past `id` when `id` is not below it.
    ///
    /// Does nothing once the stream ID space has already overflowed.
    pub(super) fn maybe_reset_next_stream_id(&mut self, id: StreamId) {
        let next_id = match self.next_stream_id {
            Ok(next_id) => next_id,
            // Already overflowed; there is nothing left to advance.
            Err(_) => return,
        };

        // !Peer::is_local_init should have been called beforehand
        debug_assert_eq!(id.is_server_initiated(), next_id.is_server_initiated());

        if id >= next_id {
            self.next_stream_id = id.next_id();
        }
    }
974
975
    /// Returns true if the remote peer can reserve a stream with the given ID.
976
1.10k
    pub fn ensure_can_reserve(&self) -> Result<(), Error> {
977
1.10k
        if !self.is_push_enabled {
978
0
            proto_err!(conn: "recv_push_promise: push is disabled");
979
0
            return Err(Error::library_go_away(Reason::PROTOCOL_ERROR));
980
1.10k
        }
981
982
1.10k
        Ok(())
983
1.10k
    }
984
985
    /// Add a locally reset stream to queue to be eventually reaped.
986
125k
    pub fn enqueue_reset_expiration(&mut self, stream: &mut store::Ptr, counts: &mut Counts) {
987
125k
        if !stream.state.is_local_error() || stream.is_pending_reset_expiration() {
988
23.9k
            return;
989
101k
        }
990
991
101k
        if counts.can_inc_num_reset_streams() {
992
49.5k
            counts.inc_num_reset_streams();
993
49.5k
            tracing::trace!("enqueue_reset_expiration; added {:?}", stream.id);
994
49.5k
            self.pending_reset_expired.push(stream);
995
        } else {
996
52.0k
            tracing::trace!(
997
0
                "enqueue_reset_expiration; dropped {:?}, over max_concurrent_reset_streams",
998
0
                stream.id
999
            );
1000
        }
1001
125k
    }
1002
1003
    /// Send any pending refusals.
1004
570k
    pub fn send_pending_refusal<T, B>(
1005
570k
        &mut self,
1006
570k
        cx: &mut Context,
1007
570k
        dst: &mut Codec<T, Prioritized<B>>,
1008
570k
    ) -> Poll<io::Result<()>>
1009
570k
    where
1010
570k
        T: AsyncWrite + Unpin,
1011
570k
        B: Buf,
1012
    {
1013
570k
        if let Some(stream_id) = self.refused {
1014
0
            ready!(dst.poll_ready(cx))?;
1015
1016
            // Create the RST_STREAM frame
1017
0
            let frame = frame::Reset::new(stream_id, Reason::REFUSED_STREAM);
1018
1019
            // Buffer the frame
1020
0
            dst.buffer(frame.into()).expect("invalid RST_STREAM frame");
1021
570k
        }
1022
1023
570k
        self.refused = None;
1024
1025
570k
        Poll::Ready(Ok(()))
1026
570k
    }
1027
1028
523k
    pub fn clear_expired_reset_streams(&mut self, store: &mut Store, counts: &mut Counts) {
1029
523k
        if !self.pending_reset_expired.is_empty() {
1030
232k
            let now = Instant::now();
1031
232k
            let reset_duration = self.reset_duration;
1032
232k
            while let Some(stream) = self.pending_reset_expired.pop_if(store, |stream| {
1033
232k
                let reset_at = stream.reset_at.expect("reset_at must be set if in queue");
1034
                // rust-lang/rust#86470 tracks a bug in the standard library where `Instant`
1035
                // subtraction can panic (because, on some platforms, `Instant` isn't actually
1036
                // monotonic). We use a saturating operation to avoid this panic here.
1037
232k
                now.saturating_duration_since(reset_at) > reset_duration
1038
232k
            }) {
1039
617
                counts.transition_after(stream, true);
1040
617
            }
1041
291k
        }
1042
523k
    }
1043
1044
19.2k
    pub fn clear_queues(
1045
19.2k
        &mut self,
1046
19.2k
        clear_pending_accept: bool,
1047
19.2k
        store: &mut Store,
1048
19.2k
        counts: &mut Counts,
1049
19.2k
    ) {
1050
19.2k
        self.clear_stream_window_update_queue(store, counts);
1051
19.2k
        self.clear_all_reset_streams(store, counts);
1052
1053
19.2k
        if clear_pending_accept {
1054
14.5k
            self.clear_all_pending_accept(store, counts);
1055
14.5k
        }
1056
19.2k
    }
1057
1058
19.2k
    fn clear_stream_window_update_queue(&mut self, store: &mut Store, counts: &mut Counts) {
1059
19.2k
        while let Some(stream) = self.pending_window_updates.pop(store) {
1060
0
            counts.transition(stream, |_, stream| {
1061
0
                tracing::trace!("clear_stream_window_update_queue; stream={:?}", stream.id);
1062
0
            })
1063
        }
1064
19.2k
    }
1065
1066
    /// Called on EOF
1067
19.2k
    fn clear_all_reset_streams(&mut self, store: &mut Store, counts: &mut Counts) {
1068
68.1k
        while let Some(stream) = self.pending_reset_expired.pop(store) {
1069
48.9k
            counts.transition_after(stream, true);
1070
48.9k
        }
1071
19.2k
    }
1072
1073
14.5k
    fn clear_all_pending_accept(&mut self, store: &mut Store, counts: &mut Counts) {
1074
14.5k
        while let Some(stream) = self.pending_accept.pop(store) {
1075
0
            counts.transition_after(stream, false);
1076
0
        }
1077
14.5k
    }
1078
1079
452k
    pub fn poll_complete<T, B>(
1080
452k
        &mut self,
1081
452k
        cx: &mut Context,
1082
452k
        store: &mut Store,
1083
452k
        counts: &mut Counts,
1084
452k
        dst: &mut Codec<T, Prioritized<B>>,
1085
452k
    ) -> Poll<io::Result<()>>
1086
452k
    where
1087
452k
        T: AsyncWrite + Unpin,
1088
452k
        B: Buf,
1089
    {
1090
        // Send any pending connection level window updates
1091
452k
        ready!(self.send_connection_window_update(cx, dst))?;
1092
1093
        // Send any pending stream level window updates
1094
451k
        ready!(self.send_stream_window_updates(cx, store, counts, dst))?;
1095
1096
151k
        Poll::Ready(Ok(()))
1097
452k
    }
1098
1099
    /// Send connection level window update
1100
452k
    fn send_connection_window_update<T, B>(
1101
452k
        &mut self,
1102
452k
        cx: &mut Context,
1103
452k
        dst: &mut Codec<T, Prioritized<B>>,
1104
452k
    ) -> Poll<io::Result<()>>
1105
452k
    where
1106
452k
        T: AsyncWrite + Unpin,
1107
452k
        B: Buf,
1108
    {
1109
452k
        if let Some(incr) = self.flow.unclaimed_capacity() {
1110
1.34k
            let frame = frame::WindowUpdate::new(StreamId::zero(), incr);
1111
1112
            // Ensure the codec has capacity
1113
1.34k
            ready!(dst.poll_ready(cx))?;
1114
1115
            // Buffer the WINDOW_UPDATE frame
1116
17
            dst.buffer(frame.into())
1117
17
                .expect("invalid WINDOW_UPDATE frame");
1118
1119
            // Update flow control
1120
17
            self.flow
1121
17
                .inc_window(incr)
1122
17
                .expect("unexpected flow control state");
1123
451k
        }
1124
1125
451k
        Poll::Ready(Ok(()))
1126
452k
    }
1127
1128
    /// Send stream level window update
1129
451k
    pub fn send_stream_window_updates<T, B>(
1130
451k
        &mut self,
1131
451k
        cx: &mut Context,
1132
451k
        store: &mut Store,
1133
451k
        counts: &mut Counts,
1134
451k
        dst: &mut Codec<T, Prioritized<B>>,
1135
451k
    ) -> Poll<io::Result<()>>
1136
451k
    where
1137
451k
        T: AsyncWrite + Unpin,
1138
451k
        B: Buf,
1139
    {
1140
        loop {
1141
            // Ensure the codec has capacity
1142
451k
            ready!(dst.poll_ready(cx))?;
1143
1144
            // Get the next stream
1145
151k
            let stream = match self.pending_window_updates.pop(store) {
1146
0
                Some(stream) => stream,
1147
151k
                None => return Poll::Ready(Ok(())),
1148
            };
1149
1150
0
            counts.transition(stream, |_, stream| {
1151
0
                tracing::trace!("pending_window_updates -- pop; stream={:?}", stream.id);
1152
0
                debug_assert!(!stream.is_pending_window_update);
1153
1154
0
                if !stream.state.is_recv_streaming() {
1155
                    // No need to send window updates on the stream if the stream is
1156
                    // no longer receiving data.
1157
                    //
1158
                    // TODO: is this correct? We could possibly send a window
1159
                    // update on a ReservedRemote stream if we already know
1160
                    // we want to stream the data faster...
1161
0
                    return;
1162
0
                }
1163
1164
                // TODO: de-dup
1165
0
                if let Some(incr) = stream.recv_flow.unclaimed_capacity() {
1166
0
                    // Create the WINDOW_UPDATE frame
1167
0
                    let frame = frame::WindowUpdate::new(stream.id, incr);
1168
0
1169
0
                    // Buffer it
1170
0
                    dst.buffer(frame.into())
1171
0
                        .expect("invalid WINDOW_UPDATE frame");
1172
0
1173
0
                    // Update flow control
1174
0
                    stream
1175
0
                        .recv_flow
1176
0
                        .inc_window(incr)
1177
0
                        .expect("unexpected flow control state");
1178
0
                }
1179
0
            })
1180
        }
1181
451k
    }
1182
1183
0
    /// Pops the next stream waiting to be accepted and returns its store key.
    ///
    /// Returns `None` when no stream is pending accept.
    pub fn next_incoming(&mut self, store: &mut Store) -> Option<store::Key> {
        let accepted = self.pending_accept.pop(store)?;
        Some(accepted.key())
    }
1186
1187
0
    pub fn poll_data(
1188
0
        &mut self,
1189
0
        cx: &Context,
1190
0
        stream: &mut Stream,
1191
0
    ) -> Poll<Option<Result<Bytes, proto::Error>>> {
1192
0
        match stream.pending_recv.pop_front(&mut self.buffer) {
1193
0
            Some(Event::Data(payload)) => Poll::Ready(Some(Ok(payload))),
1194
0
            Some(event) => {
1195
                // Frame is trailer
1196
0
                stream.pending_recv.push_front(&mut self.buffer, event);
1197
1198
                // Notify the recv task. This is done just in case
1199
                // `poll_trailers` was called.
1200
                //
1201
                // It is very likely that `notify_recv` will just be a no-op (as
1202
                // the task will be None), so this isn't really much of a
1203
                // performance concern. It also means we don't have to track
1204
                // state to see if `poll_trailers` was called before `poll_data`
1205
                // returned `None`.
1206
0
                stream.notify_recv();
1207
1208
                // No more data frames
1209
0
                Poll::Ready(None)
1210
            }
1211
0
            None => self.schedule_recv(cx, stream),
1212
        }
1213
0
    }
1214
1215
0
    pub fn poll_trailers(
1216
0
        &mut self,
1217
0
        cx: &Context,
1218
0
        stream: &mut Stream,
1219
0
    ) -> Poll<Option<Result<HeaderMap, proto::Error>>> {
1220
0
        match stream.pending_recv.pop_front(&mut self.buffer) {
1221
0
            Some(Event::Trailers(trailers)) => Poll::Ready(Some(Ok(trailers))),
1222
0
            Some(event) => {
1223
                // Frame is not trailers.. not ready to poll trailers yet.
1224
0
                stream.pending_recv.push_front(&mut self.buffer, event);
1225
1226
0
                Poll::Pending
1227
            }
1228
0
            None => self.schedule_recv(cx, stream),
1229
        }
1230
0
    }
1231
1232
0
    fn schedule_recv<T>(
1233
0
        &mut self,
1234
0
        cx: &Context,
1235
0
        stream: &mut Stream,
1236
0
    ) -> Poll<Option<Result<T, proto::Error>>> {
1237
0
        if stream.state.ensure_recv_open()? {
1238
            // Request to get notified once more frames arrive
1239
0
            stream.recv_task = Some(cx.waker().clone());
1240
0
            Poll::Pending
1241
        } else {
1242
            // No more frames will be received
1243
0
            Poll::Ready(None)
1244
        }
1245
0
    }
Unexecuted instantiation: <h2::proto::streams::recv::Recv>::schedule_recv::<bytes::bytes::Bytes>
Unexecuted instantiation: <h2::proto::streams::recv::Recv>::schedule_recv::<http::header::map::HeaderMap>
1246
}
1247
1248
// ===== impl Open =====
1249
1250
impl Open {
1251
1.46k
    pub fn is_push_promise(&self) -> bool {
1252
1.46k
        matches!(*self, Self::PushPromise)
1253
1.46k
    }
1254
}
1255
1256
// ===== impl RecvHeaderBlockError =====
1257
1258
impl<T> From<Error> for RecvHeaderBlockError<T> {
1259
1
    fn from(err: Error) -> Self {
1260
1
        RecvHeaderBlockError::State(err)
1261
1
    }
1262
}