Coverage Report

Created: 2026-05-30 06:25

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/h2/src/proto/streams/counts.rs
Line
Count
Source
1
use super::*;
2
3
#[derive(Debug)]
4
pub(super) struct Counts {
5
    /// Acting as a client or server. This allows us to track which values to
6
    /// inc / dec.
7
    peer: peer::Dyn,
8
9
    /// Maximum number of locally initiated streams
10
    max_send_streams: usize,
11
12
    /// Current number of remote initiated streams
13
    num_send_streams: usize,
14
15
    /// Maximum number of remote initiated streams
16
    max_recv_streams: usize,
17
18
    /// Current number of locally initiated streams
19
    num_recv_streams: usize,
20
21
    /// Maximum number of pending locally reset streams
22
    max_local_reset_streams: usize,
23
24
    /// Current number of pending locally reset streams
25
    num_local_reset_streams: usize,
26
27
    /// Max number of "pending accept" streams that were remotely reset
28
    max_remote_reset_streams: usize,
29
30
    /// Current number of "pending accept" streams that were remotely reset
31
    num_remote_reset_streams: usize,
32
33
    /// Maximum number of locally reset streams due to protocol error across
34
    /// the lifetime of the connection.
35
    ///
36
    /// When this gets exceeded, we issue GOAWAYs.
37
    max_local_error_reset_streams: Option<usize>,
38
39
    /// Total number of locally reset streams due to protocol error across the
40
    /// lifetime of the connection.
41
    num_local_error_reset_streams: usize,
42
}
43
44
impl Counts {
45
    /// Create a new `Counts` using the provided configuration values.
46
14.5k
    pub fn new(peer: peer::Dyn, config: &Config) -> Self {
47
14.5k
        Counts {
48
14.5k
            peer,
49
14.5k
            max_send_streams: config.initial_max_send_streams,
50
14.5k
            num_send_streams: 0,
51
14.5k
            max_recv_streams: config.remote_max_initiated.unwrap_or(usize::MAX),
52
14.5k
            num_recv_streams: 0,
53
14.5k
            max_local_reset_streams: config.local_reset_max,
54
14.5k
            num_local_reset_streams: 0,
55
14.5k
            max_remote_reset_streams: config.remote_reset_max,
56
14.5k
            num_remote_reset_streams: 0,
57
14.5k
            max_local_error_reset_streams: config.local_max_error_reset_streams,
58
14.5k
            num_local_error_reset_streams: 0,
59
14.5k
        }
60
14.5k
    }
61
62
    /// Returns true when the next opened stream will reach capacity of outbound streams
63
    ///
64
    /// The number of client send streams is incremented in prioritize; send_request has to guess if
65
    /// it should wait before allowing another request to be sent.
66
487k
    pub fn next_send_stream_will_reach_capacity(&self) -> bool {
67
487k
        self.max_send_streams <= (self.num_send_streams + 1)
68
487k
    }
69
70
    /// Returns the current peer
71
1.08M
    pub fn peer(&self) -> peer::Dyn {
72
1.08M
        self.peer
73
1.08M
    }
74
75
1.38M
    pub fn has_streams(&self) -> bool {
76
1.38M
        self.num_send_streams != 0 || self.num_recv_streams != 0
77
1.38M
    }
78
79
    /// Returns true if we can issue another local reset due to protocol error.
80
103k
    pub fn can_inc_num_local_error_resets(&self) -> bool {
81
103k
        if let Some(max) = self.max_local_error_reset_streams {
82
103k
            max > self.num_local_error_reset_streams
83
        } else {
84
0
            true
85
        }
86
103k
    }
87
88
51.7k
    pub fn inc_num_local_error_resets(&mut self) {
89
51.7k
        assert!(self.can_inc_num_local_error_resets());
90
91
        // Increment the number of remote initiated streams
92
51.7k
        self.num_local_error_reset_streams += 1;
93
51.7k
    }
94
95
0
    pub(crate) fn max_local_error_resets(&self) -> Option<usize> {
96
0
        self.max_local_error_reset_streams
97
0
    }
98
99
    /// Returns true if the receive stream concurrency can be incremented
100
1.10k
    pub fn can_inc_num_recv_streams(&self) -> bool {
101
1.10k
        self.max_recv_streams > self.num_recv_streams
102
1.10k
    }
103
104
    /// Increments the number of concurrent receive streams.
105
    ///
106
    /// # Panics
107
    ///
108
    /// Panics on failure as this should have been validated before hand.
109
2
    pub fn inc_num_recv_streams(&mut self, stream: &mut store::Ptr) {
110
2
        assert!(self.can_inc_num_recv_streams());
111
2
        assert!(!stream.is_counted);
112
113
        // Increment the number of remote initiated streams
114
2
        self.num_recv_streams += 1;
115
2
        stream.is_counted = true;
116
2
    }
117
118
    /// Returns true if the send stream concurrency can be incremented
119
490k
    pub fn can_inc_num_send_streams(&self) -> bool {
120
490k
        self.max_send_streams > self.num_send_streams
121
490k
    }
122
123
    /// Increments the number of concurrent send streams.
124
    ///
125
    /// # Panics
126
    ///
127
    /// Panics on failure as this should have been validated before hand.
128
160k
    pub fn inc_num_send_streams(&mut self, stream: &mut store::Ptr) {
129
160k
        assert!(self.can_inc_num_send_streams());
130
160k
        assert!(!stream.is_counted);
131
132
        // Increment the number of remote initiated streams
133
160k
        self.num_send_streams += 1;
134
160k
        stream.is_counted = true;
135
160k
    }
136
137
    /// Returns true if the number of pending reset streams can be incremented.
138
151k
    pub fn can_inc_num_reset_streams(&self) -> bool {
139
151k
        self.max_local_reset_streams > self.num_local_reset_streams
140
151k
    }
141
142
    /// Increments the number of pending reset streams.
143
    ///
144
    /// # Panics
145
    ///
146
    /// Panics on failure as this should have been validated before hand.
147
49.5k
    pub fn inc_num_reset_streams(&mut self) {
148
49.5k
        assert!(self.can_inc_num_reset_streams());
149
150
49.5k
        self.num_local_reset_streams += 1;
151
49.5k
    }
152
153
0
    pub(crate) fn max_remote_reset_streams(&self) -> usize {
154
0
        self.max_remote_reset_streams
155
0
    }
156
157
    /// Returns true if the number of pending REMOTE reset streams can be
158
    /// incremented.
159
0
    pub(crate) fn can_inc_num_remote_reset_streams(&self) -> bool {
160
0
        self.max_remote_reset_streams > self.num_remote_reset_streams
161
0
    }
162
163
    /// Increments the number of pending REMOTE reset streams.
164
    ///
165
    /// # Panics
166
    ///
167
    /// Panics on failure as this should have been validated before hand.
168
0
    pub(crate) fn inc_num_remote_reset_streams(&mut self) {
169
0
        assert!(self.can_inc_num_remote_reset_streams());
170
171
0
        self.num_remote_reset_streams += 1;
172
0
    }
173
174
0
    pub(crate) fn dec_num_remote_reset_streams(&mut self) {
175
0
        assert!(self.num_remote_reset_streams > 0);
176
177
0
        self.num_remote_reset_streams -= 1;
178
0
    }
179
180
6.39k
    pub fn apply_remote_settings(&mut self, settings: &frame::Settings, is_initial: bool) {
181
6.39k
        match settings.max_concurrent_streams() {
182
580
            Some(val) => self.max_send_streams = val as usize,
183
816
            None if is_initial => self.max_send_streams = usize::MAX,
184
5.00k
            None => {}
185
        }
186
6.39k
    }
187
188
    /// Run a block of code that could potentially transition a stream's state.
189
    ///
190
    /// If the stream state transitions to closed, this function will perform
191
    /// all necessary cleanup.
192
    ///
193
    /// TODO: Is this function still needed?
194
2.26M
    pub fn transition<F, U>(&mut self, mut stream: store::Ptr, f: F) -> U
195
2.26M
    where
196
2.26M
        F: FnOnce(&mut Self, &mut store::Ptr) -> U,
197
    {
198
        // TODO: Does this need to be computed before performing the action?
199
2.26M
        let is_pending_reset = stream.is_pending_reset_expiration();
200
201
        // Run the action
202
2.26M
        let ret = f(self, &mut stream);
203
204
2.26M
        self.transition_after(stream, is_pending_reset);
205
206
2.26M
        ret
207
2.26M
    }
<h2::proto::streams::counts::Counts>::transition::<<h2::proto::streams::prioritize::Prioritize>::assign_connection_capacity<h2::proto::streams::store::Ptr>::{closure#0}, ()>
Line
Count
Source
194
27.0k
    pub fn transition<F, U>(&mut self, mut stream: store::Ptr, f: F) -> U
195
27.0k
    where
196
27.0k
        F: FnOnce(&mut Self, &mut store::Ptr) -> U,
197
    {
198
        // TODO: Does this need to be computed before performing the action?
199
27.0k
        let is_pending_reset = stream.is_pending_reset_expiration();
200
201
        // Run the action
202
27.0k
        let ret = f(self, &mut stream);
203
204
27.0k
        self.transition_after(stream, is_pending_reset);
205
206
27.0k
        ret
207
27.0k
    }
<h2::proto::streams::counts::Counts>::transition::<<h2::proto::streams::prioritize::Prioritize>::assign_connection_capacity<h2::proto::streams::store::Store>::{closure#0}, ()>
Line
Count
Source
194
32.0k
    pub fn transition<F, U>(&mut self, mut stream: store::Ptr, f: F) -> U
195
32.0k
    where
196
32.0k
        F: FnOnce(&mut Self, &mut store::Ptr) -> U,
197
    {
198
        // TODO: Does this need to be computed before performing the action?
199
32.0k
        let is_pending_reset = stream.is_pending_reset_expiration();
200
201
        // Run the action
202
32.0k
        let ret = f(self, &mut stream);
203
204
32.0k
        self.transition_after(stream, is_pending_reset);
205
206
32.0k
        ret
207
32.0k
    }
<h2::proto::streams::counts::Counts>::transition::<h2::proto::streams::streams::drop_stream_ref::{closure#0}::{closure#0}, ()>
Line
Count
Source
194
163
    pub fn transition<F, U>(&mut self, mut stream: store::Ptr, f: F) -> U
195
163
    where
196
163
        F: FnOnce(&mut Self, &mut store::Ptr) -> U,
197
    {
198
        // TODO: Does this need to be computed before performing the action?
199
163
        let is_pending_reset = stream.is_pending_reset_expiration();
200
201
        // Run the action
202
163
        let ret = f(self, &mut stream);
203
204
163
        self.transition_after(stream, is_pending_reset);
205
206
163
        ret
207
163
    }
<h2::proto::streams::counts::Counts>::transition::<<h2::proto::streams::prioritize::Prioritize>::clear_pending_capacity::{closure#0}, ()>
Line
Count
Source
194
12.0k
    pub fn transition<F, U>(&mut self, mut stream: store::Ptr, f: F) -> U
195
12.0k
    where
196
12.0k
        F: FnOnce(&mut Self, &mut store::Ptr) -> U,
197
    {
198
        // TODO: Does this need to be computed before performing the action?
199
12.0k
        let is_pending_reset = stream.is_pending_reset_expiration();
200
201
        // Run the action
202
12.0k
        let ret = f(self, &mut stream);
203
204
12.0k
        self.transition_after(stream, is_pending_reset);
205
206
12.0k
        ret
207
12.0k
    }
Unexecuted instantiation: <h2::proto::streams::counts::Counts>::transition::<<h2::proto::streams::recv::Recv>::clear_stream_window_update_queue::{closure#0}, ()>
<h2::proto::streams::counts::Counts>::transition::<h2::proto::streams::streams::drop_stream_ref::{closure#0}, ()>
Line
Count
Source
194
975k
    pub fn transition<F, U>(&mut self, mut stream: store::Ptr, f: F) -> U
195
975k
    where
196
975k
        F: FnOnce(&mut Self, &mut store::Ptr) -> U,
197
    {
198
        // TODO: Does this need to be computed before performing the action?
199
975k
        let is_pending_reset = stream.is_pending_reset_expiration();
200
201
        // Run the action
202
975k
        let ret = f(self, &mut stream);
203
204
975k
        self.transition_after(stream, is_pending_reset);
205
206
975k
        ret
207
975k
    }
<h2::proto::streams::counts::Counts>::transition::<<h2::proto::streams::streams::Inner>::recv_eof<bytes::bytes::Bytes>::{closure#0}::{closure#0}, ()>
Line
Count
Source
194
708
    pub fn transition<F, U>(&mut self, mut stream: store::Ptr, f: F) -> U
195
708
    where
196
708
        F: FnOnce(&mut Self, &mut store::Ptr) -> U,
197
    {
198
        // TODO: Does this need to be computed before performing the action?
199
708
        let is_pending_reset = stream.is_pending_reset_expiration();
200
201
        // Run the action
202
708
        let ret = f(self, &mut stream);
203
204
708
        self.transition_after(stream, is_pending_reset);
205
206
708
        ret
207
708
    }
Unexecuted instantiation: <h2::proto::streams::counts::Counts>::transition::<<h2::proto::streams::recv::Recv>::send_stream_window_updates<fuzz_e2e::MockIo, bytes::bytes::Bytes>::{closure#0}, ()>
<h2::proto::streams::counts::Counts>::transition::<<h2::proto::streams::streams::Inner>::recv_reset<bytes::bytes::Bytes>::{closure#0}, core::result::Result<(), h2::proto::error::Error>>
Line
Count
Source
194
66
    pub fn transition<F, U>(&mut self, mut stream: store::Ptr, f: F) -> U
195
66
    where
196
66
        F: FnOnce(&mut Self, &mut store::Ptr) -> U,
197
    {
198
        // TODO: Does this need to be computed before performing the action?
199
66
        let is_pending_reset = stream.is_pending_reset_expiration();
200
201
        // Run the action
202
66
        let ret = f(self, &mut stream);
203
204
66
        self.transition_after(stream, is_pending_reset);
205
206
66
        ret
207
66
    }
<h2::proto::streams::counts::Counts>::transition::<<h2::proto::streams::streams::Inner>::recv_headers<bytes::bytes::Bytes>::{closure#0}, core::result::Result<(), h2::proto::error::Error>>
Line
Count
Source
194
1.01k
    pub fn transition<F, U>(&mut self, mut stream: store::Ptr, f: F) -> U
195
1.01k
    where
196
1.01k
        F: FnOnce(&mut Self, &mut store::Ptr) -> U,
197
    {
198
        // TODO: Does this need to be computed before performing the action?
199
1.01k
        let is_pending_reset = stream.is_pending_reset_expiration();
200
201
        // Run the action
202
1.01k
        let ret = f(self, &mut stream);
203
204
1.01k
        self.transition_after(stream, is_pending_reset);
205
206
1.01k
        ret
207
1.01k
    }
<h2::proto::streams::counts::Counts>::transition::<<h2::proto::streams::streams::Inner>::recv_push_promise<bytes::bytes::Bytes>::{closure#0}, core::result::Result<core::option::Option<h2::proto::streams::store::Key>, h2::proto::error::Error>>
Line
Count
Source
194
1.09k
    pub fn transition<F, U>(&mut self, mut stream: store::Ptr, f: F) -> U
195
1.09k
    where
196
1.09k
        F: FnOnce(&mut Self, &mut store::Ptr) -> U,
197
    {
198
        // TODO: Does this need to be computed before performing the action?
199
1.09k
        let is_pending_reset = stream.is_pending_reset_expiration();
200
201
        // Run the action
202
1.09k
        let ret = f(self, &mut stream);
203
204
1.09k
        self.transition_after(stream, is_pending_reset);
205
206
1.09k
        ret
207
1.09k
    }
<h2::proto::streams::counts::Counts>::transition::<<h2::proto::streams::streams::Inner>::recv_data<bytes::bytes::Bytes>::{closure#0}, core::result::Result<(), h2::proto::error::Error>>
Line
Count
Source
194
41.2k
    pub fn transition<F, U>(&mut self, mut stream: store::Ptr, f: F) -> U
195
41.2k
    where
196
41.2k
        F: FnOnce(&mut Self, &mut store::Ptr) -> U,
197
    {
198
        // TODO: Does this need to be computed before performing the action?
199
41.2k
        let is_pending_reset = stream.is_pending_reset_expiration();
200
201
        // Run the action
202
41.2k
        let ret = f(self, &mut stream);
203
204
41.2k
        self.transition_after(stream, is_pending_reset);
205
206
41.2k
        ret
207
41.2k
    }
<h2::proto::streams::counts::Counts>::transition::<<h2::proto::streams::streams::Actions>::send_reset<bytes::bytes::Bytes>::{closure#0}, core::result::Result<(), h2::proto::error::GoAway>>
Line
Count
Source
194
50.7k
    pub fn transition<F, U>(&mut self, mut stream: store::Ptr, f: F) -> U
195
50.7k
    where
196
50.7k
        F: FnOnce(&mut Self, &mut store::Ptr) -> U,
197
    {
198
        // TODO: Does this need to be computed before performing the action?
199
50.7k
        let is_pending_reset = stream.is_pending_reset_expiration();
200
201
        // Run the action
202
50.7k
        let ret = f(self, &mut stream);
203
204
50.7k
        self.transition_after(stream, is_pending_reset);
205
206
50.7k
        ret
207
50.7k
    }
<h2::proto::streams::counts::Counts>::transition::<<h2::proto::streams::streams::Inner>::handle_error<bytes::bytes::Bytes>::{closure#0}::{closure#0}, ()>
Line
Count
Source
194
276k
    pub fn transition<F, U>(&mut self, mut stream: store::Ptr, f: F) -> U
195
276k
    where
196
276k
        F: FnOnce(&mut Self, &mut store::Ptr) -> U,
197
    {
198
        // TODO: Does this need to be computed before performing the action?
199
276k
        let is_pending_reset = stream.is_pending_reset_expiration();
200
201
        // Run the action
202
276k
        let ret = f(self, &mut stream);
203
204
276k
        self.transition_after(stream, is_pending_reset);
205
206
276k
        ret
207
276k
    }
<h2::proto::streams::counts::Counts>::transition::<<h2::proto::streams::streams::Inner>::recv_go_away<bytes::bytes::Bytes>::{closure#0}::{closure#0}, ()>
Line
Count
Source
194
137k
    pub fn transition<F, U>(&mut self, mut stream: store::Ptr, f: F) -> U
195
137k
    where
196
137k
        F: FnOnce(&mut Self, &mut store::Ptr) -> U,
197
    {
198
        // TODO: Does this need to be computed before performing the action?
199
137k
        let is_pending_reset = stream.is_pending_reset_expiration();
200
201
        // Run the action
202
137k
        let ret = f(self, &mut stream);
203
204
137k
        self.transition_after(stream, is_pending_reset);
205
206
137k
        ret
207
137k
    }
<h2::proto::streams::counts::Counts>::transition::<<h2::proto::streams::streams::Inner>::recv_eof<bytes::bytes::Bytes>::{closure#0}::{closure#0}, ()>
Line
Count
Source
194
222k
    pub fn transition<F, U>(&mut self, mut stream: store::Ptr, f: F) -> U
195
222k
    where
196
222k
        F: FnOnce(&mut Self, &mut store::Ptr) -> U,
197
    {
198
        // TODO: Does this need to be computed before performing the action?
199
222k
        let is_pending_reset = stream.is_pending_reset_expiration();
200
201
        // Run the action
202
222k
        let ret = f(self, &mut stream);
203
204
222k
        self.transition_after(stream, is_pending_reset);
205
206
222k
        ret
207
222k
    }
<h2::proto::streams::counts::Counts>::transition::<<h2::proto::streams::streams::StreamRef<bytes::bytes::Bytes>>::send_data::{closure#0}, core::result::Result<(), h2::codec::error::UserError>>
Line
Count
Source
194
486k
    pub fn transition<F, U>(&mut self, mut stream: store::Ptr, f: F) -> U
195
486k
    where
196
486k
        F: FnOnce(&mut Self, &mut store::Ptr) -> U,
197
    {
198
        // TODO: Does this need to be computed before performing the action?
199
486k
        let is_pending_reset = stream.is_pending_reset_expiration();
200
201
        // Run the action
202
486k
        let ret = f(self, &mut stream);
203
204
486k
        self.transition_after(stream, is_pending_reset);
205
206
486k
        ret
207
486k
    }
208
209
    // TODO: move this to macro?
210
2.98M
    pub fn transition_after(&mut self, mut stream: store::Ptr, is_reset_counted: bool) {
211
2.98M
        tracing::trace!(
212
0
            "transition_after; stream={:?}; state={:?}; is_closed={:?}; \
213
0
             pending_send_empty={:?}; buffered_send_data={}; \
214
0
             num_recv={}; num_send={}",
215
0
            stream.id,
216
0
            stream.state,
217
0
            stream.is_closed(),
218
0
            stream.pending_send.is_empty(),
219
0
            stream.buffered_send_data,
220
            self.num_recv_streams,
221
            self.num_send_streams
222
        );
223
224
2.98M
        if stream.is_closed() {
225
1.61M
            if !stream.is_pending_reset_expiration() {
226
1.43M
                stream.unlink();
227
1.43M
                if is_reset_counted {
228
49.5k
                    self.dec_num_reset_streams();
229
1.38M
                }
230
179k
            }
231
232
1.61M
            if !stream.state.is_scheduled_reset() && stream.is_counted {
233
160k
                tracing::trace!("dec_num_streams; stream={:?}", stream.id);
234
                // Decrement the number of active streams.
235
160k
                self.dec_num_streams(&mut stream);
236
1.45M
            }
237
1.37M
        }
238
239
        // Release the stream if it requires releasing
240
2.98M
        if stream.is_released() {
241
514k
            stream.remove();
242
2.47M
        }
243
2.98M
    }
244
245
    /// Returns the maximum number of streams that can be initiated by this
246
    /// peer.
247
0
    pub(crate) fn max_send_streams(&self) -> usize {
248
0
        self.max_send_streams
249
0
    }
250
251
    /// Returns the maximum number of streams that can be initiated by the
252
    /// remote peer.
253
0
    pub(crate) fn max_recv_streams(&self) -> usize {
254
0
        self.max_recv_streams
255
0
    }
256
257
160k
    fn dec_num_streams(&mut self, stream: &mut store::Ptr) {
258
160k
        assert!(stream.is_counted);
259
260
160k
        if self.peer.is_local_init(stream.id) {
261
160k
            assert!(self.num_send_streams > 0);
262
160k
            self.num_send_streams -= 1;
263
160k
            stream.is_counted = false;
264
        } else {
265
2
            assert!(self.num_recv_streams > 0);
266
2
            self.num_recv_streams -= 1;
267
2
            stream.is_counted = false;
268
        }
269
160k
    }
270
271
49.5k
    fn dec_num_reset_streams(&mut self) {
272
49.5k
        assert!(self.num_local_reset_streams > 0);
273
49.5k
        self.num_local_reset_streams -= 1;
274
49.5k
    }
275
}
276
277
impl Drop for Counts {
278
14.5k
    fn drop(&mut self) {
279
        use std::thread;
280
281
14.5k
        if !thread::panicking() {
282
14.5k
            debug_assert!(!self.has_streams());
283
0
        }
284
14.5k
    }
285
}