Coverage Report

Created: 2025-08-26 07:09

/src/h2/src/proto/streams/stream.rs
Line
Count
Source (jump to first uncovered line)
1
use crate::Reason;
2
3
use super::*;
4
5
use std::fmt;
6
use std::task::{Context, Waker};
7
use std::time::Instant;
8
9
/// Tracks Stream related state
10
///
11
/// # Reference counting
12
///
13
/// There can be a number of outstanding handles to a single Stream. These are
14
/// tracked using reference counting. The `ref_count` field represents the
15
/// number of outstanding userspace handles that can reach this stream.
16
///
17
/// It's important to note that when the stream is placed in an internal queue
18
/// (such as an accept queue), this is **not** tracked by a reference count.
19
/// Thus, `ref_count` can be zero and the stream still has to be kept around.
20
pub(super) struct Stream {
21
    /// The h2 stream identifier
22
    pub id: StreamId,
23
24
    /// Current state of the stream
25
    pub state: State,
26
27
    /// Set to `true` when the stream is counted against the connection's max
28
    /// concurrent streams.
29
    pub is_counted: bool,
30
31
    /// Number of outstanding handles pointing to this stream
32
    pub ref_count: usize,
33
34
    // ===== Fields related to sending =====
35
    /// Next node in the accept linked list
36
    pub next_pending_send: Option<store::Key>,
37
38
    /// Set to true when the stream is pending accept
39
    pub is_pending_send: bool,
40
41
    /// Send data flow control
42
    pub send_flow: FlowControl,
43
44
    /// Amount of send capacity that has been requested, but not yet allocated.
45
    pub requested_send_capacity: WindowSize,
46
47
    /// Amount of data buffered at the prioritization layer.
48
    /// TODO: Technically this could be greater than the window size...
49
    pub buffered_send_data: usize,
50
51
    /// Task tracking additional send capacity (i.e. window updates).
52
    send_task: Option<Waker>,
53
54
    /// Frames pending for this stream being sent to the socket
55
    pub pending_send: buffer::Deque,
56
57
    /// Next node in the linked list of streams waiting for additional
58
    /// connection level capacity.
59
    pub next_pending_send_capacity: Option<store::Key>,
60
61
    /// True if the stream is waiting for outbound connection capacity
62
    pub is_pending_send_capacity: bool,
63
64
    /// Set to true when the send capacity has been incremented
65
    pub send_capacity_inc: bool,
66
67
    /// Next node in the open linked list
68
    pub next_open: Option<store::Key>,
69
70
    /// Set to true when the stream is pending to be opened
71
    pub is_pending_open: bool,
72
73
    /// Set to true when a push is pending for this stream
74
    pub is_pending_push: bool,
75
76
    // ===== Fields related to receiving =====
77
    /// Next node in the accept linked list
78
    pub next_pending_accept: Option<store::Key>,
79
80
    /// Set to true when the stream is pending accept
81
    pub is_pending_accept: bool,
82
83
    /// Receive data flow control
84
    pub recv_flow: FlowControl,
85
86
    pub in_flight_recv_data: WindowSize,
87
88
    /// Next node in the linked list of streams waiting to send window updates.
89
    pub next_window_update: Option<store::Key>,
90
91
    /// True if the stream is waiting to send a window update
92
    pub is_pending_window_update: bool,
93
94
    /// The time when this stream may have been locally reset.
95
    pub reset_at: Option<Instant>,
96
97
    /// Next node in list of reset streams that should expire eventually
98
    pub next_reset_expire: Option<store::Key>,
99
100
    /// Frames pending for this stream to read
101
    pub pending_recv: buffer::Deque,
102
103
    /// When the RecvStream drop occurs, no data should be received.
104
    pub is_recv: bool,
105
106
    /// Task tracking receiving frames
107
    pub recv_task: Option<Waker>,
108
109
    /// Task tracking pushed promises.
110
    pub push_task: Option<Waker>,
111
112
    /// The stream's pending push promises
113
    pub pending_push_promises: store::Queue<NextAccept>,
114
115
    /// Validate content-length headers
116
    pub content_length: ContentLength,
117
}
118
119
/// State related to validating a stream's content-length
///
/// The variants drive the checks in `Stream::dec_content_length` and
/// `Stream::ensure_content_length_zero`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ContentLength {
    /// No length is tracked: body data of any size passes both checks.
    Omitted,
    /// Only zero-length body data is accepted; any non-zero length is an
    /// error. (Presumably used for responses to `HEAD` requests — confirm
    /// against the code that sets this variant.)
    Head,
    /// Number of body bytes still expected. It is decremented as data
    /// arrives and must be exactly zero when the final check runs.
    Remaining(u64),
}
126
127
/// Marker type whose `store::Next` impl links streams through
/// `next_pending_accept` / `is_pending_accept`.
#[derive(Debug)]
128
pub(super) struct NextAccept;
129
130
/// Marker type whose `store::Next` impl links streams through
/// `next_pending_send` / `is_pending_send`.
#[derive(Debug)]
131
pub(super) struct NextSend;
132
133
/// Marker type whose `store::Next` impl links streams through
/// `next_pending_send_capacity` / `is_pending_send_capacity`.
#[derive(Debug)]
134
pub(super) struct NextSendCapacity;
135
136
/// Marker type whose `store::Next` impl links streams through
/// `next_window_update` / `is_pending_window_update`.
#[derive(Debug)]
137
pub(super) struct NextWindowUpdate;
138
139
/// Marker type whose `store::Next` impl links streams through
/// `next_open` / `is_pending_open`.
#[derive(Debug)]
140
pub(super) struct NextOpen;
141
142
/// Marker type whose `store::Next` impl links streams through
/// `next_reset_expire`, using `reset_at` as the queued flag.
#[derive(Debug)]
143
pub(super) struct NextResetExpire;
144
145
impl Stream {
146
450k
    pub fn new(id: StreamId, init_send_window: WindowSize, init_recv_window: WindowSize) -> Stream {
147
450k
        let mut send_flow = FlowControl::new();
148
450k
        let mut recv_flow = FlowControl::new();
149
450k
150
450k
        recv_flow
151
450k
            .inc_window(init_recv_window)
152
450k
            .expect("invalid initial receive window");
153
450k
        // TODO: proper error handling?
154
450k
        let _res = recv_flow.assign_capacity(init_recv_window);
155
450k
        debug_assert!(_res.is_ok());
156
157
450k
        send_flow
158
450k
            .inc_window(init_send_window)
159
450k
            .expect("invalid initial send window size");
160
450k
161
450k
        Stream {
162
450k
            id,
163
450k
            state: State::default(),
164
450k
            ref_count: 0,
165
450k
            is_counted: false,
166
450k
167
450k
            // ===== Fields related to sending =====
168
450k
            next_pending_send: None,
169
450k
            is_pending_send: false,
170
450k
            send_flow,
171
450k
            requested_send_capacity: 0,
172
450k
            buffered_send_data: 0,
173
450k
            send_task: None,
174
450k
            pending_send: buffer::Deque::new(),
175
450k
            is_pending_send_capacity: false,
176
450k
            next_pending_send_capacity: None,
177
450k
            send_capacity_inc: false,
178
450k
            is_pending_open: false,
179
450k
            next_open: None,
180
450k
            is_pending_push: false,
181
450k
182
450k
            // ===== Fields related to receiving =====
183
450k
            next_pending_accept: None,
184
450k
            is_pending_accept: false,
185
450k
            recv_flow,
186
450k
            in_flight_recv_data: 0,
187
450k
            next_window_update: None,
188
450k
            is_pending_window_update: false,
189
450k
            reset_at: None,
190
450k
            next_reset_expire: None,
191
450k
            pending_recv: buffer::Deque::new(),
192
450k
            is_recv: true,
193
450k
            recv_task: None,
194
450k
            push_task: None,
195
450k
            pending_push_promises: store::Queue::new(),
196
450k
            content_length: ContentLength::Omitted,
197
450k
        }
198
450k
    }
199
200
    /// Increment the stream's ref count
201
853k
    pub fn ref_inc(&mut self) {
202
853k
        assert!(self.ref_count < usize::MAX);
203
853k
        self.ref_count += 1;
204
853k
    }
205
206
    /// Decrements the stream's ref count
207
853k
    pub fn ref_dec(&mut self) {
208
853k
        assert!(self.ref_count > 0);
209
853k
        self.ref_count -= 1;
210
853k
    }
211
212
    /// Returns true if stream is currently being held for some time because of
213
    /// a local reset.
214
4.19M
    pub fn is_pending_reset_expiration(&self) -> bool {
215
4.19M
        self.reset_at.is_some()
216
4.19M
    }
217
218
    /// Returns true if frames for this stream are ready to be sent over the wire
219
888k
    pub fn is_send_ready(&self) -> bool {
220
888k
        // Why do we check pending_open?
221
888k
        //
222
888k
        // We allow users to call send_request() which schedules a stream to be pending_open
223
888k
        // if there is no room according to the concurrency limit (max_send_streams), and we
224
888k
        // also allow data to be buffered for send with send_data() if there is no capacity for
225
888k
        // the stream to send the data, which attempts to place the stream in pending_send.
226
888k
        // If the stream is not open, we don't want the stream to be scheduled for
227
888k
        // execution (pending_send). Note that if the stream is in pending_open, it will be
228
888k
        // pushed to pending_send when there is room for an open stream.
229
888k
        //
230
888k
        // In pending_push we track whether a PushPromise still needs to be sent
231
888k
        // from a different stream before we can start sending frames on this one.
232
888k
        // This is different from the "open" check because reserved streams don't count
233
888k
        // toward the concurrency limit.
234
888k
        // See https://httpwg.org/specs/rfc7540.html#rfc.section.5.1.2
235
888k
        !self.is_pending_open && !self.is_pending_push
236
888k
    }
237
238
    /// Returns true if the stream is closed
239
5.79M
    pub fn is_closed(&self) -> bool {
240
5.79M
        // The state has fully transitioned to closed.
241
5.79M
        self.state.is_closed() &&
242
            // Because outbound frames transition the stream state before being
243
            // buffered, we have to ensure that all frames have been flushed.
244
3.46M
            self.pending_send.is_empty() &&
245
            // Sometimes large data frames are sent out in chunks. After a chunk
246
            // of the frame is sent, the remainder is pushed back onto the send
247
            // queue to be rescheduled.
248
            //
249
            // Checking for additional buffered data lets us catch this case.
250
3.15M
            self.buffered_send_data == 0
251
5.79M
    }
252
253
    /// Returns true if the stream is no longer in use
254
2.68M
    pub fn is_released(&self) -> bool {
255
2.68M
        // The stream is closed and fully flushed
256
2.68M
        self.is_closed() &&
257
            // There are no more outstanding references to the stream
258
1.40M
            self.ref_count == 0 &&
259
            // The stream is not in any queue
260
888k
            !self.is_pending_send && !self.is_pending_send_capacity &&
261
589k
            !self.is_pending_accept && !self.is_pending_window_update &&
262
588k
            !self.is_pending_open && self.reset_at.is_none()
263
2.68M
    }
264
265
    /// Returns true when the consumer of the stream has dropped all handles
266
    /// (indicating no further interest in the stream) and the stream state is
267
    /// not actually closed.
268
    ///
269
    /// In this case, a reset should be sent.
270
853k
    pub fn is_canceled_interest(&self) -> bool {
271
853k
        self.ref_count == 0 && !self.state.is_closed()
272
853k
    }
273
274
    /// Current available stream send capacity
275
207k
    pub fn capacity(&self, max_buffer_size: usize) -> WindowSize {
276
207k
        let available = self.send_flow.available().as_size() as usize;
277
207k
        let buffered = self.buffered_send_data;
278
207k
279
207k
        available.min(max_buffer_size).saturating_sub(buffered) as WindowSize
280
207k
    }
281
282
84.4k
    /// Assigns `capacity` to the stream's send flow control, and notifies the
    /// send task when the usable capacity (see `capacity`) grew as a result.
    ///
    /// `max_buffer_size` is the cap applied when computing usable capacity.
    pub fn assign_capacity(&mut self, capacity: WindowSize, max_buffer_size: usize) {
        // Snapshot before mutating so growth can be detected afterwards.
        let prev_capacity = self.capacity(max_buffer_size);
        debug_assert!(capacity > 0);
        // TODO: proper error handling
        let _res = self.send_flow.assign_capacity(capacity);
        debug_assert!(_res.is_ok());

        tracing::trace!(
            "  assigned capacity to stream; available={}; buffered={}; id={:?}; max_buffer_size={} prev={}",
            self.send_flow.available(),
            self.buffered_send_data,
            self.id,
            max_buffer_size,
            prev_capacity,
        );

        let grew = self.capacity(max_buffer_size) > prev_capacity;
        if grew {
            self.notify_capacity();
        }
    }
302
303
19.1k
    /// Accounts for `len` bytes of stream data having been sent.
    ///
    /// The bytes are charged to the send flow control and removed from both
    /// `buffered_send_data` and `requested_send_capacity`. The send task is
    /// notified when the usable capacity (see `capacity`) grew as a result.
    pub fn send_data(&mut self, len: WindowSize, max_buffer_size: usize) {
        // Snapshot before mutating so growth can be detected afterwards.
        let prev_capacity = self.capacity(max_buffer_size);

        // TODO: proper error handling
        let _res = self.send_flow.send_data(len);
        debug_assert!(_res.is_ok());

        // Decrement the stream's buffered data counter
        //
        // NOTE(review): both subtractions assume the counters are at least
        // `len`. Only the first is debug-asserted, and without overflow checks
        // an underflow would wrap — confirm callers uphold this.
        let sent = len as usize;
        debug_assert!(self.buffered_send_data >= len as usize);
        self.buffered_send_data -= sent;
        self.requested_send_capacity -= len;

        tracing::trace!(
            "  sent stream data; available={}; buffered={}; id={:?}; max_buffer_size={} prev={}",
            self.send_flow.available(),
            self.buffered_send_data,
            self.id,
            max_buffer_size,
            prev_capacity,
        );

        let grew = self.capacity(max_buffer_size) > prev_capacity;
        if grew {
            self.notify_capacity();
        }
    }
328
329
    /// If the capacity was limited because of the max_send_buffer_size,
330
    /// then consider waking the send task again...
331
0
    pub fn notify_capacity(&mut self) {
332
0
        self.send_capacity_inc = true;
333
0
        tracing::trace!("  notifying task");
334
0
        self.notify_send();
335
0
    }
336
337
    /// Returns `Err` when the decrement cannot be completed due to overflow.
338
2.57k
    pub fn dec_content_length(&mut self, len: usize) -> Result<(), ()> {
339
2.57k
        match self.content_length {
340
1.77k
            ContentLength::Remaining(ref mut rem) => match rem.checked_sub(len as u64) {
341
1.77k
                Some(val) => *rem = val,
342
2
                None => return Err(()),
343
            },
344
            ContentLength::Head => {
345
0
                if len != 0 {
346
0
                    return Err(());
347
0
                }
348
            }
349
802
            _ => {}
350
        }
351
352
2.57k
        Ok(())
353
2.57k
    }
354
355
109
    pub fn ensure_content_length_zero(&self) -> Result<(), ()> {
356
109
        match self.content_length {
357
2
            ContentLength::Remaining(0) => Ok(()),
358
40
            ContentLength::Remaining(_) => Err(()),
359
67
            _ => Ok(()),
360
        }
361
109
    }
362
363
737k
    pub fn notify_send(&mut self) {
364
737k
        if let Some(task) = self.send_task.take() {
365
255
            task.wake();
366
737k
        }
367
737k
    }
368
369
15.5k
    pub fn wait_send(&mut self, cx: &Context) {
370
15.5k
        self.send_task = Some(cx.waker().clone());
371
15.5k
    }
372
373
700k
    pub fn notify_recv(&mut self) {
374
700k
        if let Some(task) = self.recv_task.take() {
375
424k
            task.wake();
376
424k
        }
377
700k
    }
378
379
651k
    /// Wakes the registered push-promise task, if any.
    ///
    /// The waker is taken out of `push_task`, so it fires at most once per
    /// registration.
    pub(super) fn notify_push(&mut self) {
        if let Some(waker) = self.push_task.take() {
            waker.wake();
        }
    }
384
385
    /// Set the stream's state to `Closed` with the given reason and initiator.
386
    /// Notify the send and receive tasks, if they exist.
387
98.7k
    pub(super) fn set_reset(&mut self, reason: Reason, initiator: Initiator) {
388
98.7k
        self.state.set_reset(self.id, reason, initiator);
389
98.7k
        self.notify_push();
390
98.7k
        self.notify_recv();
391
98.7k
    }
392
}
393
394
impl fmt::Debug for Stream {
395
0
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
396
0
        f.debug_struct("Stream")
397
0
            .field("id", &self.id)
398
0
            .field("state", &self.state)
399
0
            .field("is_counted", &self.is_counted)
400
0
            .field("ref_count", &self.ref_count)
401
0
            .h2_field_some("next_pending_send", &self.next_pending_send)
402
0
            .h2_field_if("is_pending_send", &self.is_pending_send)
403
0
            .field("send_flow", &self.send_flow)
404
0
            .field("requested_send_capacity", &self.requested_send_capacity)
405
0
            .field("buffered_send_data", &self.buffered_send_data)
406
0
            .h2_field_some("send_task", &self.send_task.as_ref().map(|_| ()))
407
0
            .h2_field_if_then(
408
0
                "pending_send",
409
0
                !self.pending_send.is_empty(),
410
0
                &self.pending_send,
411
0
            )
412
0
            .h2_field_some(
413
0
                "next_pending_send_capacity",
414
0
                &self.next_pending_send_capacity,
415
0
            )
416
0
            .h2_field_if("is_pending_send_capacity", &self.is_pending_send_capacity)
417
0
            .h2_field_if("send_capacity_inc", &self.send_capacity_inc)
418
0
            .h2_field_some("next_open", &self.next_open)
419
0
            .h2_field_if("is_pending_open", &self.is_pending_open)
420
0
            .h2_field_if("is_pending_push", &self.is_pending_push)
421
0
            .h2_field_some("next_pending_accept", &self.next_pending_accept)
422
0
            .h2_field_if("is_pending_accept", &self.is_pending_accept)
423
0
            .field("recv_flow", &self.recv_flow)
424
0
            .field("in_flight_recv_data", &self.in_flight_recv_data)
425
0
            .h2_field_some("next_window_update", &self.next_window_update)
426
0
            .h2_field_if("is_pending_window_update", &self.is_pending_window_update)
427
0
            .h2_field_some("reset_at", &self.reset_at)
428
0
            .h2_field_some("next_reset_expire", &self.next_reset_expire)
429
0
            .h2_field_if_then(
430
0
                "pending_recv",
431
0
                !self.pending_recv.is_empty(),
432
0
                &self.pending_recv,
433
0
            )
434
0
            .h2_field_if("is_recv", &self.is_recv)
435
0
            .h2_field_some("recv_task", &self.recv_task.as_ref().map(|_| ()))
436
0
            .h2_field_some("push_task", &self.push_task.as_ref().map(|_| ()))
437
0
            .h2_field_if_then(
438
0
                "pending_push_promises",
439
0
                !self.pending_push_promises.is_empty(),
440
0
                &self.pending_push_promises,
441
0
            )
442
0
            .field("content_length", &self.content_length)
443
0
            .finish()
444
0
    }
