/src/h2/src/proto/streams/counts.rs
Line | Count | Source |
1 | | use super::*; |
2 | | |
3 | | #[derive(Debug)] |
4 | | pub(super) struct Counts { |
5 | | /// Acting as a client or server. This allows us to track which values to |
6 | | /// inc / dec. |
7 | | peer: peer::Dyn, |
8 | | |
9 | | /// Maximum number of locally initiated streams |
10 | | max_send_streams: usize, |
11 | | |
12 | | /// Current number of remote initiated streams |
13 | | num_send_streams: usize, |
14 | | |
15 | | /// Maximum number of remote initiated streams |
16 | | max_recv_streams: usize, |
17 | | |
18 | | /// Current number of locally initiated streams |
19 | | num_recv_streams: usize, |
20 | | |
21 | | /// Maximum number of pending locally reset streams |
22 | | max_local_reset_streams: usize, |
23 | | |
24 | | /// Current number of pending locally reset streams |
25 | | num_local_reset_streams: usize, |
26 | | |
27 | | /// Max number of "pending accept" streams that were remotely reset |
28 | | max_remote_reset_streams: usize, |
29 | | |
30 | | /// Current number of "pending accept" streams that were remotely reset |
31 | | num_remote_reset_streams: usize, |
32 | | |
33 | | /// Maximum number of locally reset streams due to protocol error across |
34 | | /// the lifetime of the connection. |
35 | | /// |
36 | | /// When this gets exceeded, we issue GOAWAYs. |
37 | | max_local_error_reset_streams: Option<usize>, |
38 | | |
39 | | /// Total number of locally reset streams due to protocol error across the |
40 | | /// lifetime of the connection. |
41 | | num_local_error_reset_streams: usize, |
42 | | } |
43 | | |
44 | | impl Counts { |
45 | | /// Create a new `Counts` using the provided configuration values. |
46 | 13.2k | pub fn new(peer: peer::Dyn, config: &Config) -> Self { |
47 | 13.2k | Counts { |
48 | 13.2k | peer, |
49 | 13.2k | max_send_streams: config.initial_max_send_streams, |
50 | 13.2k | num_send_streams: 0, |
51 | 13.2k | max_recv_streams: config.remote_max_initiated.unwrap_or(usize::MAX), |
52 | 13.2k | num_recv_streams: 0, |
53 | 13.2k | max_local_reset_streams: config.local_reset_max, |
54 | 13.2k | num_local_reset_streams: 0, |
55 | 13.2k | max_remote_reset_streams: config.remote_reset_max, |
56 | 13.2k | num_remote_reset_streams: 0, |
57 | 13.2k | max_local_error_reset_streams: config.local_max_error_reset_streams, |
58 | 13.2k | num_local_error_reset_streams: 0, |
59 | 13.2k | } |
60 | 13.2k | } |
61 | | |
62 | | /// Returns true when the next opened stream will reach capacity of outbound streams |
63 | | /// |
64 | | /// The number of client send streams is incremented in prioritize; send_request has to guess if |
65 | | /// it should wait before allowing another request to be sent. |
66 | 427k | pub fn next_send_stream_will_reach_capacity(&self) -> bool { |
67 | 427k | self.max_send_streams <= (self.num_send_streams + 1) |
68 | 427k | } |
69 | | |
70 | | /// Returns the current peer |
71 | 975k | pub fn peer(&self) -> peer::Dyn { |
72 | 975k | self.peer |
73 | 975k | } |
74 | | |
75 | 2.42M | pub fn has_streams(&self) -> bool { |
76 | 2.42M | self.num_send_streams != 0 || self.num_recv_streams != 0 |
77 | 2.42M | } |
78 | | |
79 | | /// Returns true if we can issue another local reset due to protocol error. |
80 | 117k | pub fn can_inc_num_local_error_resets(&self) -> bool { |
81 | 117k | if let Some(max) = self.max_local_error_reset_streams { |
82 | 117k | max > self.num_local_error_reset_streams |
83 | | } else { |
84 | 0 | true |
85 | | } |
86 | 117k | } |
87 | | |
88 | 58.5k | pub fn inc_num_local_error_resets(&mut self) { |
89 | 58.5k | assert!(self.can_inc_num_local_error_resets()); |
90 | | |
91 | | // Increment the number of remote initiated streams |
92 | 58.5k | self.num_local_error_reset_streams += 1; |
93 | 58.5k | } |
94 | | |
95 | 0 | pub(crate) fn max_local_error_resets(&self) -> Option<usize> { |
96 | 0 | self.max_local_error_reset_streams |
97 | 0 | } |
98 | | |
99 | | /// Returns true if the receive stream concurrency can be incremented |
100 | 1.12k | pub fn can_inc_num_recv_streams(&self) -> bool { |
101 | 1.12k | self.max_recv_streams > self.num_recv_streams |
102 | 1.12k | } |
103 | | |
104 | | /// Increments the number of concurrent receive streams. |
105 | | /// |
106 | | /// # Panics |
107 | | /// |
108 | | /// Panics on failure as this should have been validated before hand. |
109 | 3 | pub fn inc_num_recv_streams(&mut self, stream: &mut store::Ptr) { |
110 | 3 | assert!(self.can_inc_num_recv_streams()); |
111 | 3 | assert!(!stream.is_counted); |
112 | | |
113 | | // Increment the number of remote initiated streams |
114 | 3 | self.num_recv_streams += 1; |
115 | 3 | stream.is_counted = true; |
116 | 3 | } |
117 | | |
118 | | /// Returns true if the send stream concurrency can be incremented |
119 | 391k | pub fn can_inc_num_send_streams(&self) -> bool { |
120 | 391k | self.max_send_streams > self.num_send_streams |
121 | 391k | } |
122 | | |
123 | | /// Increments the number of concurrent send streams. |
124 | | /// |
125 | | /// # Panics |
126 | | /// |
127 | | /// Panics on failure as this should have been validated before hand. |
128 | 179k | pub fn inc_num_send_streams(&mut self, stream: &mut store::Ptr) { |
129 | 179k | assert!(self.can_inc_num_send_streams()); |
130 | 179k | assert!(!stream.is_counted); |
131 | | |
132 | | // Increment the number of remote initiated streams |
133 | 179k | self.num_send_streams += 1; |
134 | 179k | stream.is_counted = true; |
135 | 179k | } |
136 | | |
137 | | /// Returns true if the number of pending reset streams can be incremented. |
138 | 165k | pub fn can_inc_num_reset_streams(&self) -> bool { |
139 | 165k | self.max_local_reset_streams > self.num_local_reset_streams |
140 | 165k | } |
141 | | |
142 | | /// Increments the number of pending reset streams. |
143 | | /// |
144 | | /// # Panics |
145 | | /// |
146 | | /// Panics on failure as this should have been validated before hand. |
147 | 49.4k | pub fn inc_num_reset_streams(&mut self) { |
148 | 49.4k | assert!(self.can_inc_num_reset_streams()); |
149 | | |
150 | 49.4k | self.num_local_reset_streams += 1; |
151 | 49.4k | } |
152 | | |
153 | 0 | pub(crate) fn max_remote_reset_streams(&self) -> usize { |
154 | 0 | self.max_remote_reset_streams |
155 | 0 | } |
156 | | |
157 | | /// Returns true if the number of pending REMOTE reset streams can be |
158 | | /// incremented. |
159 | 0 | pub(crate) fn can_inc_num_remote_reset_streams(&self) -> bool { |
160 | 0 | self.max_remote_reset_streams > self.num_remote_reset_streams |
161 | 0 | } |
162 | | |
163 | | /// Increments the number of pending REMOTE reset streams. |
164 | | /// |
165 | | /// # Panics |
166 | | /// |
167 | | /// Panics on failure as this should have been validated before hand. |
168 | 0 | pub(crate) fn inc_num_remote_reset_streams(&mut self) { |
169 | 0 | assert!(self.can_inc_num_remote_reset_streams()); |
170 | | |
171 | 0 | self.num_remote_reset_streams += 1; |
172 | 0 | } |
173 | | |
174 | 0 | pub(crate) fn dec_num_remote_reset_streams(&mut self) { |
175 | 0 | assert!(self.num_remote_reset_streams > 0); |
176 | | |
177 | 0 | self.num_remote_reset_streams -= 1; |
178 | 0 | } |
179 | | |
180 | 7.29k | pub fn apply_remote_settings(&mut self, settings: &frame::Settings, is_initial: bool) { |
181 | 7.29k | match settings.max_concurrent_streams() { |
182 | 585 | Some(val) => self.max_send_streams = val as usize, |
183 | 895 | None if is_initial => self.max_send_streams = usize::MAX, |
184 | 5.81k | None => {} |
185 | | } |
186 | 7.29k | } |
187 | | |
188 | | /// Run a block of code that could potentially transition a stream's state. |
189 | | /// |
190 | | /// If the stream state transitions to closed, this function will perform |
191 | | /// all necessary cleanup. |
192 | | /// |
193 | | /// TODO: Is this function still needed? |
194 | 2.06M | pub fn transition<F, U>(&mut self, mut stream: store::Ptr, f: F) -> U |
195 | 2.06M | where |
196 | 2.06M | F: FnOnce(&mut Self, &mut store::Ptr) -> U, |
197 | | { |
198 | | // TODO: Does this need to be computed before performing the action? |
199 | 2.06M | let is_pending_reset = stream.is_pending_reset_expiration(); |
200 | | |
201 | | // Run the action |
202 | 2.06M | let ret = f(self, &mut stream); |
203 | | |
204 | 2.06M | self.transition_after(stream, is_pending_reset); |
205 | | |
206 | 2.06M | ret |
207 | 2.06M | } <h2::proto::streams::counts::Counts>::transition::<<h2::proto::streams::prioritize::Prioritize>::assign_connection_capacity<h2::proto::streams::store::Ptr>::{closure#0}, ()> Line | Count | Source | 194 | 31.5k | pub fn transition<F, U>(&mut self, mut stream: store::Ptr, f: F) -> U | 195 | 31.5k | where | 196 | 31.5k | F: FnOnce(&mut Self, &mut store::Ptr) -> U, | 197 | | { | 198 | | // TODO: Does this need to be computed before performing the action? | 199 | 31.5k | let is_pending_reset = stream.is_pending_reset_expiration(); | 200 | | | 201 | | // Run the action | 202 | 31.5k | let ret = f(self, &mut stream); | 203 | | | 204 | 31.5k | self.transition_after(stream, is_pending_reset); | 205 | | | 206 | 31.5k | ret | 207 | 31.5k | } |
<h2::proto::streams::counts::Counts>::transition::<<h2::proto::streams::prioritize::Prioritize>::assign_connection_capacity<h2::proto::streams::store::Store>::{closure#0}, ()> Line | Count | Source | 194 | 70.2k | pub fn transition<F, U>(&mut self, mut stream: store::Ptr, f: F) -> U | 195 | 70.2k | where | 196 | 70.2k | F: FnOnce(&mut Self, &mut store::Ptr) -> U, | 197 | | { | 198 | | // TODO: Does this need to be computed before performing the action? | 199 | 70.2k | let is_pending_reset = stream.is_pending_reset_expiration(); | 200 | | | 201 | | // Run the action | 202 | 70.2k | let ret = f(self, &mut stream); | 203 | | | 204 | 70.2k | self.transition_after(stream, is_pending_reset); | 205 | | | 206 | 70.2k | ret | 207 | 70.2k | } |
<h2::proto::streams::counts::Counts>::transition::<h2::proto::streams::streams::drop_stream_ref::{closure#0}::{closure#0}, ()> Line | Count | Source | 194 | 202 | pub fn transition<F, U>(&mut self, mut stream: store::Ptr, f: F) -> U | 195 | 202 | where | 196 | 202 | F: FnOnce(&mut Self, &mut store::Ptr) -> U, | 197 | | { | 198 | | // TODO: Does this need to be computed before performing the action? | 199 | 202 | let is_pending_reset = stream.is_pending_reset_expiration(); | 200 | | | 201 | | // Run the action | 202 | 202 | let ret = f(self, &mut stream); | 203 | | | 204 | 202 | self.transition_after(stream, is_pending_reset); | 205 | | | 206 | 202 | ret | 207 | 202 | } |
<h2::proto::streams::counts::Counts>::transition::<<h2::proto::streams::prioritize::Prioritize>::clear_pending_capacity::{closure#0}, ()> Line | Count | Source | 194 | 12.3k | pub fn transition<F, U>(&mut self, mut stream: store::Ptr, f: F) -> U | 195 | 12.3k | where | 196 | 12.3k | F: FnOnce(&mut Self, &mut store::Ptr) -> U, | 197 | | { | 198 | | // TODO: Does this need to be computed before performing the action? | 199 | 12.3k | let is_pending_reset = stream.is_pending_reset_expiration(); | 200 | | | 201 | | // Run the action | 202 | 12.3k | let ret = f(self, &mut stream); | 203 | | | 204 | 12.3k | self.transition_after(stream, is_pending_reset); | 205 | | | 206 | 12.3k | ret | 207 | 12.3k | } |
Unexecuted instantiation: <h2::proto::streams::counts::Counts>::transition::<<h2::proto::streams::recv::Recv>::clear_stream_window_update_queue::{closure#0}, ()> <h2::proto::streams::counts::Counts>::transition::<h2::proto::streams::streams::drop_stream_ref::{closure#0}, ()> Line | Count | Source | 194 | 855k | pub fn transition<F, U>(&mut self, mut stream: store::Ptr, f: F) -> U | 195 | 855k | where | 196 | 855k | F: FnOnce(&mut Self, &mut store::Ptr) -> U, | 197 | | { | 198 | | // TODO: Does this need to be computed before performing the action? | 199 | 855k | let is_pending_reset = stream.is_pending_reset_expiration(); | 200 | | | 201 | | // Run the action | 202 | 855k | let ret = f(self, &mut stream); | 203 | | | 204 | 855k | self.transition_after(stream, is_pending_reset); | 205 | | | 206 | 855k | ret | 207 | 855k | } |
<h2::proto::streams::counts::Counts>::transition::<<h2::proto::streams::streams::Inner>::recv_eof<bytes::bytes::Bytes>::{closure#0}::{closure#0}, ()> Line | Count | Source | 194 | 568 | pub fn transition<F, U>(&mut self, mut stream: store::Ptr, f: F) -> U | 195 | 568 | where | 196 | 568 | F: FnOnce(&mut Self, &mut store::Ptr) -> U, | 197 | | { | 198 | | // TODO: Does this need to be computed before performing the action? | 199 | 568 | let is_pending_reset = stream.is_pending_reset_expiration(); | 200 | | | 201 | | // Run the action | 202 | 568 | let ret = f(self, &mut stream); | 203 | | | 204 | 568 | self.transition_after(stream, is_pending_reset); | 205 | | | 206 | 568 | ret | 207 | 568 | } |
Unexecuted instantiation: <h2::proto::streams::counts::Counts>::transition::<<h2::proto::streams::recv::Recv>::send_stream_window_updates<fuzz_e2e::MockIo, bytes::bytes::Bytes>::{closure#0}, ()> <h2::proto::streams::counts::Counts>::transition::<<h2::proto::streams::streams::Inner>::recv_reset<bytes::bytes::Bytes>::{closure#0}, core::result::Result<(), h2::proto::error::Error>> Line | Count | Source | 194 | 635 | pub fn transition<F, U>(&mut self, mut stream: store::Ptr, f: F) -> U | 195 | 635 | where | 196 | 635 | F: FnOnce(&mut Self, &mut store::Ptr) -> U, | 197 | | { | 198 | | // TODO: Does this need to be computed before performing the action? | 199 | 635 | let is_pending_reset = stream.is_pending_reset_expiration(); | 200 | | | 201 | | // Run the action | 202 | 635 | let ret = f(self, &mut stream); | 203 | | | 204 | 635 | self.transition_after(stream, is_pending_reset); | 205 | | | 206 | 635 | ret | 207 | 635 | } |
<h2::proto::streams::counts::Counts>::transition::<<h2::proto::streams::streams::Inner>::recv_headers<bytes::bytes::Bytes>::{closure#0}, core::result::Result<(), h2::proto::error::Error>> Line | Count | Source | 194 | 2.53k | pub fn transition<F, U>(&mut self, mut stream: store::Ptr, f: F) -> U | 195 | 2.53k | where | 196 | 2.53k | F: FnOnce(&mut Self, &mut store::Ptr) -> U, | 197 | | { | 198 | | // TODO: Does this need to be computed before performing the action? | 199 | 2.53k | let is_pending_reset = stream.is_pending_reset_expiration(); | 200 | | | 201 | | // Run the action | 202 | 2.53k | let ret = f(self, &mut stream); | 203 | | | 204 | 2.53k | self.transition_after(stream, is_pending_reset); | 205 | | | 206 | 2.53k | ret | 207 | 2.53k | } |
<h2::proto::streams::counts::Counts>::transition::<<h2::proto::streams::streams::Inner>::recv_push_promise<bytes::bytes::Bytes>::{closure#0}, core::result::Result<core::option::Option<h2::proto::streams::store::Key>, h2::proto::error::Error>> Line | Count | Source | 194 | 1.12k | pub fn transition<F, U>(&mut self, mut stream: store::Ptr, f: F) -> U | 195 | 1.12k | where | 196 | 1.12k | F: FnOnce(&mut Self, &mut store::Ptr) -> U, | 197 | | { | 198 | | // TODO: Does this need to be computed before performing the action? | 199 | 1.12k | let is_pending_reset = stream.is_pending_reset_expiration(); | 200 | | | 201 | | // Run the action | 202 | 1.12k | let ret = f(self, &mut stream); | 203 | | | 204 | 1.12k | self.transition_after(stream, is_pending_reset); | 205 | | | 206 | 1.12k | ret | 207 | 1.12k | } |
<h2::proto::streams::counts::Counts>::transition::<<h2::proto::streams::streams::Inner>::recv_data<bytes::bytes::Bytes>::{closure#0}, core::result::Result<(), h2::proto::error::Error>> Line | Count | Source | 194 | 17.5k | pub fn transition<F, U>(&mut self, mut stream: store::Ptr, f: F) -> U | 195 | 17.5k | where | 196 | 17.5k | F: FnOnce(&mut Self, &mut store::Ptr) -> U, | 197 | | { | 198 | | // TODO: Does this need to be computed before performing the action? | 199 | 17.5k | let is_pending_reset = stream.is_pending_reset_expiration(); | 200 | | | 201 | | // Run the action | 202 | 17.5k | let ret = f(self, &mut stream); | 203 | | | 204 | 17.5k | self.transition_after(stream, is_pending_reset); | 205 | | | 206 | 17.5k | ret | 207 | 17.5k | } |
<h2::proto::streams::counts::Counts>::transition::<<h2::proto::streams::streams::Actions>::send_reset<bytes::bytes::Bytes>::{closure#0}, core::result::Result<(), h2::proto::error::GoAway>> Line | Count | Source | 194 | 57.5k | pub fn transition<F, U>(&mut self, mut stream: store::Ptr, f: F) -> U | 195 | 57.5k | where | 196 | 57.5k | F: FnOnce(&mut Self, &mut store::Ptr) -> U, | 197 | | { | 198 | | // TODO: Does this need to be computed before performing the action? | 199 | 57.5k | let is_pending_reset = stream.is_pending_reset_expiration(); | 200 | | | 201 | | // Run the action | 202 | 57.5k | let ret = f(self, &mut stream); | 203 | | | 204 | 57.5k | self.transition_after(stream, is_pending_reset); | 205 | | | 206 | 57.5k | ret | 207 | 57.5k | } |
<h2::proto::streams::counts::Counts>::transition::<<h2::proto::streams::streams::Inner>::handle_error<bytes::bytes::Bytes>::{closure#0}::{closure#0}, ()> Line | Count | Source | 194 | 172k | pub fn transition<F, U>(&mut self, mut stream: store::Ptr, f: F) -> U | 195 | 172k | where | 196 | 172k | F: FnOnce(&mut Self, &mut store::Ptr) -> U, | 197 | | { | 198 | | // TODO: Does this need to be computed before performing the action? | 199 | 172k | let is_pending_reset = stream.is_pending_reset_expiration(); | 200 | | | 201 | | // Run the action | 202 | 172k | let ret = f(self, &mut stream); | 203 | | | 204 | 172k | self.transition_after(stream, is_pending_reset); | 205 | | | 206 | 172k | ret | 207 | 172k | } |
<h2::proto::streams::counts::Counts>::transition::<<h2::proto::streams::streams::Inner>::recv_go_away<bytes::bytes::Bytes>::{closure#0}::{closure#0}, ()> Line | Count | Source | 194 | 152k | pub fn transition<F, U>(&mut self, mut stream: store::Ptr, f: F) -> U | 195 | 152k | where | 196 | 152k | F: FnOnce(&mut Self, &mut store::Ptr) -> U, | 197 | | { | 198 | | // TODO: Does this need to be computed before performing the action? | 199 | 152k | let is_pending_reset = stream.is_pending_reset_expiration(); | 200 | | | 201 | | // Run the action | 202 | 152k | let ret = f(self, &mut stream); | 203 | | | 204 | 152k | self.transition_after(stream, is_pending_reset); | 205 | | | 206 | 152k | ret | 207 | 152k | } |
<h2::proto::streams::counts::Counts>::transition::<<h2::proto::streams::streams::Inner>::recv_eof<bytes::bytes::Bytes>::{closure#0}::{closure#0}, ()> Line | Count | Source | 194 | 263k | pub fn transition<F, U>(&mut self, mut stream: store::Ptr, f: F) -> U | 195 | 263k | where | 196 | 263k | F: FnOnce(&mut Self, &mut store::Ptr) -> U, | 197 | | { | 198 | | // TODO: Does this need to be computed before performing the action? | 199 | 263k | let is_pending_reset = stream.is_pending_reset_expiration(); | 200 | | | 201 | | // Run the action | 202 | 263k | let ret = f(self, &mut stream); | 203 | | | 204 | 263k | self.transition_after(stream, is_pending_reset); | 205 | | | 206 | 263k | ret | 207 | 263k | } |
<h2::proto::streams::counts::Counts>::transition::<<h2::proto::streams::streams::StreamRef<bytes::bytes::Bytes>>::send_data::{closure#0}, core::result::Result<(), h2::codec::error::UserError>> Line | Count | Source | 194 | 426k | pub fn transition<F, U>(&mut self, mut stream: store::Ptr, f: F) -> U | 195 | 426k | where | 196 | 426k | F: FnOnce(&mut Self, &mut store::Ptr) -> U, | 197 | | { | 198 | | // TODO: Does this need to be computed before performing the action? | 199 | 426k | let is_pending_reset = stream.is_pending_reset_expiration(); | 200 | | | 201 | | // Run the action | 202 | 426k | let ret = f(self, &mut stream); | 203 | | | 204 | 426k | self.transition_after(stream, is_pending_reset); | 205 | | | 206 | 426k | ret | 207 | 426k | } |
|
208 | | |
209 | | // TODO: move this to macro? |
210 | 2.75M | pub fn transition_after(&mut self, mut stream: store::Ptr, is_reset_counted: bool) { |
211 | 2.75M | tracing::trace!( |
212 | 0 | "transition_after; stream={:?}; state={:?}; is_closed={:?}; \ |
213 | 0 | pending_send_empty={:?}; buffered_send_data={}; \ |
214 | 0 | num_recv={}; num_send={}", |
215 | 0 | stream.id, |
216 | 0 | stream.state, |
217 | 0 | stream.is_closed(), |
218 | 0 | stream.pending_send.is_empty(), |
219 | 0 | stream.buffered_send_data, |
220 | | self.num_recv_streams, |
221 | | self.num_send_streams |
222 | | ); |
223 | | |
224 | 2.75M | if stream.is_closed() { |
225 | 1.45M | if !stream.is_pending_reset_expiration() { |
226 | 1.25M | stream.unlink(); |
227 | 1.25M | if is_reset_counted { |
228 | 49.4k | self.dec_num_reset_streams(); |
229 | 1.21M | } |
230 | 192k | } |
231 | | |
232 | 1.45M | if !stream.state.is_scheduled_reset() && stream.is_counted { |
233 | 179k | tracing::trace!("dec_num_streams; stream={:?}", stream.id); |
234 | | // Decrement the number of active streams. |
235 | 179k | self.dec_num_streams(&mut stream); |
236 | 1.27M | } |
237 | 1.30M | } |
238 | | |
239 | | // Release the stream if it requires releasing |
240 | 2.75M | if stream.is_released() { |
241 | 462k | stream.remove(); |
242 | 2.29M | } |
243 | 2.75M | } |
244 | | |
245 | | /// Returns the maximum number of streams that can be initiated by this |
246 | | /// peer. |
247 | 0 | pub(crate) fn max_send_streams(&self) -> usize { |
248 | 0 | self.max_send_streams |
249 | 0 | } |
250 | | |
251 | | /// Returns the maximum number of streams that can be initiated by the |
252 | | /// remote peer. |
253 | 0 | pub(crate) fn max_recv_streams(&self) -> usize { |
254 | 0 | self.max_recv_streams |
255 | 0 | } |
256 | | |
257 | 179k | fn dec_num_streams(&mut self, stream: &mut store::Ptr) { |
258 | 179k | assert!(stream.is_counted); |
259 | | |
260 | 179k | if self.peer.is_local_init(stream.id) { |
261 | 179k | assert!(self.num_send_streams > 0); |
262 | 179k | self.num_send_streams -= 1; |
263 | 179k | stream.is_counted = false; |
264 | | } else { |
265 | 3 | assert!(self.num_recv_streams > 0); |
266 | 3 | self.num_recv_streams -= 1; |
267 | 3 | stream.is_counted = false; |
268 | | } |
269 | 179k | } |
270 | | |
271 | 49.4k | fn dec_num_reset_streams(&mut self) { |
272 | 49.4k | assert!(self.num_local_reset_streams > 0); |
273 | 49.4k | self.num_local_reset_streams -= 1; |
274 | 49.4k | } |
275 | | } |
276 | | |
277 | | impl Drop for Counts { |
278 | 13.2k | fn drop(&mut self) { |
279 | | use std::thread; |
280 | | |
281 | 13.2k | if !thread::panicking() { |
282 | 13.2k | debug_assert!(!self.has_streams()); |
283 | 0 | } |
284 | 13.2k | } |
285 | | } |