Coverage Report

Created: 2025-11-11 06:41

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/h2/src/proto/streams/counts.rs
Line
Count
Source
1
use super::*;
2
3
#[derive(Debug)]
4
pub(super) struct Counts {
5
    /// Acting as a client or server. This allows us to track which values to
6
    /// inc / dec.
7
    peer: peer::Dyn,
8
9
    /// Maximum number of locally initiated streams
10
    max_send_streams: usize,
11
12
    /// Current number of locally initiated streams
13
    num_send_streams: usize,
14
15
    /// Maximum number of remote initiated streams
16
    max_recv_streams: usize,
17
18
    /// Current number of remote initiated streams
19
    num_recv_streams: usize,
20
21
    /// Maximum number of pending locally reset streams
22
    max_local_reset_streams: usize,
23
24
    /// Current number of pending locally reset streams
25
    num_local_reset_streams: usize,
26
27
    /// Max number of "pending accept" streams that were remotely reset
28
    max_remote_reset_streams: usize,
29
30
    /// Current number of "pending accept" streams that were remotely reset
31
    num_remote_reset_streams: usize,
32
33
    /// Maximum number of locally reset streams due to protocol error across
34
    /// the lifetime of the connection.
35
    ///
36
    /// When this gets exceeded, we issue GOAWAYs.
37
    max_local_error_reset_streams: Option<usize>,
38
39
    /// Total number of locally reset streams due to protocol error across the
40
    /// lifetime of the connection.
41
    num_local_error_reset_streams: usize,
42
}
43
44
impl Counts {
45
    /// Create a new `Counts` using the provided configuration values.
46
13.4k
    pub fn new(peer: peer::Dyn, config: &Config) -> Self {
47
13.4k
        Counts {
48
13.4k
            peer,
49
13.4k
            max_send_streams: config.initial_max_send_streams,
50
13.4k
            num_send_streams: 0,
51
13.4k
            max_recv_streams: config.remote_max_initiated.unwrap_or(usize::MAX),
52
13.4k
            num_recv_streams: 0,
53
13.4k
            max_local_reset_streams: config.local_reset_max,
54
13.4k
            num_local_reset_streams: 0,
55
13.4k
            max_remote_reset_streams: config.remote_reset_max,
56
13.4k
            num_remote_reset_streams: 0,
57
13.4k
            max_local_error_reset_streams: config.local_max_error_reset_streams,
58
13.4k
            num_local_error_reset_streams: 0,
59
13.4k
        }
60
13.4k
    }
61
62
    /// Returns true when the next opened stream will reach capacity of outbound streams
63
    ///
64
    /// The number of client send streams is incremented in prioritize; send_request has to guess if
65
    /// it should wait before allowing another request to be sent.
66
424k
    pub fn next_send_stream_will_reach_capacity(&self) -> bool {
67
424k
        self.max_send_streams <= (self.num_send_streams + 1)
68
424k
    }
69
70
    /// Returns the current peer
71
984k
    pub fn peer(&self) -> peer::Dyn {
72
984k
        self.peer
73
984k
    }
74
75
1.49M
    pub fn has_streams(&self) -> bool {
76
1.49M
        self.num_send_streams != 0 || self.num_recv_streams != 0
77
1.49M
    }
78
79
    /// Returns true if we can issue another local reset due to protocol error.
80
150k
    pub fn can_inc_num_local_error_resets(&self) -> bool {
81
150k
        if let Some(max) = self.max_local_error_reset_streams {
82
150k
            max > self.num_local_error_reset_streams
83
        } else {
84
0
            true
85
        }
86
150k
    }
87
88
75.0k
    pub fn inc_num_local_error_resets(&mut self) {
89
75.0k
        assert!(self.can_inc_num_local_error_resets());
90
91
        // Increment the number of locally reset streams due to protocol error
92
75.0k
        self.num_local_error_reset_streams += 1;
93
75.0k
    }
94
95
0
    pub(crate) fn max_local_error_resets(&self) -> Option<usize> {
96
0
        self.max_local_error_reset_streams
97
0
    }
98
99
    /// Returns true if the receive stream concurrency can be incremented
100
1.13k
    pub fn can_inc_num_recv_streams(&self) -> bool {
101
1.13k
        self.max_recv_streams > self.num_recv_streams
102
1.13k
    }
103
104
    /// Increments the number of concurrent receive streams.
105
    ///
106
    /// # Panics
107
    ///
108
    /// Panics on failure as this should have been validated beforehand.
109
3
    pub fn inc_num_recv_streams(&mut self, stream: &mut store::Ptr) {
110
3
        assert!(self.can_inc_num_recv_streams());
111
3
        assert!(!stream.is_counted);
112
113
        // Increment the number of remote initiated streams
114
3
        self.num_recv_streams += 1;
115
3
        stream.is_counted = true;
116
3
    }
117
118
    /// Returns true if the send stream concurrency can be incremented
119
523k
    pub fn can_inc_num_send_streams(&self) -> bool {
120
523k
        self.max_send_streams > self.num_send_streams
121
523k
    }
122
123
    /// Increments the number of concurrent send streams.
124
    ///
125
    /// # Panics
126
    ///
127
    /// Panics on failure as this should have been validated beforehand.
128
181k
    pub fn inc_num_send_streams(&mut self, stream: &mut store::Ptr) {
129
181k
        assert!(self.can_inc_num_send_streams());
130
181k
        assert!(!stream.is_counted);
131
132
        // Increment the number of locally initiated streams
133
181k
        self.num_send_streams += 1;
134
181k
        stream.is_counted = true;
135
181k
    }
136
137
    /// Returns true if the number of pending reset streams can be incremented.
138
182k
    pub fn can_inc_num_reset_streams(&self) -> bool {
139
182k
        self.max_local_reset_streams > self.num_local_reset_streams
140
182k
    }
141
142
    /// Increments the number of pending reset streams.
143
    ///
144
    /// # Panics
145
    ///
146
    /// Panics on failure as this should have been validated beforehand.
147
50.5k
    pub fn inc_num_reset_streams(&mut self) {
148
50.5k
        assert!(self.can_inc_num_reset_streams());
149
150
50.5k
        self.num_local_reset_streams += 1;
151
50.5k
    }
152
153
0
    pub(crate) fn max_remote_reset_streams(&self) -> usize {
154
0
        self.max_remote_reset_streams
155
0
    }
156
157
    /// Returns true if the number of pending REMOTE reset streams can be
158
    /// incremented.
159
0
    pub(crate) fn can_inc_num_remote_reset_streams(&self) -> bool {
160
0
        self.max_remote_reset_streams > self.num_remote_reset_streams
161
0
    }
162
163
    /// Increments the number of pending REMOTE reset streams.
164
    ///
165
    /// # Panics
166
    ///
167
    /// Panics on failure as this should have been validated beforehand.
168
0
    pub(crate) fn inc_num_remote_reset_streams(&mut self) {
169
0
        assert!(self.can_inc_num_remote_reset_streams());
170
171
0
        self.num_remote_reset_streams += 1;
172
0
    }
173
174
0
    pub(crate) fn dec_num_remote_reset_streams(&mut self) {
175
0
        assert!(self.num_remote_reset_streams > 0);
176
177
0
        self.num_remote_reset_streams -= 1;
178
0
    }
179
180
7.30k
    pub fn apply_remote_settings(&mut self, settings: &frame::Settings, is_initial: bool) {
181
7.30k
        match settings.max_concurrent_streams() {
182
709
            Some(val) => self.max_send_streams = val as usize,
183
909
            None if is_initial => self.max_send_streams = usize::MAX,
184
5.69k
            None => {}
185
        }
186
7.30k
    }
187
188
    /// Run a block of code that could potentially transition a stream's state.
189
    ///
190
    /// If the stream state transitions to closed, this function will perform
191
    /// all necessary cleanup.
192
    ///
193
    /// TODO: Is this function still needed?
194
2.16M
    pub fn transition<F, U>(&mut self, mut stream: store::Ptr, f: F) -> U
195
2.16M
    where
196
2.16M
        F: FnOnce(&mut Self, &mut store::Ptr) -> U,
197
    {
198
        // TODO: Does this need to be computed before performing the action?
199
2.16M
        let is_pending_reset = stream.is_pending_reset_expiration();
200
201
        // Run the action
202
2.16M
        let ret = f(self, &mut stream);
203
204
2.16M
        self.transition_after(stream, is_pending_reset);
205
206
2.16M
        ret
207
2.16M
    }
<h2::proto::streams::counts::Counts>::transition::<<h2::proto::streams::prioritize::Prioritize>::assign_connection_capacity<h2::proto::streams::store::Ptr>::{closure#0}, ()>
Line
Count
Source
194
31.6k
    pub fn transition<F, U>(&mut self, mut stream: store::Ptr, f: F) -> U
195
31.6k
    where
196
31.6k
        F: FnOnce(&mut Self, &mut store::Ptr) -> U,
197
    {
198
        // TODO: Does this need to be computed before performing the action?
199
31.6k
        let is_pending_reset = stream.is_pending_reset_expiration();
200
201
        // Run the action
202
31.6k
        let ret = f(self, &mut stream);
203
204
31.6k
        self.transition_after(stream, is_pending_reset);
205
206
31.6k
        ret
207
31.6k
    }
<h2::proto::streams::counts::Counts>::transition::<<h2::proto::streams::prioritize::Prioritize>::assign_connection_capacity<h2::proto::streams::store::Store>::{closure#0}, ()>
Line
Count
Source
194
71.4k
    pub fn transition<F, U>(&mut self, mut stream: store::Ptr, f: F) -> U
195
71.4k
    where
196
71.4k
        F: FnOnce(&mut Self, &mut store::Ptr) -> U,
197
    {
198
        // TODO: Does this need to be computed before performing the action?
199
71.4k
        let is_pending_reset = stream.is_pending_reset_expiration();
200
201
        // Run the action
202
71.4k
        let ret = f(self, &mut stream);
203
204
71.4k
        self.transition_after(stream, is_pending_reset);
205
206
71.4k
        ret
207
71.4k
    }
<h2::proto::streams::counts::Counts>::transition::<h2::proto::streams::streams::drop_stream_ref::{closure#0}::{closure#0}, ()>
Line
Count
Source
194
208
    pub fn transition<F, U>(&mut self, mut stream: store::Ptr, f: F) -> U
195
208
    where
196
208
        F: FnOnce(&mut Self, &mut store::Ptr) -> U,
197
    {
198
        // TODO: Does this need to be computed before performing the action?
199
208
        let is_pending_reset = stream.is_pending_reset_expiration();
200
201
        // Run the action
202
208
        let ret = f(self, &mut stream);
203
204
208
        self.transition_after(stream, is_pending_reset);
205
206
208
        ret
207
208
    }
<h2::proto::streams::counts::Counts>::transition::<<h2::proto::streams::prioritize::Prioritize>::clear_pending_capacity::{closure#0}, ()>
Line
Count
Source
194
12.8k
    pub fn transition<F, U>(&mut self, mut stream: store::Ptr, f: F) -> U
195
12.8k
    where
196
12.8k
        F: FnOnce(&mut Self, &mut store::Ptr) -> U,
197
    {
198
        // TODO: Does this need to be computed before performing the action?
199
12.8k
        let is_pending_reset = stream.is_pending_reset_expiration();
200
201
        // Run the action
202
12.8k
        let ret = f(self, &mut stream);
203
204
12.8k
        self.transition_after(stream, is_pending_reset);
205
206
12.8k
        ret
207
12.8k
    }
Unexecuted instantiation: <h2::proto::streams::counts::Counts>::transition::<<h2::proto::streams::recv::Recv>::clear_stream_window_update_queue::{closure#0}, ()>
<h2::proto::streams::counts::Counts>::transition::<h2::proto::streams::streams::drop_stream_ref::{closure#0}, ()>
Line
Count
Source
194
849k
    pub fn transition<F, U>(&mut self, mut stream: store::Ptr, f: F) -> U
195
849k
    where
196
849k
        F: FnOnce(&mut Self, &mut store::Ptr) -> U,
197
    {
198
        // TODO: Does this need to be computed before performing the action?
199
849k
        let is_pending_reset = stream.is_pending_reset_expiration();
200
201
        // Run the action
202
849k
        let ret = f(self, &mut stream);
203
204
849k
        self.transition_after(stream, is_pending_reset);
205
206
849k
        ret
207
849k
    }
<h2::proto::streams::counts::Counts>::transition::<<h2::proto::streams::streams::Inner>::recv_eof<bytes::bytes::Bytes>::{closure#0}::{closure#0}, ()>
Line
Count
Source
194
589
    pub fn transition<F, U>(&mut self, mut stream: store::Ptr, f: F) -> U
195
589
    where
196
589
        F: FnOnce(&mut Self, &mut store::Ptr) -> U,
197
    {
198
        // TODO: Does this need to be computed before performing the action?
199
589
        let is_pending_reset = stream.is_pending_reset_expiration();
200
201
        // Run the action
202
589
        let ret = f(self, &mut stream);
203
204
589
        self.transition_after(stream, is_pending_reset);
205
206
589
        ret
207
589
    }
Unexecuted instantiation: <h2::proto::streams::counts::Counts>::transition::<<h2::proto::streams::recv::Recv>::send_stream_window_updates<fuzz_e2e::MockIo, bytes::bytes::Bytes>::{closure#0}, ()>
<h2::proto::streams::counts::Counts>::transition::<<h2::proto::streams::streams::Inner>::recv_reset<bytes::bytes::Bytes>::{closure#0}, core::result::Result<(), h2::proto::error::Error>>
Line
Count
Source
194
831
    pub fn transition<F, U>(&mut self, mut stream: store::Ptr, f: F) -> U
195
831
    where
196
831
        F: FnOnce(&mut Self, &mut store::Ptr) -> U,
197
    {
198
        // TODO: Does this need to be computed before performing the action?
199
831
        let is_pending_reset = stream.is_pending_reset_expiration();
200
201
        // Run the action
202
831
        let ret = f(self, &mut stream);
203
204
831
        self.transition_after(stream, is_pending_reset);
205
206
831
        ret
207
831
    }
<h2::proto::streams::counts::Counts>::transition::<<h2::proto::streams::streams::Inner>::recv_headers<bytes::bytes::Bytes>::{closure#0}, core::result::Result<(), h2::proto::error::Error>>
Line
Count
Source
194
2.48k
    pub fn transition<F, U>(&mut self, mut stream: store::Ptr, f: F) -> U
195
2.48k
    where
196
2.48k
        F: FnOnce(&mut Self, &mut store::Ptr) -> U,
197
    {
198
        // TODO: Does this need to be computed before performing the action?
199
2.48k
        let is_pending_reset = stream.is_pending_reset_expiration();
200
201
        // Run the action
202
2.48k
        let ret = f(self, &mut stream);
203
204
2.48k
        self.transition_after(stream, is_pending_reset);
205
206
2.48k
        ret
207
2.48k
    }
<h2::proto::streams::counts::Counts>::transition::<<h2::proto::streams::streams::Inner>::recv_push_promise<bytes::bytes::Bytes>::{closure#0}, core::result::Result<core::option::Option<h2::proto::streams::store::Key>, h2::proto::error::Error>>
Line
Count
Source
194
1.13k
    pub fn transition<F, U>(&mut self, mut stream: store::Ptr, f: F) -> U
195
1.13k
    where
196
1.13k
        F: FnOnce(&mut Self, &mut store::Ptr) -> U,
197
    {
198
        // TODO: Does this need to be computed before performing the action?
199
1.13k
        let is_pending_reset = stream.is_pending_reset_expiration();
200
201
        // Run the action
202
1.13k
        let ret = f(self, &mut stream);
203
204
1.13k
        self.transition_after(stream, is_pending_reset);
205
206
1.13k
        ret
207
1.13k
    }
<h2::proto::streams::counts::Counts>::transition::<<h2::proto::streams::streams::Inner>::recv_data<bytes::bytes::Bytes>::{closure#0}, core::result::Result<(), h2::proto::error::Error>>
Line
Count
Source
194
21.8k
    pub fn transition<F, U>(&mut self, mut stream: store::Ptr, f: F) -> U
195
21.8k
    where
196
21.8k
        F: FnOnce(&mut Self, &mut store::Ptr) -> U,
197
    {
198
        // TODO: Does this need to be computed before performing the action?
199
21.8k
        let is_pending_reset = stream.is_pending_reset_expiration();
200
201
        // Run the action
202
21.8k
        let ret = f(self, &mut stream);
203
204
21.8k
        self.transition_after(stream, is_pending_reset);
205
206
21.8k
        ret
207
21.8k
    }
<h2::proto::streams::counts::Counts>::transition::<<h2::proto::streams::streams::Actions>::send_reset<bytes::bytes::Bytes>::{closure#0}, core::result::Result<(), h2::proto::error::GoAway>>
Line
Count
Source
194
74.0k
    pub fn transition<F, U>(&mut self, mut stream: store::Ptr, f: F) -> U
195
74.0k
    where
196
74.0k
        F: FnOnce(&mut Self, &mut store::Ptr) -> U,
197
    {
198
        // TODO: Does this need to be computed before performing the action?
199
74.0k
        let is_pending_reset = stream.is_pending_reset_expiration();
200
201
        // Run the action
202
74.0k
        let ret = f(self, &mut stream);
203
204
74.0k
        self.transition_after(stream, is_pending_reset);
205
206
74.0k
        ret
207
74.0k
    }
<h2::proto::streams::counts::Counts>::transition::<<h2::proto::streams::streams::Inner>::handle_error<bytes::bytes::Bytes>::{closure#0}::{closure#0}, ()>
Line
Count
Source
194
175k
    pub fn transition<F, U>(&mut self, mut stream: store::Ptr, f: F) -> U
195
175k
    where
196
175k
        F: FnOnce(&mut Self, &mut store::Ptr) -> U,
197
    {
198
        // TODO: Does this need to be computed before performing the action?
199
175k
        let is_pending_reset = stream.is_pending_reset_expiration();
200
201
        // Run the action
202
175k
        let ret = f(self, &mut stream);
203
204
175k
        self.transition_after(stream, is_pending_reset);
205
206
175k
        ret
207
175k
    }
<h2::proto::streams::counts::Counts>::transition::<<h2::proto::streams::streams::Inner>::recv_go_away<bytes::bytes::Bytes>::{closure#0}::{closure#0}, ()>
Line
Count
Source
194
236k
    pub fn transition<F, U>(&mut self, mut stream: store::Ptr, f: F) -> U
195
236k
    where
196
236k
        F: FnOnce(&mut Self, &mut store::Ptr) -> U,
197
    {
198
        // TODO: Does this need to be computed before performing the action?
199
236k
        let is_pending_reset = stream.is_pending_reset_expiration();
200
201
        // Run the action
202
236k
        let ret = f(self, &mut stream);
203
204
236k
        self.transition_after(stream, is_pending_reset);
205
206
236k
        ret
207
236k
    }
<h2::proto::streams::counts::Counts>::transition::<<h2::proto::streams::streams::Inner>::recv_eof<bytes::bytes::Bytes>::{closure#0}::{closure#0}, ()>
Line
Count
Source
194
261k
    pub fn transition<F, U>(&mut self, mut stream: store::Ptr, f: F) -> U
195
261k
    where
196
261k
        F: FnOnce(&mut Self, &mut store::Ptr) -> U,
197
    {
198
        // TODO: Does this need to be computed before performing the action?
199
261k
        let is_pending_reset = stream.is_pending_reset_expiration();
200
201
        // Run the action
202
261k
        let ret = f(self, &mut stream);
203
204
261k
        self.transition_after(stream, is_pending_reset);
205
206
261k
        ret
207
261k
    }
<h2::proto::streams::counts::Counts>::transition::<<h2::proto::streams::streams::StreamRef<bytes::bytes::Bytes>>::send_data::{closure#0}, core::result::Result<(), h2::codec::error::UserError>>
Line
Count
Source
194
423k
    pub fn transition<F, U>(&mut self, mut stream: store::Ptr, f: F) -> U
195
423k
    where
196
423k
        F: FnOnce(&mut Self, &mut store::Ptr) -> U,
197
    {
198
        // TODO: Does this need to be computed before performing the action?
199
423k
        let is_pending_reset = stream.is_pending_reset_expiration();
200
201
        // Run the action
202
423k
        let ret = f(self, &mut stream);
203
204
423k
        self.transition_after(stream, is_pending_reset);
205
206
423k
        ret
207
423k
    }
208
209
    // TODO: move this to macro?
210
2.86M
    pub fn transition_after(&mut self, mut stream: store::Ptr, is_reset_counted: bool) {
211
2.86M
        tracing::trace!(
212
0
            "transition_after; stream={:?}; state={:?}; is_closed={:?}; \
213
0
             pending_send_empty={:?}; buffered_send_data={}; \
214
0
             num_recv={}; num_send={}",
215
0
            stream.id,
216
0
            stream.state,
217
0
            stream.is_closed(),
218
0
            stream.pending_send.is_empty(),
219
0
            stream.buffered_send_data,
220
            self.num_recv_streams,
221
            self.num_send_streams
222
        );
223
224
2.86M
        if stream.is_closed() {
225
1.54M
            if !stream.is_pending_reset_expiration() {
226
1.28M
                stream.unlink();
227
1.28M
                if is_reset_counted {
228
50.5k
                    self.dec_num_reset_streams();
229
1.23M
                }
230
268k
            }
231
232
1.54M
            if !stream.state.is_scheduled_reset() && stream.is_counted {
233
181k
                tracing::trace!("dec_num_streams; stream={:?}", stream.id);
234
                // Decrement the number of active streams.
235
181k
                self.dec_num_streams(&mut stream);
236
1.36M
            }
237
1.31M
        }
238
239
        // Release the stream if it requires releasing
240
2.86M
        if stream.is_released() {
241
475k
            stream.remove();
242
2.39M
        }
243
2.86M
    }
244
245
    /// Returns the maximum number of streams that can be initiated by this
246
    /// peer.
247
0
    pub(crate) fn max_send_streams(&self) -> usize {
248
0
        self.max_send_streams
249
0
    }
250
251
    /// Returns the maximum number of streams that can be initiated by the
252
    /// remote peer.
253
0
    pub(crate) fn max_recv_streams(&self) -> usize {
254
0
        self.max_recv_streams
255
0
    }
256
257
181k
    fn dec_num_streams(&mut self, stream: &mut store::Ptr) {
258
181k
        assert!(stream.is_counted);
259
260
181k
        if self.peer.is_local_init(stream.id) {
261
181k
            assert!(self.num_send_streams > 0);
262
181k
            self.num_send_streams -= 1;
263
181k
            stream.is_counted = false;
264
        } else {
265
3
            assert!(self.num_recv_streams > 0);
266
3
            self.num_recv_streams -= 1;
267
3
            stream.is_counted = false;
268
        }
269
181k
    }
270
271
50.5k
    fn dec_num_reset_streams(&mut self) {
272
50.5k
        assert!(self.num_local_reset_streams > 0);
273
50.5k
        self.num_local_reset_streams -= 1;
274
50.5k
    }
275
}
276
277
impl Drop for Counts {
278
13.4k
    fn drop(&mut self) {
279
        use std::thread;
280
281
13.4k
        if !thread::panicking() {
282
13.4k
            debug_assert!(!self.has_streams());
283
0
        }
284
13.4k
    }
285
}