445
}
446
447
impl store::Next for NextAccept {
448
142
    fn next(stream: &Stream) -> Option<store::Key> {
449
142
        stream.next_pending_accept
450
142
    }
451
452
42
    fn set_next(stream: &mut Stream, key: Option<store::Key>) {
453
42
        stream.next_pending_accept = key;
454
42
    }
455
456
39
    fn take_next(stream: &mut Stream) -> Option<store::Key> {
457
39
        stream.next_pending_accept.take()
458
39
    }
459
460
190
    fn is_queued(stream: &Stream) -> bool {
461
190
        stream.is_pending_accept
462
190
    }
463
464
371
    fn set_queued(stream: &mut Stream, val: bool) {
465
371
        stream.is_pending_accept = val;
466
371
    }
467
}
468
469
impl store::Next for NextSend {
470
6.49k
    fn next(stream: &Stream) -> Option<store::Key> {
471
6.49k
        stream.next_pending_send
472
6.49k
    }
473
474
410k
    fn set_next(stream: &mut Stream, key: Option<store::Key>) {
475
410k
        stream.next_pending_send = key;
476
410k
    }
477
478
410k
    fn take_next(stream: &mut Stream) -> Option<store::Key> {
479
410k
        stream.next_pending_send.take()
480
410k
    }
481
482
728k
    fn is_queued(stream: &Stream) -> bool {
483
728k
        stream.is_pending_send
484
728k
    }
485
486
833k
    fn set_queued(stream: &mut Stream, val: bool) {
487
833k
        if val {
488
            // ensure that stream is not queued for being opened
489
            // if it's being put into queue for sending data
490
416k
            debug_assert!(!stream.is_pending_open);
491
416k
        }
492
833k
        stream.is_pending_send = val;
493
833k
    }
494
}
495
496
impl store::Next for NextSendCapacity {
497
2.13k
    fn next(stream: &Stream) -> Option<store::Key> {
498
2.13k
        stream.next_pending_send_capacity
499
2.13k
    }
500
501
232k
    fn set_next(stream: &mut Stream, key: Option<store::Key>) {
502
232k
        stream.next_pending_send_capacity = key;
503
232k
    }
504
505
232k
    fn take_next(stream: &mut Stream) -> Option<store::Key> {
506
232k
        stream.next_pending_send_capacity.take()
507
232k
    }
508
509
292k
    fn is_queued(stream: &Stream) -> bool {
510
292k
        stream.is_pending_send_capacity
511
292k
    }
512
513
468k
    fn set_queued(stream: &mut Stream, val: bool) {
514
468k
        stream.is_pending_send_capacity = val;
515
468k
    }
516
}
517
518
impl store::Next for NextWindowUpdate {
519
0
    fn next(stream: &Stream) -> Option<store::Key> {
520
0
        stream.next_window_update
521
0
    }
522
523
0
    fn set_next(stream: &mut Stream, key: Option<store::Key>) {
524
0
        stream.next_window_update = key;
525
0
    }
526
527
0
    fn take_next(stream: &mut Stream) -> Option<store::Key> {
528
0
        stream.next_window_update.take()
529
0
    }
530
531
0
    fn is_queued(stream: &Stream) -> bool {
532
0
        stream.is_pending_window_update
533
0
    }
534
535
0
    fn set_queued(stream: &mut Stream, val: bool) {
536
0
        stream.is_pending_window_update = val;
537
0
    }
538
}
539
540
impl store::Next for NextOpen {
541
4.07k
    fn next(stream: &Stream) -> Option<store::Key> {
542
4.07k
        stream.next_open
543
4.07k
    }
544
545
422k
    fn set_next(stream: &mut Stream, key: Option<store::Key>) {
546
422k
        stream.next_open = key;
547
422k
    }
548
549
422k
    fn take_next(stream: &mut Stream) -> Option<store::Key> {
550
422k
        stream.next_open.take()
551
422k
    }
552
553
426k
    fn is_queued(stream: &Stream) -> bool {
554
426k
        stream.is_pending_open
555
426k
    }
556
557
852k
    fn set_queued(stream: &mut Stream, val: bool) {
558
852k
        if val {
559
            // ensure that stream is not queued for being sent
560
            // if it's being put into queue for opening the stream
561
426k
            debug_assert!(!stream.is_pending_send);
562
426k
        }
563
852k
        stream.is_pending_open = val;
564
852k
    }
565
}
566
567
impl store::Next for NextResetExpire {
568
4.57k
    fn next(stream: &Stream) -> Option<store::Key> {
569
4.57k
        stream.next_reset_expire
570
4.57k
    }
571
572
44.3k
    fn set_next(stream: &mut Stream, key: Option<store::Key>) {
573
44.3k
        stream.next_reset_expire = key;
574
44.3k
    }
575
576
44.3k
    fn take_next(stream: &mut Stream) -> Option<store::Key> {
577
44.3k
        stream.next_reset_expire.take()
578
44.3k
    }
579
580
48.8k
    fn is_queued(stream: &Stream) -> bool {
581
48.8k
        stream.reset_at.is_some()
582
48.8k
    }
583
584
97.7k
    fn set_queued(stream: &mut Stream, val: bool) {
585
97.7k
        if val {
586
48.8k
            stream.reset_at = Some(Instant::now());
587
48.8k
        } else {
588
48.8k
            stream.reset_at = None;
589
48.8k
        }
590
97.7k
    }
591
}
592
593
// ===== impl ContentLength =====
594
595
impl ContentLength {
596
1.56k
    pub fn is_head(&self) -> bool {
597
1.56k
        matches!(*self, Self::Head)
598
1.56k
    }
599
}