Coverage Report

Created: 2026-02-14 06:45

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/h2/src/proto/streams/state.rs
Line
Count
Source
1
use std::fmt;
2
use std::io;
3
4
use crate::codec::UserError;
5
use crate::frame::{self, Reason, StreamId};
6
use crate::proto::{self, Error, Initiator, PollReset};
7
8
use self::Inner::*;
9
use self::Peer::*;
10
11
/// Represents the state of an H2 stream
12
///
13
/// ```not_rust
14
///                              +--------+
15
///                      send PP |        | recv PP
16
///                     ,--------|  idle  |--------.
17
///                    /         |        |         \
18
///                   v          +--------+          v
19
///            +----------+          |           +----------+
20
///            |          |          | send H /  |          |
21
///     ,------| reserved |          | recv H    | reserved |------.
22
///     |      | (local)  |          |           | (remote) |      |
23
///     |      +----------+          v           +----------+      |
24
///     |          |             +--------+             |          |
25
///     |          |     recv ES |        | send ES     |          |
26
///     |   send H |     ,-------|  open  |-------.     | recv H   |
27
///     |          |    /        |        |        \    |          |
28
///     |          v   v         +--------+         v   v          |
29
///     |      +----------+          |           +----------+      |
30
///     |      |   half   |          |           |   half   |      |
31
///     |      |  closed  |          | send R /  |  closed  |      |
32
///     |      | (remote) |          | recv R    | (local)  |      |
33
///     |      +----------+          |           +----------+      |
34
///     |           |                |                 |           |
35
///     |           | send ES /      |       recv ES / |           |
36
///     |           | send R /       v        send R / |           |
37
///     |           | recv R     +--------+   recv R   |           |
38
///     | send R /  `----------->|        |<-----------'  send R / |
39
///     | recv R                 | closed |               recv R   |
40
///     `----------------------->|        |<----------------------'
41
///                              +--------+
42
///
43
///        send:   endpoint sends this frame
44
///        recv:   endpoint receives this frame
45
///
46
///        H:  HEADERS frame (with implied CONTINUATIONs)
47
///        PP: PUSH_PROMISE frame (with implied CONTINUATIONs)
48
///        ES: END_STREAM flag
49
///        R:  RST_STREAM frame
50
/// ```
51
#[derive(Clone)]
52
pub struct State {
53
    inner: Inner,
54
}
55
56
#[derive(Debug, Clone)]
57
enum Inner {
58
    Idle,
59
    // TODO: these states shouldn't count against concurrency limits:
60
    ReservedLocal,
61
    ReservedRemote,
62
    Open { local: Peer, remote: Peer },
63
    HalfClosedLocal(Peer), // TODO: explicitly name this value
64
    HalfClosedRemote(Peer),
65
    Closed(Cause),
66
}
67
68
#[derive(Debug, Copy, Clone, Default)]
69
enum Peer {
70
    #[default]
71
    AwaitingHeaders,
72
    Streaming,
73
}
74
75
#[derive(Debug, Clone)]
76
enum Cause {
77
    EndStream,
78
    Error(Error),
79
80
    /// This indicates to the connection that a reset frame must be sent out
81
    /// once the send queue has been flushed.
82
    ///
83
    /// Examples of when this could happen:
84
    /// - User drops all references to a stream, so we want to CANCEL the it.
85
    /// - Header block size was too large, so we want to REFUSE, possibly
86
    ///   after sending a 431 response frame.
87
    ScheduledLibraryReset(Reason),
88
}
89
90
impl State {
91
    /// Opens the send-half of a stream if it is not already open.
92
476k
    pub fn send_open(&mut self, eos: bool) -> Result<(), UserError> {
93
476k
        let local = Streaming;
94
95
476k
        self.inner = match self.inner {
96
            Idle => {
97
476k
                if eos {
98
739
                    HalfClosedLocal(AwaitingHeaders)
99
                } else {
100
476k
                    Open {
101
476k
                        local,
102
476k
                        remote: AwaitingHeaders,
103
476k
                    }
104
                }
105
            }
106
            Open {
107
                local: AwaitingHeaders,
108
0
                remote,
109
            } => {
110
0
                if eos {
111
0
                    HalfClosedLocal(remote)
112
                } else {
113
0
                    Open { local, remote }
114
                }
115
            }
116
            HalfClosedRemote(AwaitingHeaders) | ReservedLocal => {
117
0
                if eos {
118
0
                    Closed(Cause::EndStream)
119
                } else {
120
0
                    HalfClosedRemote(local)
121
                }
122
            }
123
            _ => {
124
                // All other transitions result in a protocol error
125
0
                return Err(UserError::UnexpectedFrameType);
126
            }
127
        };
128
129
476k
        Ok(())
130
476k
    }
131
132
    /// Opens the receive-half of the stream when a HEADERS frame is received.
133
    ///
134
    /// Returns true if this transitions the state to Open.
135
2.87k
    pub fn recv_open(&mut self, frame: &frame::Headers) -> Result<bool, Error> {
136
2.87k
        let mut initial = false;
137
2.87k
        let eos = frame.is_end_stream();
138
139
2.87k
        self.inner = match self.inner {
140
            Idle => {
141
0
                initial = true;
142
143
0
                if eos {
144
0
                    HalfClosedRemote(AwaitingHeaders)
145
                } else {
146
                    Open {
147
0
                        local: AwaitingHeaders,
148
0
                        remote: if frame.is_informational() {
149
0
                            tracing::trace!("skipping 1xx response headers");
150
0
                            AwaitingHeaders
151
                        } else {
152
0
                            Streaming
153
                        },
154
                    }
155
                }
156
            }
157
            ReservedRemote => {
158
2
                initial = true;
159
160
2
                if eos {
161
1
                    Closed(Cause::EndStream)
162
1
                } else if frame.is_informational() {
163
0
                    tracing::trace!("skipping 1xx response headers");
164
0
                    ReservedRemote
165
                } else {
166
1
                    HalfClosedLocal(Streaming)
167
                }
168
            }
169
            Open {
170
0
                local,
171
                remote: AwaitingHeaders,
172
            } => {
173
0
                if eos {
174
0
                    HalfClosedRemote(local)
175
                } else {
176
                    Open {
177
0
                        local,
178
0
                        remote: if frame.is_informational() {
179
0
                            tracing::trace!("skipping 1xx response headers");
180
0
                            AwaitingHeaders
181
                        } else {
182
0
                            Streaming
183
                        },
184
                    }
185
                }
186
            }
187
            HalfClosedLocal(AwaitingHeaders) => {
188
2.86k
                if eos {
189
259
                    Closed(Cause::EndStream)
190
2.60k
                } else if frame.is_informational() {
191
404
                    tracing::trace!("skipping 1xx response headers");
192
404
                    HalfClosedLocal(AwaitingHeaders)
193
                } else {
194
2.20k
                    HalfClosedLocal(Streaming)
195
                }
196
            }
197
0
            ref state => {
198
                // All other transitions result in a protocol error
199
0
                proto_err!(conn: "recv_open: in unexpected state {:?}", state);
200
0
                return Err(Error::library_go_away(Reason::PROTOCOL_ERROR));
201
            }
202
        };
203
204
2.87k
        Ok(initial)
205
2.87k
    }
206
207
    /// Transition from Idle -> ReservedRemote
208
1.15k
    pub fn reserve_remote(&mut self) -> Result<(), Error> {
209
1.15k
        match self.inner {
210
            Idle => {
211
1.15k
                self.inner = ReservedRemote;
212
1.15k
                Ok(())
213
            }
214
0
            ref state => {
215
0
                proto_err!(conn: "reserve_remote: in unexpected state {:?}", state);
216
0
                Err(Error::library_go_away(Reason::PROTOCOL_ERROR))
217
            }
218
        }
219
1.15k
    }
220
221
    /// Transition from Idle -> ReservedLocal
222
0
    pub fn reserve_local(&mut self) -> Result<(), UserError> {
223
0
        match self.inner {
224
            Idle => {
225
0
                self.inner = ReservedLocal;
226
0
                Ok(())
227
            }
228
0
            _ => Err(UserError::UnexpectedFrameType),
229
        }
230
0
    }
231
232
    /// Indicates that the remote side will not send more data to the local.
233
125
    pub fn recv_close(&mut self) -> Result<(), Error> {
234
125
        match self.inner {
235
0
            Open { local, .. } => {
236
                // The remote side will continue to receive data.
237
0
                tracing::trace!("recv_close: Open => HalfClosedRemote({:?})", local);
238
0
                self.inner = HalfClosedRemote(local);
239
0
                Ok(())
240
            }
241
            HalfClosedLocal(..) => {
242
120
                tracing::trace!("recv_close: HalfClosedLocal => Closed");
243
120
                self.inner = Closed(Cause::EndStream);
244
120
                Ok(())
245
            }
246
5
            ref state => {
247
5
                proto_err!(conn: "recv_close: in unexpected state {:?}", state);
248
5
                Err(Error::library_go_away(Reason::PROTOCOL_ERROR))
249
            }
250
        }
251
125
    }
252
253
    /// The remote explicitly sent a RST_STREAM.
254
    ///
255
    /// # Arguments
256
    /// - `frame`: the received RST_STREAM frame.
257
    /// - `queued`: true if this stream has frames in the pending send queue.
258
832
    pub fn recv_reset(&mut self, frame: frame::Reset, queued: bool) {
259
640
        match self.inner {
260
            // If the stream is already in a `Closed` state, do nothing,
261
            // provided that there are no frames still in the send queue.
262
201
            Closed(..) if !queued => {}
263
            // A notionally `Closed` stream may still have queued frames in
264
            // the following cases:
265
            //
266
            // - if the cause is `Cause::Scheduled(..)` (i.e. we have not
267
            //   actually closed the stream yet).
268
            // - if the cause is `Cause::EndStream`: we transition to this
269
            //   state when an EOS frame is *enqueued* (so that it's invalid
270
            //   to enqueue more frames), not when the EOS frame is *sent*;
271
            //   therefore, there may still be frames ahead of the EOS frame
272
            //   in the send queue.
273
            //
274
            // In either of these cases, we want to overwrite the stream's
275
            // previous state with the received RST_STREAM, so that the queue
276
            // will be cleared by `Prioritize::pop_frame`.
277
631
            ref state => {
278
631
                tracing::trace!(
279
0
                    "recv_reset; frame={:?}; state={:?}; queued={:?}",
280
                    frame,
281
                    state,
282
                    queued
283
                );
284
631
                self.inner = Closed(Cause::Error(Error::remote_reset(
285
631
                    frame.stream_id(),
286
631
                    frame.reason(),
287
631
                )));
288
            }
289
        }
290
832
    }
291
292
    /// Handle a connection-level error.
293
468k
    pub fn handle_error(&mut self, err: &proto::Error) {
294
468k
        match self.inner {
295
254k
            Closed(..) => {}
296
            _ => {
297
214k
                tracing::trace!("handle_error; err={:?}", err);
298
214k
                self.inner = Closed(Cause::Error(err.clone()));
299
            }
300
        }
301
468k
    }
302
303
282k
    pub fn recv_eof(&mut self) {
304
282k
        match self.inner {
305
104k
            Closed(..) => {}
306
177k
            ref state => {
307
177k
                tracing::trace!("recv_eof; state={:?}", state);
308
177k
                self.inner = Closed(Cause::Error(
309
177k
                    io::Error::new(
310
177k
                        io::ErrorKind::BrokenPipe,
311
177k
                        "stream closed because of a broken pipe",
312
177k
                    )
313
177k
                    .into(),
314
177k
                ));
315
            }
316
        }
317
282k
    }
318
319
    /// Indicates that the local side will not send more data to the local.
320
476k
    pub fn send_close(&mut self) {
321
476k
        match self.inner {
322
476k
            Open { remote, .. } => {
323
                // The remote side will continue to receive data.
324
476k
                tracing::trace!("send_close: Open => HalfClosedLocal({:?})", remote);
325
476k
                self.inner = HalfClosedLocal(remote);
326
            }
327
            HalfClosedRemote(..) => {
328
0
                tracing::trace!("send_close: HalfClosedRemote => Closed");
329
0
                self.inner = Closed(Cause::EndStream);
330
            }
331
0
            ref state => panic!("send_close: unexpected state {:?}", state),
332
        }
333
476k
    }
334
335
    /// Set the stream state to reset locally.
336
136k
    pub fn set_reset(&mut self, stream_id: StreamId, reason: Reason, initiator: Initiator) {
337
136k
        self.inner = Closed(Cause::Error(Error::Reset(stream_id, reason, initiator)));
338
136k
    }
339
340
    /// Set the stream state to a scheduled reset.
341
84.0k
    pub fn set_scheduled_reset(&mut self, reason: Reason) {
342
84.0k
        debug_assert!(!self.is_closed());
343
84.0k
        self.inner = Closed(Cause::ScheduledLibraryReset(reason));
344
84.0k
    }
345
346
237k
    pub fn get_scheduled_reset(&self) -> Option<Reason> {
347
237k
        match self.inner {
348
75.2k
            Closed(Cause::ScheduledLibraryReset(reason)) => Some(reason),
349
162k
            _ => None,
350
        }
351
237k
    }
352
353
1.75M
    pub fn is_scheduled_reset(&self) -> bool {
354
1.73M
        matches!(self.inner, Closed(Cause::ScheduledLibraryReset(..)))
355
1.75M
    }
356
357
196k
    pub fn is_local_error(&self) -> bool {
358
188k
        match self.inner {
359
104k
            Closed(Cause::Error(ref e)) => e.is_local(),
360
84.5k
            Closed(Cause::ScheduledLibraryReset(..)) => true,
361
7.55k
            _ => false,
362
        }
363
196k
    }
364
365
0
    pub fn is_remote_reset(&self) -> bool {
366
0
        matches!(
367
0
            self.inner,
368
            Closed(Cause::Error(Error::Reset(_, _, Initiator::Remote)))
369
        )
370
0
    }
371
372
    /// Returns true if the stream is already reset.
373
84.7k
    pub fn is_reset(&self) -> bool {
374
23.9k
        match self.inner {
375
61
            Closed(Cause::EndStream) => false,
376
23.9k
            Closed(_) => true,
377
60.7k
            _ => false,
378
        }
379
84.7k
    }
380
381
689k
    pub fn is_send_streaming(&self) -> bool {
382
213k
        matches!(
383
476k
            self.inner,
384
            Open {
385
                local: Streaming,
386
                ..
387
            } | HalfClosedRemote(Streaming)
388
        )
389
689k
    }
390
391
    /// Returns true when the stream is in a state to receive headers
392
3.92k
    pub fn is_recv_headers(&self) -> bool {
393
1.05k
        matches!(
394
0
            self.inner,
395
            Idle | Open {
396
                remote: AwaitingHeaders,
397
                ..
398
            } | HalfClosedLocal(AwaitingHeaders)
399
                | ReservedRemote
400
        )
401
3.92k
    }
402
403
4.33k
    pub fn is_recv_streaming(&self) -> bool {
404
29
        matches!(
405
0
            self.inner,
406
            Open {
407
                remote: Streaming,
408
                ..
409
            } | HalfClosedLocal(Streaming)
410
        )
411
4.33k
    }
412
413
0
    pub fn is_recv_end_stream(&self) -> bool {
414
        // In either case END_STREAM has been received
415
0
        matches!(self.inner, Closed(Cause::EndStream) | HalfClosedRemote(..))
416
0
    }
417
418
7.46M
    pub fn is_closed(&self) -> bool {
419
7.46M
        matches!(self.inner, Closed(_))
420
7.46M
    }
421
422
234k
    pub fn is_send_closed(&self) -> bool {
423
0
        matches!(
424
234k
            self.inner,
425
            Closed(..) | HalfClosedLocal(..) | ReservedRemote
426
        )
427
234k
    }
428
429
0
    pub fn is_idle(&self) -> bool {
430
0
        matches!(self.inner, Idle)
431
0
    }
432
433
515k
    pub fn ensure_recv_open(&self) -> Result<bool, proto::Error> {
434
        // TODO: Is this correct?
435
39.8k
        match self.inner {
436
39.8k
            Closed(Cause::Error(ref e)) => Err(e.clone()),
437
1
            Closed(Cause::ScheduledLibraryReset(reason)) => {
438
1
                Err(proto::Error::library_go_away(reason))
439
            }
440
15
            Closed(Cause::EndStream) | HalfClosedRemote(..) | ReservedLocal => Ok(false),
441
475k
            _ => Ok(true),
442
        }
443
515k
    }
444
445
    /// Returns a reason if the stream has been reset.
446
0
    pub(super) fn ensure_reason(&self, mode: PollReset) -> Result<Option<Reason>, crate::Error> {
447
0
        match self.inner {
448
0
            Closed(Cause::Error(Error::Reset(_, reason, _)))
449
0
            | Closed(Cause::Error(Error::GoAway(_, reason, _)))
450
0
            | Closed(Cause::ScheduledLibraryReset(reason)) => Ok(Some(reason)),
451
0
            Closed(Cause::Error(ref e)) => Err(e.clone().into()),
452
            Open {
453
                local: Streaming, ..
454
            }
455
0
            | HalfClosedRemote(Streaming) => match mode {
456
0
                PollReset::AwaitingHeaders => Err(UserError::PollResetAfterSendResponse.into()),
457
0
                PollReset::Streaming => Ok(None),
458
            },
459
0
            _ => Ok(None),
460
        }
461
0
    }
462
}
463
464
impl Default for State {
465
537k
    fn default() -> State {
466
537k
        State { inner: Inner::Idle }
467
537k
    }
468
}
469
470
// remove some noise for debug output
471
impl fmt::Debug for State {
472
0
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
473
0
        self.inner.fmt(f)
474
0
    }
475
}