Coverage Report

Created: 2026-01-17 07:07

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/h2/src/proto/streams/counts.rs
Line
Count
Source
1
use super::*;
2
3
#[derive(Debug)]
4
pub(super) struct Counts {
5
    /// Acting as a client or server. This allows us to track which values to
6
    /// inc / dec.
7
    peer: peer::Dyn,
8
9
    /// Maximum number of locally initiated streams
10
    max_send_streams: usize,
11
12
    /// Current number of remote initiated streams
13
    num_send_streams: usize,
14
15
    /// Maximum number of remote initiated streams
16
    max_recv_streams: usize,
17
18
    /// Current number of locally initiated streams
19
    num_recv_streams: usize,
20
21
    /// Maximum number of pending locally reset streams
22
    max_local_reset_streams: usize,
23
24
    /// Current number of pending locally reset streams
25
    num_local_reset_streams: usize,
26
27
    /// Max number of "pending accept" streams that were remotely reset
28
    max_remote_reset_streams: usize,
29
30
    /// Current number of "pending accept" streams that were remotely reset
31
    num_remote_reset_streams: usize,
32
33
    /// Maximum number of locally reset streams due to protocol error across
34
    /// the lifetime of the connection.
35
    ///
36
    /// When this gets exceeded, we issue GOAWAYs.
37
    max_local_error_reset_streams: Option<usize>,
38
39
    /// Total number of locally reset streams due to protocol error across the
40
    /// lifetime of the connection.
41
    num_local_error_reset_streams: usize,
42
}
43
44
impl Counts {
45
    /// Create a new `Counts` using the provided configuration values.
46
13.3k
    pub fn new(peer: peer::Dyn, config: &Config) -> Self {
47
13.3k
        Counts {
48
13.3k
            peer,
49
13.3k
            max_send_streams: config.initial_max_send_streams,
50
13.3k
            num_send_streams: 0,
51
13.3k
            max_recv_streams: config.remote_max_initiated.unwrap_or(usize::MAX),
52
13.3k
            num_recv_streams: 0,
53
13.3k
            max_local_reset_streams: config.local_reset_max,
54
13.3k
            num_local_reset_streams: 0,
55
13.3k
            max_remote_reset_streams: config.remote_reset_max,
56
13.3k
            num_remote_reset_streams: 0,
57
13.3k
            max_local_error_reset_streams: config.local_max_error_reset_streams,
58
13.3k
            num_local_error_reset_streams: 0,
59
13.3k
        }
60
13.3k
    }
61
62
    /// Returns true when the next opened stream will reach capacity of outbound streams
63
    ///
64
    /// The number of client send streams is incremented in prioritize; send_request has to guess if
65
    /// it should wait before allowing another request to be sent.
66
419k
    pub fn next_send_stream_will_reach_capacity(&self) -> bool {
67
419k
        self.max_send_streams <= (self.num_send_streams + 1)
68
419k
    }
69
70
    /// Returns the current peer
71
972k
    pub fn peer(&self) -> peer::Dyn {
72
972k
        self.peer
73
972k
    }
74
75
3.03M
    pub fn has_streams(&self) -> bool {
76
3.03M
        self.num_send_streams != 0 || self.num_recv_streams != 0
77
3.03M
    }
78
79
    /// Returns true if we can issue another local reset due to protocol error.
80
155k
    pub fn can_inc_num_local_error_resets(&self) -> bool {
81
155k
        if let Some(max) = self.max_local_error_reset_streams {
82
155k
            max > self.num_local_error_reset_streams
83
        } else {
84
0
            true
85
        }
86
155k
    }
87
88
77.6k
    pub fn inc_num_local_error_resets(&mut self) {
89
77.6k
        assert!(self.can_inc_num_local_error_resets());
90
91
        // Increment the number of remote initiated streams
92
77.6k
        self.num_local_error_reset_streams += 1;
93
77.6k
    }
94
95
0
    pub(crate) fn max_local_error_resets(&self) -> Option<usize> {
96
0
        self.max_local_error_reset_streams
97
0
    }
98
99
    /// Returns true if the receive stream concurrency can be incremented
100
1.10k
    pub fn can_inc_num_recv_streams(&self) -> bool {
101
1.10k
        self.max_recv_streams > self.num_recv_streams
102
1.10k
    }
103
104
    /// Increments the number of concurrent receive streams.
105
    ///
106
    /// # Panics
107
    ///
108
    /// Panics on failure as this should have been validated before hand.
109
2
    pub fn inc_num_recv_streams(&mut self, stream: &mut store::Ptr) {
110
2
        assert!(self.can_inc_num_recv_streams());
111
2
        assert!(!stream.is_counted);
112
113
        // Increment the number of remote initiated streams
114
2
        self.num_recv_streams += 1;
115
2
        stream.is_counted = true;
116
2
    }
117
118
    /// Returns true if the send stream concurrency can be incremented
119
968k
    pub fn can_inc_num_send_streams(&self) -> bool {
120
968k
        self.max_send_streams > self.num_send_streams
121
968k
    }
122
123
    /// Increments the number of concurrent send streams.
124
    ///
125
    /// # Panics
126
    ///
127
    /// Panics on failure as this should have been validated before hand.
128
176k
    pub fn inc_num_send_streams(&mut self, stream: &mut store::Ptr) {
129
176k
        assert!(self.can_inc_num_send_streams());
130
176k
        assert!(!stream.is_counted);
131
132
        // Increment the number of remote initiated streams
133
176k
        self.num_send_streams += 1;
134
176k
        stream.is_counted = true;
135
176k
    }
136
137
    /// Returns true if the number of pending reset streams can be incremented.
138
177k
    pub fn can_inc_num_reset_streams(&self) -> bool {
139
177k
        self.max_local_reset_streams > self.num_local_reset_streams
140
177k
    }
141
142
    /// Increments the number of pending reset streams.
143
    ///
144
    /// # Panics
145
    ///
146
    /// Panics on failure as this should have been validated before hand.
147
48.1k
    pub fn inc_num_reset_streams(&mut self) {
148
48.1k
        assert!(self.can_inc_num_reset_streams());
149
150
48.1k
        self.num_local_reset_streams += 1;
151
48.1k
    }
152
153
0
    pub(crate) fn max_remote_reset_streams(&self) -> usize {
154
0
        self.max_remote_reset_streams
155
0
    }
156
157
    /// Returns true if the number of pending REMOTE reset streams can be
158
    /// incremented.
159
0
    pub(crate) fn can_inc_num_remote_reset_streams(&self) -> bool {
160
0
        self.max_remote_reset_streams > self.num_remote_reset_streams
161
0
    }
162
163
    /// Increments the number of pending REMOTE reset streams.
164
    ///
165
    /// # Panics
166
    ///
167
    /// Panics on failure as this should have been validated before hand.
168
0
    pub(crate) fn inc_num_remote_reset_streams(&mut self) {
169
0
        assert!(self.can_inc_num_remote_reset_streams());
170
171
0
        self.num_remote_reset_streams += 1;
172
0
    }
173
174
0
    pub(crate) fn dec_num_remote_reset_streams(&mut self) {
175
0
        assert!(self.num_remote_reset_streams > 0);
176
177
0
        self.num_remote_reset_streams -= 1;
178
0
    }
179
180
6.71k
    pub fn apply_remote_settings(&mut self, settings: &frame::Settings, is_initial: bool) {
181
6.71k
        match settings.max_concurrent_streams() {
182
727
            Some(val) => self.max_send_streams = val as usize,
183
859
            None if is_initial => self.max_send_streams = usize::MAX,
184
5.12k
            None => {}
185
        }
186
6.71k
    }
187
188
    /// Run a block of code that could potentially transition a stream's state.
189
    ///
190
    /// If the stream state transitions to closed, this function will perform
191
    /// all necessary cleanup.
192
    ///
193
    /// TODO: Is this function still needed?
194
2.09M
    pub fn transition<F, U>(&mut self, mut stream: store::Ptr, f: F) -> U
195
2.09M
    where
196
2.09M
        F: FnOnce(&mut Self, &mut store::Ptr) -> U,
197
    {
198
        // TODO: Does this need to be computed before performing the action?
199
2.09M
        let is_pending_reset = stream.is_pending_reset_expiration();
200
201
        // Run the action
202
2.09M
        let ret = f(self, &mut stream);
203
204
2.09M
        self.transition_after(stream, is_pending_reset);
205
206
2.09M
        ret
207
2.09M
    }
<h2::proto::streams::counts::Counts>::transition::<<h2::proto::streams::prioritize::Prioritize>::assign_connection_capacity<h2::proto::streams::store::Ptr>::{closure#0}, ()>
Line
Count
Source
194
27.0k
    pub fn transition<F, U>(&mut self, mut stream: store::Ptr, f: F) -> U
195
27.0k
    where
196
27.0k
        F: FnOnce(&mut Self, &mut store::Ptr) -> U,
197
    {
198
        // TODO: Does this need to be computed before performing the action?
199
27.0k
        let is_pending_reset = stream.is_pending_reset_expiration();
200
201
        // Run the action
202
27.0k
        let ret = f(self, &mut stream);
203
204
27.0k
        self.transition_after(stream, is_pending_reset);
205
206
27.0k
        ret
207
27.0k
    }
<h2::proto::streams::counts::Counts>::transition::<<h2::proto::streams::prioritize::Prioritize>::assign_connection_capacity<h2::proto::streams::store::Store>::{closure#0}, ()>
Line
Count
Source
194
34.5k
    pub fn transition<F, U>(&mut self, mut stream: store::Ptr, f: F) -> U
195
34.5k
    where
196
34.5k
        F: FnOnce(&mut Self, &mut store::Ptr) -> U,
197
    {
198
        // TODO: Does this need to be computed before performing the action?
199
34.5k
        let is_pending_reset = stream.is_pending_reset_expiration();
200
201
        // Run the action
202
34.5k
        let ret = f(self, &mut stream);
203
204
34.5k
        self.transition_after(stream, is_pending_reset);
205
206
34.5k
        ret
207
34.5k
    }
<h2::proto::streams::counts::Counts>::transition::<h2::proto::streams::streams::drop_stream_ref::{closure#0}::{closure#0}, ()>
Line
Count
Source
194
184
    pub fn transition<F, U>(&mut self, mut stream: store::Ptr, f: F) -> U
195
184
    where
196
184
        F: FnOnce(&mut Self, &mut store::Ptr) -> U,
197
    {
198
        // TODO: Does this need to be computed before performing the action?
199
184
        let is_pending_reset = stream.is_pending_reset_expiration();
200
201
        // Run the action
202
184
        let ret = f(self, &mut stream);
203
204
184
        self.transition_after(stream, is_pending_reset);
205
206
184
        ret
207
184
    }
<h2::proto::streams::counts::Counts>::transition::<<h2::proto::streams::prioritize::Prioritize>::clear_pending_capacity::{closure#0}, ()>
Line
Count
Source
194
12.7k
    pub fn transition<F, U>(&mut self, mut stream: store::Ptr, f: F) -> U
195
12.7k
    where
196
12.7k
        F: FnOnce(&mut Self, &mut store::Ptr) -> U,
197
    {
198
        // TODO: Does this need to be computed before performing the action?
199
12.7k
        let is_pending_reset = stream.is_pending_reset_expiration();
200
201
        // Run the action
202
12.7k
        let ret = f(self, &mut stream);
203
204
12.7k
        self.transition_after(stream, is_pending_reset);
205
206
12.7k
        ret
207
12.7k
    }
Unexecuted instantiation: <h2::proto::streams::counts::Counts>::transition::<<h2::proto::streams::recv::Recv>::clear_stream_window_update_queue::{closure#0}, ()>
<h2::proto::streams::counts::Counts>::transition::<h2::proto::streams::streams::drop_stream_ref::{closure#0}, ()>
Line
Count
Source
194
839k
    pub fn transition<F, U>(&mut self, mut stream: store::Ptr, f: F) -> U
195
839k
    where
196
839k
        F: FnOnce(&mut Self, &mut store::Ptr) -> U,
197
    {
198
        // TODO: Does this need to be computed before performing the action?
199
839k
        let is_pending_reset = stream.is_pending_reset_expiration();
200
201
        // Run the action
202
839k
        let ret = f(self, &mut stream);
203
204
839k
        self.transition_after(stream, is_pending_reset);
205
206
839k
        ret
207
839k
    }
<h2::proto::streams::counts::Counts>::transition::<<h2::proto::streams::streams::Inner>::recv_eof<bytes::bytes::Bytes>::{closure#0}::{closure#0}, ()>
Line
Count
Source
194
614
    pub fn transition<F, U>(&mut self, mut stream: store::Ptr, f: F) -> U
195
614
    where
196
614
        F: FnOnce(&mut Self, &mut store::Ptr) -> U,
197
    {
198
        // TODO: Does this need to be computed before performing the action?
199
614
        let is_pending_reset = stream.is_pending_reset_expiration();
200
201
        // Run the action
202
614
        let ret = f(self, &mut stream);
203
204
614
        self.transition_after(stream, is_pending_reset);
205
206
614
        ret
207
614
    }
Unexecuted instantiation: <h2::proto::streams::counts::Counts>::transition::<<h2::proto::streams::recv::Recv>::send_stream_window_updates<fuzz_e2e::MockIo, bytes::bytes::Bytes>::{closure#0}, ()>
<h2::proto::streams::counts::Counts>::transition::<<h2::proto::streams::streams::Inner>::recv_reset<bytes::bytes::Bytes>::{closure#0}, core::result::Result<(), h2::proto::error::Error>>
Line
Count
Source
194
663
    pub fn transition<F, U>(&mut self, mut stream: store::Ptr, f: F) -> U
195
663
    where
196
663
        F: FnOnce(&mut Self, &mut store::Ptr) -> U,
197
    {
198
        // TODO: Does this need to be computed before performing the action?
199
663
        let is_pending_reset = stream.is_pending_reset_expiration();
200
201
        // Run the action
202
663
        let ret = f(self, &mut stream);
203
204
663
        self.transition_after(stream, is_pending_reset);
205
206
663
        ret
207
663
    }
<h2::proto::streams::counts::Counts>::transition::<<h2::proto::streams::streams::Inner>::recv_headers<bytes::bytes::Bytes>::{closure#0}, core::result::Result<(), h2::proto::error::Error>>
Line
Count
Source
194
2.67k
    pub fn transition<F, U>(&mut self, mut stream: store::Ptr, f: F) -> U
195
2.67k
    where
196
2.67k
        F: FnOnce(&mut Self, &mut store::Ptr) -> U,
197
    {
198
        // TODO: Does this need to be computed before performing the action?
199
2.67k
        let is_pending_reset = stream.is_pending_reset_expiration();
200
201
        // Run the action
202
2.67k
        let ret = f(self, &mut stream);
203
204
2.67k
        self.transition_after(stream, is_pending_reset);
205
206
2.67k
        ret
207
2.67k
    }
<h2::proto::streams::counts::Counts>::transition::<<h2::proto::streams::streams::Inner>::recv_push_promise<bytes::bytes::Bytes>::{closure#0}, core::result::Result<core::option::Option<h2::proto::streams::store::Key>, h2::proto::error::Error>>
Line
Count
Source
194
1.10k
    pub fn transition<F, U>(&mut self, mut stream: store::Ptr, f: F) -> U
195
1.10k
    where
196
1.10k
        F: FnOnce(&mut Self, &mut store::Ptr) -> U,
197
    {
198
        // TODO: Does this need to be computed before performing the action?
199
1.10k
        let is_pending_reset = stream.is_pending_reset_expiration();
200
201
        // Run the action
202
1.10k
        let ret = f(self, &mut stream);
203
204
1.10k
        self.transition_after(stream, is_pending_reset);
205
206
1.10k
        ret
207
1.10k
    }
<h2::proto::streams::counts::Counts>::transition::<<h2::proto::streams::streams::Inner>::recv_data<bytes::bytes::Bytes>::{closure#0}, core::result::Result<(), h2::proto::error::Error>>
Line
Count
Source
194
17.5k
    pub fn transition<F, U>(&mut self, mut stream: store::Ptr, f: F) -> U
195
17.5k
    where
196
17.5k
        F: FnOnce(&mut Self, &mut store::Ptr) -> U,
197
    {
198
        // TODO: Does this need to be computed before performing the action?
199
17.5k
        let is_pending_reset = stream.is_pending_reset_expiration();
200
201
        // Run the action
202
17.5k
        let ret = f(self, &mut stream);
203
204
17.5k
        self.transition_after(stream, is_pending_reset);
205
206
17.5k
        ret
207
17.5k
    }
<h2::proto::streams::counts::Counts>::transition::<<h2::proto::streams::streams::Actions>::send_reset<bytes::bytes::Bytes>::{closure#0}, core::result::Result<(), h2::proto::error::GoAway>>
Line
Count
Source
194
76.6k
    pub fn transition<F, U>(&mut self, mut stream: store::Ptr, f: F) -> U
195
76.6k
    where
196
76.6k
        F: FnOnce(&mut Self, &mut store::Ptr) -> U,
197
    {
198
        // TODO: Does this need to be computed before performing the action?
199
76.6k
        let is_pending_reset = stream.is_pending_reset_expiration();
200
201
        // Run the action
202
76.6k
        let ret = f(self, &mut stream);
203
204
76.6k
        self.transition_after(stream, is_pending_reset);
205
206
76.6k
        ret
207
76.6k
    }
<h2::proto::streams::counts::Counts>::transition::<<h2::proto::streams::streams::Inner>::handle_error<bytes::bytes::Bytes>::{closure#0}::{closure#0}, ()>
Line
Count
Source
194
169k
    pub fn transition<F, U>(&mut self, mut stream: store::Ptr, f: F) -> U
195
169k
    where
196
169k
        F: FnOnce(&mut Self, &mut store::Ptr) -> U,
197
    {
198
        // TODO: Does this need to be computed before performing the action?
199
169k
        let is_pending_reset = stream.is_pending_reset_expiration();
200
201
        // Run the action
202
169k
        let ret = f(self, &mut stream);
203
204
169k
        self.transition_after(stream, is_pending_reset);
205
206
169k
        ret
207
169k
    }
<h2::proto::streams::counts::Counts>::transition::<<h2::proto::streams::streams::Inner>::recv_go_away<bytes::bytes::Bytes>::{closure#0}::{closure#0}, ()>
Line
Count
Source
194
234k
    pub fn transition<F, U>(&mut self, mut stream: store::Ptr, f: F) -> U
195
234k
    where
196
234k
        F: FnOnce(&mut Self, &mut store::Ptr) -> U,
197
    {
198
        // TODO: Does this need to be computed before performing the action?
199
234k
        let is_pending_reset = stream.is_pending_reset_expiration();
200
201
        // Run the action
202
234k
        let ret = f(self, &mut stream);
203
204
234k
        self.transition_after(stream, is_pending_reset);
205
206
234k
        ret
207
234k
    }
<h2::proto::streams::counts::Counts>::transition::<<h2::proto::streams::streams::Inner>::recv_eof<bytes::bytes::Bytes>::{closure#0}::{closure#0}, ()>
Line
Count
Source
194
260k
    pub fn transition<F, U>(&mut self, mut stream: store::Ptr, f: F) -> U
195
260k
    where
196
260k
        F: FnOnce(&mut Self, &mut store::Ptr) -> U,
197
    {
198
        // TODO: Does this need to be computed before performing the action?
199
260k
        let is_pending_reset = stream.is_pending_reset_expiration();
200
201
        // Run the action
202
260k
        let ret = f(self, &mut stream);
203
204
260k
        self.transition_after(stream, is_pending_reset);
205
206
260k
        ret
207
260k
    }
<h2::proto::streams::counts::Counts>::transition::<<h2::proto::streams::streams::StreamRef<bytes::bytes::Bytes>>::send_data::{closure#0}, core::result::Result<(), h2::codec::error::UserError>>
Line
Count
Source
194
418k
    pub fn transition<F, U>(&mut self, mut stream: store::Ptr, f: F) -> U
195
418k
    where
196
418k
        F: FnOnce(&mut Self, &mut store::Ptr) -> U,
197
    {
198
        // TODO: Does this need to be computed before performing the action?
199
418k
        let is_pending_reset = stream.is_pending_reset_expiration();
200
201
        // Run the action
202
418k
        let ret = f(self, &mut stream);
203
204
418k
        self.transition_after(stream, is_pending_reset);
205
206
418k
        ret
207
418k
    }
208
209
    // TODO: move this to macro?
210
2.79M
    pub fn transition_after(&mut self, mut stream: store::Ptr, is_reset_counted: bool) {
211
2.79M
        tracing::trace!(
212
0
            "transition_after; stream={:?}; state={:?}; is_closed={:?}; \
213
0
             pending_send_empty={:?}; buffered_send_data={}; \
214
0
             num_recv={}; num_send={}",
215
0
            stream.id,
216
0
            stream.state,
217
0
            stream.is_closed(),
218
0
            stream.pending_send.is_empty(),
219
0
            stream.buffered_send_data,
220
            self.num_recv_streams,
221
            self.num_send_streams
222
        );
223
224
2.79M
        if stream.is_closed() {
225
1.53M
            if !stream.is_pending_reset_expiration() {
226
1.27M
                stream.unlink();
227
1.27M
                if is_reset_counted {
228
48.1k
                    self.dec_num_reset_streams();
229
1.22M
                }
230
255k
            }
231
232
1.53M
            if !stream.state.is_scheduled_reset() && stream.is_counted {
233
176k
                tracing::trace!("dec_num_streams; stream={:?}", stream.id);
234
                // Decrement the number of active streams.
235
176k
                self.dec_num_streams(&mut stream);
236
1.35M
            }
237
1.25M
        }
238
239
        // Release the stream if it requires releasing
240
2.79M
        if stream.is_released() {
241
472k
            stream.remove();
242
2.31M
        }
243
2.79M
    }
244
245
    /// Returns the maximum number of streams that can be initiated by this
246
    /// peer.
247
0
    pub(crate) fn max_send_streams(&self) -> usize {
248
0
        self.max_send_streams
249
0
    }
250
251
    /// Returns the maximum number of streams that can be initiated by the
252
    /// remote peer.
253
0
    pub(crate) fn max_recv_streams(&self) -> usize {
254
0
        self.max_recv_streams
255
0
    }
256
257
176k
    fn dec_num_streams(&mut self, stream: &mut store::Ptr) {
258
176k
        assert!(stream.is_counted);
259
260
176k
        if self.peer.is_local_init(stream.id) {
261
176k
            assert!(self.num_send_streams > 0);
262
176k
            self.num_send_streams -= 1;
263
176k
            stream.is_counted = false;
264
        } else {
265
2
            assert!(self.num_recv_streams > 0);
266
2
            self.num_recv_streams -= 1;
267
2
            stream.is_counted = false;
268
        }
269
176k
    }
270
271
48.1k
    fn dec_num_reset_streams(&mut self) {
272
48.1k
        assert!(self.num_local_reset_streams > 0);
273
48.1k
        self.num_local_reset_streams -= 1;
274
48.1k
    }
275
}
276
277
impl Drop for Counts {
278
13.3k
    fn drop(&mut self) {
279
        use std::thread;
280
281
13.3k
        if !thread::panicking() {
282
13.3k
            debug_assert!(!self.has_streams());
283
0
        }
284
13.3k
    }
285
}