Coverage Report

Created: 2025-10-13 06:32

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/h2/src/proto/streams/counts.rs
Line
Count
Source
1
use super::*;
2
3
#[derive(Debug)]
4
pub(super) struct Counts {
5
    /// Acting as a client or server. This allows us to track which values to
6
    /// inc / dec.
7
    peer: peer::Dyn,
8
9
    /// Maximum number of locally initiated streams
10
    max_send_streams: usize,
11
12
    /// Current number of locally initiated streams
13
    num_send_streams: usize,
14
15
    /// Maximum number of remote initiated streams
16
    max_recv_streams: usize,
17
18
    /// Current number of remote initiated streams
19
    num_recv_streams: usize,
20
21
    /// Maximum number of pending locally reset streams
22
    max_local_reset_streams: usize,
23
24
    /// Current number of pending locally reset streams
25
    num_local_reset_streams: usize,
26
27
    /// Max number of "pending accept" streams that were remotely reset
28
    max_remote_reset_streams: usize,
29
30
    /// Current number of "pending accept" streams that were remotely reset
31
    num_remote_reset_streams: usize,
32
33
    /// Maximum number of locally reset streams due to protocol error across
34
    /// the lifetime of the connection.
35
    ///
36
    /// When this gets exceeded, we issue GOAWAYs.
37
    max_local_error_reset_streams: Option<usize>,
38
39
    /// Total number of locally reset streams due to protocol error across the
40
    /// lifetime of the connection.
41
    num_local_error_reset_streams: usize,
42
}
43
44
impl Counts {
45
    /// Create a new `Counts` using the provided configuration values.
46
13.2k
    pub fn new(peer: peer::Dyn, config: &Config) -> Self {
47
13.2k
        Counts {
48
13.2k
            peer,
49
13.2k
            max_send_streams: config.initial_max_send_streams,
50
13.2k
            num_send_streams: 0,
51
13.2k
            max_recv_streams: config.remote_max_initiated.unwrap_or(usize::MAX),
52
13.2k
            num_recv_streams: 0,
53
13.2k
            max_local_reset_streams: config.local_reset_max,
54
13.2k
            num_local_reset_streams: 0,
55
13.2k
            max_remote_reset_streams: config.remote_reset_max,
56
13.2k
            num_remote_reset_streams: 0,
57
13.2k
            max_local_error_reset_streams: config.local_max_error_reset_streams,
58
13.2k
            num_local_error_reset_streams: 0,
59
13.2k
        }
60
13.2k
    }
61
62
    /// Returns true when the next opened stream will reach capacity of outbound streams
63
    ///
64
    /// The number of client send streams is incremented in prioritize; send_request has to guess if
65
    /// it should wait before allowing another request to be sent.
66
427k
    pub fn next_send_stream_will_reach_capacity(&self) -> bool {
67
427k
        self.max_send_streams <= (self.num_send_streams + 1)
68
427k
    }
69
70
    /// Returns the current peer
71
975k
    pub fn peer(&self) -> peer::Dyn {
72
975k
        self.peer
73
975k
    }
74
75
2.42M
    pub fn has_streams(&self) -> bool {
76
2.42M
        self.num_send_streams != 0 || self.num_recv_streams != 0
77
2.42M
    }
78
79
    /// Returns true if we can issue another local reset due to protocol error.
80
117k
    pub fn can_inc_num_local_error_resets(&self) -> bool {
81
117k
        if let Some(max) = self.max_local_error_reset_streams {
82
117k
            max > self.num_local_error_reset_streams
83
        } else {
84
0
            true
85
        }
86
117k
    }
87
88
58.5k
    pub fn inc_num_local_error_resets(&mut self) {
89
58.5k
        assert!(self.can_inc_num_local_error_resets());
90
91
        // Increment the number of locally reset streams due to protocol error
92
58.5k
        self.num_local_error_reset_streams += 1;
93
58.5k
    }
94
95
0
    pub(crate) fn max_local_error_resets(&self) -> Option<usize> {
96
0
        self.max_local_error_reset_streams
97
0
    }
98
99
    /// Returns true if the receive stream concurrency can be incremented
100
1.12k
    pub fn can_inc_num_recv_streams(&self) -> bool {
101
1.12k
        self.max_recv_streams > self.num_recv_streams
102
1.12k
    }
103
104
    /// Increments the number of concurrent receive streams.
105
    ///
106
    /// # Panics
107
    ///
108
    /// Panics on failure as this should have been validated beforehand.
109
3
    pub fn inc_num_recv_streams(&mut self, stream: &mut store::Ptr) {
110
3
        assert!(self.can_inc_num_recv_streams());
111
3
        assert!(!stream.is_counted);
112
113
        // Increment the number of remote initiated streams
114
3
        self.num_recv_streams += 1;
115
3
        stream.is_counted = true;
116
3
    }
117
118
    /// Returns true if the send stream concurrency can be incremented
119
391k
    pub fn can_inc_num_send_streams(&self) -> bool {
120
391k
        self.max_send_streams > self.num_send_streams
121
391k
    }
122
123
    /// Increments the number of concurrent send streams.
124
    ///
125
    /// # Panics
126
    ///
127
    /// Panics on failure as this should have been validated beforehand.
128
179k
    pub fn inc_num_send_streams(&mut self, stream: &mut store::Ptr) {
129
179k
        assert!(self.can_inc_num_send_streams());
130
179k
        assert!(!stream.is_counted);
131
132
        // Increment the number of locally initiated streams
133
179k
        self.num_send_streams += 1;
134
179k
        stream.is_counted = true;
135
179k
    }
136
137
    /// Returns true if the number of pending reset streams can be incremented.
138
165k
    pub fn can_inc_num_reset_streams(&self) -> bool {
139
165k
        self.max_local_reset_streams > self.num_local_reset_streams
140
165k
    }
141
142
    /// Increments the number of pending reset streams.
143
    ///
144
    /// # Panics
145
    ///
146
    /// Panics on failure as this should have been validated beforehand.
147
49.4k
    pub fn inc_num_reset_streams(&mut self) {
148
49.4k
        assert!(self.can_inc_num_reset_streams());
149
150
49.4k
        self.num_local_reset_streams += 1;
151
49.4k
    }
152
153
0
    pub(crate) fn max_remote_reset_streams(&self) -> usize {
154
0
        self.max_remote_reset_streams
155
0
    }
156
157
    /// Returns true if the number of pending REMOTE reset streams can be
158
    /// incremented.
159
0
    pub(crate) fn can_inc_num_remote_reset_streams(&self) -> bool {
160
0
        self.max_remote_reset_streams > self.num_remote_reset_streams
161
0
    }
162
163
    /// Increments the number of pending REMOTE reset streams.
164
    ///
165
    /// # Panics
166
    ///
167
    /// Panics on failure as this should have been validated beforehand.
168
0
    pub(crate) fn inc_num_remote_reset_streams(&mut self) {
169
0
        assert!(self.can_inc_num_remote_reset_streams());
170
171
0
        self.num_remote_reset_streams += 1;
172
0
    }
173
174
0
    pub(crate) fn dec_num_remote_reset_streams(&mut self) {
175
0
        assert!(self.num_remote_reset_streams > 0);
176
177
0
        self.num_remote_reset_streams -= 1;
178
0
    }
179
180
7.29k
    pub fn apply_remote_settings(&mut self, settings: &frame::Settings, is_initial: bool) {
181
7.29k
        match settings.max_concurrent_streams() {
182
585
            Some(val) => self.max_send_streams = val as usize,
183
895
            None if is_initial => self.max_send_streams = usize::MAX,
184
5.81k
            None => {}
185
        }
186
7.29k
    }
187
188
    /// Run a block of code that could potentially transition a stream's state.
189
    ///
190
    /// If the stream state transitions to closed, this function will perform
191
    /// all necessary cleanup.
192
    ///
193
    /// TODO: Is this function still needed?
194
2.06M
    pub fn transition<F, U>(&mut self, mut stream: store::Ptr, f: F) -> U
195
2.06M
    where
196
2.06M
        F: FnOnce(&mut Self, &mut store::Ptr) -> U,
197
    {
198
        // TODO: Does this need to be computed before performing the action?
199
2.06M
        let is_pending_reset = stream.is_pending_reset_expiration();
200
201
        // Run the action
202
2.06M
        let ret = f(self, &mut stream);
203
204
2.06M
        self.transition_after(stream, is_pending_reset);
205
206
2.06M
        ret
207
2.06M
    }
<h2::proto::streams::counts::Counts>::transition::<<h2::proto::streams::prioritize::Prioritize>::assign_connection_capacity<h2::proto::streams::store::Ptr>::{closure#0}, ()>
Line
Count
Source
194
31.5k
    pub fn transition<F, U>(&mut self, mut stream: store::Ptr, f: F) -> U
195
31.5k
    where
196
31.5k
        F: FnOnce(&mut Self, &mut store::Ptr) -> U,
197
    {
198
        // TODO: Does this need to be computed before performing the action?
199
31.5k
        let is_pending_reset = stream.is_pending_reset_expiration();
200
201
        // Run the action
202
31.5k
        let ret = f(self, &mut stream);
203
204
31.5k
        self.transition_after(stream, is_pending_reset);
205
206
31.5k
        ret
207
31.5k
    }
<h2::proto::streams::counts::Counts>::transition::<<h2::proto::streams::prioritize::Prioritize>::assign_connection_capacity<h2::proto::streams::store::Store>::{closure#0}, ()>
Line
Count
Source
194
70.2k
    pub fn transition<F, U>(&mut self, mut stream: store::Ptr, f: F) -> U
195
70.2k
    where
196
70.2k
        F: FnOnce(&mut Self, &mut store::Ptr) -> U,
197
    {
198
        // TODO: Does this need to be computed before performing the action?
199
70.2k
        let is_pending_reset = stream.is_pending_reset_expiration();
200
201
        // Run the action
202
70.2k
        let ret = f(self, &mut stream);
203
204
70.2k
        self.transition_after(stream, is_pending_reset);
205
206
70.2k
        ret
207
70.2k
    }
<h2::proto::streams::counts::Counts>::transition::<h2::proto::streams::streams::drop_stream_ref::{closure#0}::{closure#0}, ()>
Line
Count
Source
194
202
    pub fn transition<F, U>(&mut self, mut stream: store::Ptr, f: F) -> U
195
202
    where
196
202
        F: FnOnce(&mut Self, &mut store::Ptr) -> U,
197
    {
198
        // TODO: Does this need to be computed before performing the action?
199
202
        let is_pending_reset = stream.is_pending_reset_expiration();
200
201
        // Run the action
202
202
        let ret = f(self, &mut stream);
203
204
202
        self.transition_after(stream, is_pending_reset);
205
206
202
        ret
207
202
    }
<h2::proto::streams::counts::Counts>::transition::<<h2::proto::streams::prioritize::Prioritize>::clear_pending_capacity::{closure#0}, ()>
Line
Count
Source
194
12.3k
    pub fn transition<F, U>(&mut self, mut stream: store::Ptr, f: F) -> U
195
12.3k
    where
196
12.3k
        F: FnOnce(&mut Self, &mut store::Ptr) -> U,
197
    {
198
        // TODO: Does this need to be computed before performing the action?
199
12.3k
        let is_pending_reset = stream.is_pending_reset_expiration();
200
201
        // Run the action
202
12.3k
        let ret = f(self, &mut stream);
203
204
12.3k
        self.transition_after(stream, is_pending_reset);
205
206
12.3k
        ret
207
12.3k
    }
Unexecuted instantiation: <h2::proto::streams::counts::Counts>::transition::<<h2::proto::streams::recv::Recv>::clear_stream_window_update_queue::{closure#0}, ()>
<h2::proto::streams::counts::Counts>::transition::<h2::proto::streams::streams::drop_stream_ref::{closure#0}, ()>
Line
Count
Source
194
855k
    pub fn transition<F, U>(&mut self, mut stream: store::Ptr, f: F) -> U
195
855k
    where
196
855k
        F: FnOnce(&mut Self, &mut store::Ptr) -> U,
197
    {
198
        // TODO: Does this need to be computed before performing the action?
199
855k
        let is_pending_reset = stream.is_pending_reset_expiration();
200
201
        // Run the action
202
855k
        let ret = f(self, &mut stream);
203
204
855k
        self.transition_after(stream, is_pending_reset);
205
206
855k
        ret
207
855k
    }
<h2::proto::streams::counts::Counts>::transition::<<h2::proto::streams::streams::Inner>::recv_eof<bytes::bytes::Bytes>::{closure#0}::{closure#0}, ()>
Line
Count
Source
194
568
    pub fn transition<F, U>(&mut self, mut stream: store::Ptr, f: F) -> U
195
568
    where
196
568
        F: FnOnce(&mut Self, &mut store::Ptr) -> U,
197
    {
198
        // TODO: Does this need to be computed before performing the action?
199
568
        let is_pending_reset = stream.is_pending_reset_expiration();
200
201
        // Run the action
202
568
        let ret = f(self, &mut stream);
203
204
568
        self.transition_after(stream, is_pending_reset);
205
206
568
        ret
207
568
    }
Unexecuted instantiation: <h2::proto::streams::counts::Counts>::transition::<<h2::proto::streams::recv::Recv>::send_stream_window_updates<fuzz_e2e::MockIo, bytes::bytes::Bytes>::{closure#0}, ()>
<h2::proto::streams::counts::Counts>::transition::<<h2::proto::streams::streams::Inner>::recv_reset<bytes::bytes::Bytes>::{closure#0}, core::result::Result<(), h2::proto::error::Error>>
Line
Count
Source
194
635
    pub fn transition<F, U>(&mut self, mut stream: store::Ptr, f: F) -> U
195
635
    where
196
635
        F: FnOnce(&mut Self, &mut store::Ptr) -> U,
197
    {
198
        // TODO: Does this need to be computed before performing the action?
199
635
        let is_pending_reset = stream.is_pending_reset_expiration();
200
201
        // Run the action
202
635
        let ret = f(self, &mut stream);
203
204
635
        self.transition_after(stream, is_pending_reset);
205
206
635
        ret
207
635
    }
<h2::proto::streams::counts::Counts>::transition::<<h2::proto::streams::streams::Inner>::recv_headers<bytes::bytes::Bytes>::{closure#0}, core::result::Result<(), h2::proto::error::Error>>
Line
Count
Source
194
2.53k
    pub fn transition<F, U>(&mut self, mut stream: store::Ptr, f: F) -> U
195
2.53k
    where
196
2.53k
        F: FnOnce(&mut Self, &mut store::Ptr) -> U,
197
    {
198
        // TODO: Does this need to be computed before performing the action?
199
2.53k
        let is_pending_reset = stream.is_pending_reset_expiration();
200
201
        // Run the action
202
2.53k
        let ret = f(self, &mut stream);
203
204
2.53k
        self.transition_after(stream, is_pending_reset);
205
206
2.53k
        ret
207
2.53k
    }
<h2::proto::streams::counts::Counts>::transition::<<h2::proto::streams::streams::Inner>::recv_push_promise<bytes::bytes::Bytes>::{closure#0}, core::result::Result<core::option::Option<h2::proto::streams::store::Key>, h2::proto::error::Error>>
Line
Count
Source
194
1.12k
    pub fn transition<F, U>(&mut self, mut stream: store::Ptr, f: F) -> U
195
1.12k
    where
196
1.12k
        F: FnOnce(&mut Self, &mut store::Ptr) -> U,
197
    {
198
        // TODO: Does this need to be computed before performing the action?
199
1.12k
        let is_pending_reset = stream.is_pending_reset_expiration();
200
201
        // Run the action
202
1.12k
        let ret = f(self, &mut stream);
203
204
1.12k
        self.transition_after(stream, is_pending_reset);
205
206
1.12k
        ret
207
1.12k
    }
<h2::proto::streams::counts::Counts>::transition::<<h2::proto::streams::streams::Inner>::recv_data<bytes::bytes::Bytes>::{closure#0}, core::result::Result<(), h2::proto::error::Error>>
Line
Count
Source
194
17.5k
    pub fn transition<F, U>(&mut self, mut stream: store::Ptr, f: F) -> U
195
17.5k
    where
196
17.5k
        F: FnOnce(&mut Self, &mut store::Ptr) -> U,
197
    {
198
        // TODO: Does this need to be computed before performing the action?
199
17.5k
        let is_pending_reset = stream.is_pending_reset_expiration();
200
201
        // Run the action
202
17.5k
        let ret = f(self, &mut stream);
203
204
17.5k
        self.transition_after(stream, is_pending_reset);
205
206
17.5k
        ret
207
17.5k
    }
<h2::proto::streams::counts::Counts>::transition::<<h2::proto::streams::streams::Actions>::send_reset<bytes::bytes::Bytes>::{closure#0}, core::result::Result<(), h2::proto::error::GoAway>>
Line
Count
Source
194
57.5k
    pub fn transition<F, U>(&mut self, mut stream: store::Ptr, f: F) -> U
195
57.5k
    where
196
57.5k
        F: FnOnce(&mut Self, &mut store::Ptr) -> U,
197
    {
198
        // TODO: Does this need to be computed before performing the action?
199
57.5k
        let is_pending_reset = stream.is_pending_reset_expiration();
200
201
        // Run the action
202
57.5k
        let ret = f(self, &mut stream);
203
204
57.5k
        self.transition_after(stream, is_pending_reset);
205
206
57.5k
        ret
207
57.5k
    }
<h2::proto::streams::counts::Counts>::transition::<<h2::proto::streams::streams::Inner>::handle_error<bytes::bytes::Bytes>::{closure#0}::{closure#0}, ()>
Line
Count
Source
194
172k
    pub fn transition<F, U>(&mut self, mut stream: store::Ptr, f: F) -> U
195
172k
    where
196
172k
        F: FnOnce(&mut Self, &mut store::Ptr) -> U,
197
    {
198
        // TODO: Does this need to be computed before performing the action?
199
172k
        let is_pending_reset = stream.is_pending_reset_expiration();
200
201
        // Run the action
202
172k
        let ret = f(self, &mut stream);
203
204
172k
        self.transition_after(stream, is_pending_reset);
205
206
172k
        ret
207
172k
    }
<h2::proto::streams::counts::Counts>::transition::<<h2::proto::streams::streams::Inner>::recv_go_away<bytes::bytes::Bytes>::{closure#0}::{closure#0}, ()>
Line
Count
Source
194
152k
    pub fn transition<F, U>(&mut self, mut stream: store::Ptr, f: F) -> U
195
152k
    where
196
152k
        F: FnOnce(&mut Self, &mut store::Ptr) -> U,
197
    {
198
        // TODO: Does this need to be computed before performing the action?
199
152k
        let is_pending_reset = stream.is_pending_reset_expiration();
200
201
        // Run the action
202
152k
        let ret = f(self, &mut stream);
203
204
152k
        self.transition_after(stream, is_pending_reset);
205
206
152k
        ret
207
152k
    }
<h2::proto::streams::counts::Counts>::transition::<<h2::proto::streams::streams::Inner>::recv_eof<bytes::bytes::Bytes>::{closure#0}::{closure#0}, ()>
Line
Count
Source
194
263k
    pub fn transition<F, U>(&mut self, mut stream: store::Ptr, f: F) -> U
195
263k
    where
196
263k
        F: FnOnce(&mut Self, &mut store::Ptr) -> U,
197
    {
198
        // TODO: Does this need to be computed before performing the action?
199
263k
        let is_pending_reset = stream.is_pending_reset_expiration();
200
201
        // Run the action
202
263k
        let ret = f(self, &mut stream);
203
204
263k
        self.transition_after(stream, is_pending_reset);
205
206
263k
        ret
207
263k
    }
<h2::proto::streams::counts::Counts>::transition::<<h2::proto::streams::streams::StreamRef<bytes::bytes::Bytes>>::send_data::{closure#0}, core::result::Result<(), h2::codec::error::UserError>>
Line
Count
Source
194
426k
    pub fn transition<F, U>(&mut self, mut stream: store::Ptr, f: F) -> U
195
426k
    where
196
426k
        F: FnOnce(&mut Self, &mut store::Ptr) -> U,
197
    {
198
        // TODO: Does this need to be computed before performing the action?
199
426k
        let is_pending_reset = stream.is_pending_reset_expiration();
200
201
        // Run the action
202
426k
        let ret = f(self, &mut stream);
203
204
426k
        self.transition_after(stream, is_pending_reset);
205
206
426k
        ret
207
426k
    }
208
209
    // TODO: move this to macro?
210
2.75M
    pub fn transition_after(&mut self, mut stream: store::Ptr, is_reset_counted: bool) {
211
2.75M
        tracing::trace!(
212
0
            "transition_after; stream={:?}; state={:?}; is_closed={:?}; \
213
0
             pending_send_empty={:?}; buffered_send_data={}; \
214
0
             num_recv={}; num_send={}",
215
0
            stream.id,
216
0
            stream.state,
217
0
            stream.is_closed(),
218
0
            stream.pending_send.is_empty(),
219
0
            stream.buffered_send_data,
220
            self.num_recv_streams,
221
            self.num_send_streams
222
        );
223
224
2.75M
        if stream.is_closed() {
225
1.45M
            if !stream.is_pending_reset_expiration() {
226
1.25M
                stream.unlink();
227
1.25M
                if is_reset_counted {
228
49.4k
                    self.dec_num_reset_streams();
229
1.21M
                }
230
192k
            }
231
232
1.45M
            if !stream.state.is_scheduled_reset() && stream.is_counted {
233
179k
                tracing::trace!("dec_num_streams; stream={:?}", stream.id);
234
                // Decrement the number of active streams.
235
179k
                self.dec_num_streams(&mut stream);
236
1.27M
            }
237
1.30M
        }
238
239
        // Release the stream if it requires releasing
240
2.75M
        if stream.is_released() {
241
462k
            stream.remove();
242
2.29M
        }
243
2.75M
    }
244
245
    /// Returns the maximum number of streams that can be initiated by this
246
    /// peer.
247
0
    pub(crate) fn max_send_streams(&self) -> usize {
248
0
        self.max_send_streams
249
0
    }
250
251
    /// Returns the maximum number of streams that can be initiated by the
252
    /// remote peer.
253
0
    pub(crate) fn max_recv_streams(&self) -> usize {
254
0
        self.max_recv_streams
255
0
    }
256
257
179k
    fn dec_num_streams(&mut self, stream: &mut store::Ptr) {
258
179k
        assert!(stream.is_counted);
259
260
179k
        if self.peer.is_local_init(stream.id) {
261
179k
            assert!(self.num_send_streams > 0);
262
179k
            self.num_send_streams -= 1;
263
179k
            stream.is_counted = false;
264
        } else {
265
3
            assert!(self.num_recv_streams > 0);
266
3
            self.num_recv_streams -= 1;
267
3
            stream.is_counted = false;
268
        }
269
179k
    }
270
271
49.4k
    fn dec_num_reset_streams(&mut self) {
272
49.4k
        assert!(self.num_local_reset_streams > 0);
273
49.4k
        self.num_local_reset_streams -= 1;
274
49.4k
    }
275
}
276
277
impl Drop for Counts {
278
13.2k
    fn drop(&mut self) {
279
        use std::thread;
280
281
13.2k
        if !thread::panicking() {
282
13.2k
            debug_assert!(!self.has_streams());
283
0
        }
284
13.2k
    }
285
}