/src/h2/src/proto/streams/prioritize.rs
Line | Count | Source (jump to first uncovered line) |
1 | | use super::store::Resolve; |
2 | | use super::*; |
3 | | |
4 | | use crate::frame::Reason; |
5 | | |
6 | | use crate::codec::UserError; |
7 | | use crate::codec::UserError::*; |
8 | | |
9 | | use bytes::buf::Take; |
10 | | use std::{ |
11 | | cmp::{self, Ordering}, |
12 | | fmt, io, mem, |
13 | | task::{Context, Poll, Waker}, |
14 | | }; |
15 | | |
16 | | /// # Warning |
17 | | /// |
18 | | /// Queued streams are ordered by stream ID, as we need to ensure that |
19 | | /// lower-numbered streams are sent headers before higher-numbered ones. |
20 | | /// This is because "idle" stream IDs – those which have been initiated but |
21 | | /// have yet to receive frames – will be implicitly closed on receipt of a |
22 | | /// frame on a higher stream ID. If these queues was not ordered by stream |
23 | | /// IDs, some mechanism would be necessary to ensure that the lowest-numbered] |
24 | | /// idle stream is opened first. |
25 | | #[derive(Debug)] |
26 | | pub(super) struct Prioritize { |
27 | | /// Queue of streams waiting for socket capacity to send a frame. |
28 | | pending_send: store::Queue<stream::NextSend>, |
29 | | |
30 | | /// Queue of streams waiting for window capacity to produce data. |
31 | | pending_capacity: store::Queue<stream::NextSendCapacity>, |
32 | | |
33 | | /// Streams waiting for capacity due to max concurrency |
34 | | /// |
35 | | /// The `SendRequest` handle is `Clone`. This enables initiating requests |
36 | | /// from many tasks. However, offering this capability while supporting |
37 | | /// backpressure at some level is tricky. If there are many `SendRequest` |
38 | | /// handles and a single stream becomes available, which handle gets |
39 | | /// assigned that stream? Maybe that handle is no longer ready to send a |
40 | | /// request. |
41 | | /// |
42 | | /// The strategy used is to allow each `SendRequest` handle one buffered |
43 | | /// request. A `SendRequest` handle is ready to send a request if it has no |
44 | | /// associated buffered requests. This is the same strategy as `mpsc` in the |
45 | | /// futures library. |
46 | | pending_open: store::Queue<stream::NextOpen>, |
47 | | |
48 | | /// Connection level flow control governing sent data |
49 | | flow: FlowControl, |
50 | | |
51 | | /// Stream ID of the last stream opened. |
52 | | last_opened_id: StreamId, |
53 | | |
54 | | /// What `DATA` frame is currently being sent in the codec. |
55 | | in_flight_data_frame: InFlightData, |
56 | | |
57 | | /// The maximum amount of bytes a stream should buffer. |
58 | | max_buffer_size: usize, |
59 | | } |
60 | | |
61 | | #[derive(Debug, Eq, PartialEq)] |
62 | | enum InFlightData { |
63 | | /// There is no `DATA` frame in flight. |
64 | | Nothing, |
65 | | /// There is a `DATA` frame in flight belonging to the given stream. |
66 | | DataFrame(store::Key), |
67 | | /// There was a `DATA` frame, but the stream's queue was since cleared. |
68 | | Drop, |
69 | | } |
70 | | |
71 | | pub(crate) struct Prioritized<B> { |
72 | | // The buffer |
73 | | inner: Take<B>, |
74 | | |
75 | | end_of_stream: bool, |
76 | | |
77 | | // The stream that this is associated with |
78 | | stream: store::Key, |
79 | | } |
80 | | |
81 | | // ===== impl Prioritize ===== |
82 | | |
83 | | impl Prioritize { |
84 | 12.7k | pub fn new(config: &Config) -> Prioritize { |
85 | 12.7k | let mut flow = FlowControl::new(); |
86 | 12.7k | |
87 | 12.7k | flow.inc_window(config.remote_init_window_sz) |
88 | 12.7k | .expect("invalid initial window size"); |
89 | 12.7k | |
90 | 12.7k | // TODO: proper error handling |
91 | 12.7k | let _res = flow.assign_capacity(config.remote_init_window_sz); |
92 | 12.7k | debug_assert!(_res.is_ok()); |
93 | | |
94 | 12.7k | tracing::trace!("Prioritize::new; flow={:?}", flow); |
95 | | |
96 | 12.7k | Prioritize { |
97 | 12.7k | pending_send: store::Queue::new(), |
98 | 12.7k | pending_capacity: store::Queue::new(), |
99 | 12.7k | pending_open: store::Queue::new(), |
100 | 12.7k | flow, |
101 | 12.7k | last_opened_id: StreamId::ZERO, |
102 | 12.7k | in_flight_data_frame: InFlightData::Nothing, |
103 | 12.7k | max_buffer_size: config.local_max_buffer_size, |
104 | 12.7k | } |
105 | 12.7k | } |
106 | | |
107 | 0 | pub(crate) fn max_buffer_size(&self) -> usize { |
108 | 0 | self.max_buffer_size |
109 | 0 | } |
110 | | |
111 | | /// Queue a frame to be sent to the remote |
112 | 450k | pub fn queue_frame<B>( |
113 | 450k | &mut self, |
114 | 450k | frame: Frame<B>, |
115 | 450k | buffer: &mut Buffer<Frame<B>>, |
116 | 450k | stream: &mut store::Ptr, |
117 | 450k | task: &mut Option<Waker>, |
118 | 450k | ) { |
119 | 450k | let span = tracing::trace_span!("Prioritize::queue_frame", ?stream.id); |
120 | 450k | let _e = span.enter(); |
121 | 450k | // Queue the frame in the buffer |
122 | 450k | stream.pending_send.push_back(buffer, frame); |
123 | 450k | self.schedule_send(stream, task); |
124 | 450k | } Unexecuted instantiation: <h2::proto::streams::prioritize::Prioritize>::queue_frame::<_> <h2::proto::streams::prioritize::Prioritize>::queue_frame::<bytes::bytes::Bytes> Line | Count | Source | 112 | 357 | pub fn queue_frame<B>( | 113 | 357 | &mut self, | 114 | 357 | frame: Frame<B>, | 115 | 357 | buffer: &mut Buffer<Frame<B>>, | 116 | 357 | stream: &mut store::Ptr, | 117 | 357 | task: &mut Option<Waker>, | 118 | 357 | ) { | 119 | 357 | let span = tracing::trace_span!("Prioritize::queue_frame", ?stream.id); | 120 | 357 | let _e = span.enter(); | 121 | 357 | // Queue the frame in the buffer | 122 | 357 | stream.pending_send.push_back(buffer, frame); | 123 | 357 | self.schedule_send(stream, task); | 124 | 357 | } |
<h2::proto::streams::prioritize::Prioritize>::queue_frame::<bytes::bytes::Bytes> Line | Count | Source | 112 | 450k | pub fn queue_frame<B>( | 113 | 450k | &mut self, | 114 | 450k | frame: Frame<B>, | 115 | 450k | buffer: &mut Buffer<Frame<B>>, | 116 | 450k | stream: &mut store::Ptr, | 117 | 450k | task: &mut Option<Waker>, | 118 | 450k | ) { | 119 | 450k | let span = tracing::trace_span!("Prioritize::queue_frame", ?stream.id); | 120 | 450k | let _e = span.enter(); | 121 | 450k | // Queue the frame in the buffer | 122 | 450k | stream.pending_send.push_back(buffer, frame); | 123 | 450k | self.schedule_send(stream, task); | 124 | 450k | } |
|
125 | | |
126 | 533k | pub fn schedule_send(&mut self, stream: &mut store::Ptr, task: &mut Option<Waker>) { |
127 | 533k | // If the stream is waiting to be opened, nothing more to do. |
128 | 533k | if stream.is_send_ready() { |
129 | 98.4k | tracing::trace!(?stream.id, "schedule_send"); |
130 | | // Queue the stream |
131 | 98.4k | self.pending_send.push(stream); |
132 | | |
133 | | // Notify the connection. |
134 | 98.4k | if let Some(task) = task.take() { |
135 | 54 | task.wake(); |
136 | 98.4k | } |
137 | 435k | } |
138 | 533k | } |
139 | | |
140 | 426k | pub fn queue_open(&mut self, stream: &mut store::Ptr) { |
141 | 426k | self.pending_open.push(stream); |
142 | 426k | } |
143 | | |
144 | | /// Send a data frame |
145 | 425k | pub fn send_data<B>( |
146 | 425k | &mut self, |
147 | 425k | frame: frame::Data<B>, |
148 | 425k | buffer: &mut Buffer<Frame<B>>, |
149 | 425k | stream: &mut store::Ptr, |
150 | 425k | counts: &mut Counts, |
151 | 425k | task: &mut Option<Waker>, |
152 | 425k | ) -> Result<(), UserError> |
153 | 425k | where |
154 | 425k | B: Buf, |
155 | 425k | { |
156 | 425k | let sz = frame.payload().remaining(); |
157 | 425k | |
158 | 425k | if sz > MAX_WINDOW_SIZE as usize { |
159 | 0 | return Err(UserError::PayloadTooBig); |
160 | 425k | } |
161 | 425k | |
162 | 425k | let sz = sz as WindowSize; |
163 | 425k | |
164 | 425k | if !stream.state.is_send_streaming() { |
165 | 0 | if stream.state.is_closed() { |
166 | 0 | return Err(InactiveStreamId); |
167 | | } else { |
168 | 0 | return Err(UnexpectedFrameType); |
169 | | } |
170 | 425k | } |
171 | 425k | |
172 | 425k | // Update the buffered data counter |
173 | 425k | stream.buffered_send_data += sz as usize; |
174 | | |
175 | 425k | let span = |
176 | 425k | tracing::trace_span!("send_data", sz, requested = stream.requested_send_capacity); |
177 | 425k | let _e = span.enter(); |
178 | 425k | tracing::trace!(buffered = stream.buffered_send_data); |
179 | | |
180 | | // Implicitly request more send capacity if not enough has been |
181 | | // requested yet. |
182 | 425k | if (stream.requested_send_capacity as usize) < stream.buffered_send_data { |
183 | | // Update the target requested capacity |
184 | 425k | stream.requested_send_capacity = |
185 | 425k | cmp::min(stream.buffered_send_data, WindowSize::MAX as usize) as WindowSize; |
186 | 425k | |
187 | 425k | // `try_assign_capacity` will queue the stream to `pending_capacity` if the capcaity |
188 | 425k | // cannot be assigned at the time it is called. |
189 | 425k | // |
190 | 425k | // Streams over the max concurrent count will still call `send_data` so we should be |
191 | 425k | // careful not to put it into `pending_capacity` as it will starve the connection |
192 | 425k | // capacity for other streams |
193 | 425k | if !stream.is_pending_open { |
194 | 0 | self.try_assign_capacity(stream); |
195 | 425k | } |
196 | 0 | } |
197 | | |
198 | 425k | if frame.is_end_stream() { |
199 | 425k | stream.state.send_close(); |
200 | 425k | self.reserve_capacity(0, stream, counts); |
201 | 425k | } |
202 | | |
203 | 425k | tracing::trace!( |
204 | 0 | available = %stream.send_flow.available(), |
205 | 0 | buffered = stream.buffered_send_data, |
206 | | ); |
207 | | |
208 | | // The `stream.buffered_send_data == 0` check is here so that, if a zero |
209 | | // length data frame is queued to the front (there is no previously |
210 | | // queued data), it gets sent out immediately even if there is no |
211 | | // available send window. |
212 | | // |
213 | | // Sending out zero length data frames can be done to signal |
214 | | // end-of-stream. |
215 | | // |
216 | 425k | if stream.send_flow.available() > 0 || stream.buffered_send_data == 0 { |
217 | 0 | // The stream currently has capacity to send the data frame, so |
218 | 0 | // queue it up and notify the connection task. |
219 | 0 | self.queue_frame(frame.into(), buffer, stream, task); |
220 | 425k | } else { |
221 | 425k | // The stream has no capacity to send the frame now, save it but |
222 | 425k | // don't notify the connection task. Once additional capacity |
223 | 425k | // becomes available, the frame will be flushed. |
224 | 425k | stream.pending_send.push_back(buffer, frame.into()); |
225 | 425k | } |
226 | | |
227 | 425k | Ok(()) |
228 | 425k | } Unexecuted instantiation: <h2::proto::streams::prioritize::Prioritize>::send_data::<_> <h2::proto::streams::prioritize::Prioritize>::send_data::<bytes::bytes::Bytes> Line | Count | Source | 145 | 425k | pub fn send_data<B>( | 146 | 425k | &mut self, | 147 | 425k | frame: frame::Data<B>, | 148 | 425k | buffer: &mut Buffer<Frame<B>>, | 149 | 425k | stream: &mut store::Ptr, | 150 | 425k | counts: &mut Counts, | 151 | 425k | task: &mut Option<Waker>, | 152 | 425k | ) -> Result<(), UserError> | 153 | 425k | where | 154 | 425k | B: Buf, | 155 | 425k | { | 156 | 425k | let sz = frame.payload().remaining(); | 157 | 425k | | 158 | 425k | if sz > MAX_WINDOW_SIZE as usize { | 159 | 0 | return Err(UserError::PayloadTooBig); | 160 | 425k | } | 161 | 425k | | 162 | 425k | let sz = sz as WindowSize; | 163 | 425k | | 164 | 425k | if !stream.state.is_send_streaming() { | 165 | 0 | if stream.state.is_closed() { | 166 | 0 | return Err(InactiveStreamId); | 167 | | } else { | 168 | 0 | return Err(UnexpectedFrameType); | 169 | | } | 170 | 425k | } | 171 | 425k | | 172 | 425k | // Update the buffered data counter | 173 | 425k | stream.buffered_send_data += sz as usize; | 174 | | | 175 | 425k | let span = | 176 | 425k | tracing::trace_span!("send_data", sz, requested = stream.requested_send_capacity); | 177 | 425k | let _e = span.enter(); | 178 | 425k | tracing::trace!(buffered = stream.buffered_send_data); | 179 | | | 180 | | // Implicitly request more send capacity if not enough has been | 181 | | // requested yet. | 182 | 425k | if (stream.requested_send_capacity as usize) < stream.buffered_send_data { | 183 | | // Update the target requested capacity | 184 | 425k | stream.requested_send_capacity = | 185 | 425k | cmp::min(stream.buffered_send_data, WindowSize::MAX as usize) as WindowSize; | 186 | 425k | | 187 | 425k | // `try_assign_capacity` will queue the stream to `pending_capacity` if the capcaity | 188 | 425k | // cannot be assigned at the time it is called. | 189 | 425k | // | 190 | 425k | // Streams over the max concurrent count will still call `send_data` so we should be | 191 | 425k | // careful not to put it into `pending_capacity` as it will starve the connection | 192 | 425k | // capacity for other streams | 193 | 425k | if !stream.is_pending_open { | 194 | 0 | self.try_assign_capacity(stream); | 195 | 425k | } | 196 | 0 | } | 197 | | | 198 | 425k | if frame.is_end_stream() { | 199 | 425k | stream.state.send_close(); | 200 | 425k | self.reserve_capacity(0, stream, counts); | 201 | 425k | } | 202 | | | 203 | 425k | tracing::trace!( | 204 | 0 | available = %stream.send_flow.available(), | 205 | 0 | buffered = stream.buffered_send_data, | 206 | | ); | 207 | | | 208 | | // The `stream.buffered_send_data == 0` check is here so that, if a zero | 209 | | // length data frame is queued to the front (there is no previously | 210 | | // queued data), it gets sent out immediately even if there is no | 211 | | // available send window. | 212 | | // | 213 | | // Sending out zero length data frames can be done to signal | 214 | | // end-of-stream. | 215 | | // | 216 | 425k | if stream.send_flow.available() > 0 || stream.buffered_send_data == 0 { | 217 | 0 | // The stream currently has capacity to send the data frame, so | 218 | 0 | // queue it up and notify the connection task. | 219 | 0 | self.queue_frame(frame.into(), buffer, stream, task); | 220 | 425k | } else { | 221 | 425k | // The stream has no capacity to send the frame now, save it but | 222 | 425k | // don't notify the connection task. Once additional capacity | 223 | 425k | // becomes available, the frame will be flushed. | 224 | 425k | stream.pending_send.push_back(buffer, frame.into()); | 225 | 425k | } | 226 | | | 227 | 425k | Ok(()) | 228 | 425k | } |
|
229 | | |
230 | | /// Request capacity to send data |
231 | 425k | pub fn reserve_capacity( |
232 | 425k | &mut self, |
233 | 425k | capacity: WindowSize, |
234 | 425k | stream: &mut store::Ptr, |
235 | 425k | counts: &mut Counts, |
236 | 425k | ) { |
237 | 425k | let span = tracing::trace_span!( |
238 | | "reserve_capacity", |
239 | | ?stream.id, |
240 | | requested = capacity, |
241 | 0 | effective = (capacity as usize) + stream.buffered_send_data, |
242 | 0 | curr = stream.requested_send_capacity |
243 | | ); |
244 | 425k | let _e = span.enter(); |
245 | 425k | |
246 | 425k | // Actual capacity is `capacity` + the current amount of buffered data. |
247 | 425k | // If it were less, then we could never send out the buffered data. |
248 | 425k | let capacity = (capacity as usize) + stream.buffered_send_data; |
249 | 425k | |
250 | 425k | match capacity.cmp(&(stream.requested_send_capacity as usize)) { |
251 | 425k | Ordering::Equal => { |
252 | 425k | // Nothing to do |
253 | 425k | } |
254 | | Ordering::Less => { |
255 | | // Update the target requested capacity |
256 | 0 | stream.requested_send_capacity = capacity as WindowSize; |
257 | 0 |
|
258 | 0 | // Currently available capacity assigned to the stream |
259 | 0 | let available = stream.send_flow.available().as_size(); |
260 | 0 |
|
261 | 0 | // If the stream has more assigned capacity than requested, reclaim |
262 | 0 | // some for the connection |
263 | 0 | if available as usize > capacity { |
264 | 0 | let diff = available - capacity as WindowSize; |
265 | 0 |
|
266 | 0 | // TODO: proper error handling |
267 | 0 | let _res = stream.send_flow.claim_capacity(diff); |
268 | 0 | debug_assert!(_res.is_ok()); |
269 | | |
270 | 0 | self.assign_connection_capacity(diff, stream, counts); |
271 | 0 | } |
272 | | } |
273 | | Ordering::Greater => { |
274 | | // If trying to *add* capacity, but the stream send side is closed, |
275 | | // there's nothing to be done. |
276 | 0 | if stream.state.is_send_closed() { |
277 | 0 | return; |
278 | 0 | } |
279 | 0 |
|
280 | 0 | // Update the target requested capacity |
281 | 0 | stream.requested_send_capacity = |
282 | 0 | cmp::min(capacity, WindowSize::MAX as usize) as WindowSize; |
283 | 0 |
|
284 | 0 | // Try to assign additional capacity to the stream. If none is |
285 | 0 | // currently available, the stream will be queued to receive some |
286 | 0 | // when more becomes available. |
287 | 0 | self.try_assign_capacity(stream); |
288 | | } |
289 | | } |
290 | 425k | } |
291 | | |
292 | 126k | pub fn recv_stream_window_update( |
293 | 126k | &mut self, |
294 | 126k | inc: WindowSize, |
295 | 126k | stream: &mut store::Ptr, |
296 | 126k | ) -> Result<(), Reason> { |
297 | 126k | let span = tracing::trace_span!( |
298 | | "recv_stream_window_update", |
299 | | ?stream.id, |
300 | | ?stream.state, |
301 | | inc, |
302 | 0 | flow = ?stream.send_flow |
303 | | ); |
304 | 126k | let _e = span.enter(); |
305 | 126k | |
306 | 126k | if stream.state.is_send_closed() && stream.buffered_send_data == 0 { |
307 | | // We can't send any data, so don't bother doing anything else. |
308 | 4.38k | return Ok(()); |
309 | 122k | } |
310 | 122k | |
311 | 122k | // Update the stream level flow control. |
312 | 122k | stream.send_flow.inc_window(inc)?; |
313 | | |
314 | | // If the stream is waiting on additional capacity, then this will |
315 | | // assign it (if available on the connection) and notify the producer |
316 | 122k | self.try_assign_capacity(stream); |
317 | 122k | |
318 | 122k | Ok(()) |
319 | 126k | } |
320 | | |
321 | 738 | pub fn recv_connection_window_update( |
322 | 738 | &mut self, |
323 | 738 | inc: WindowSize, |
324 | 738 | store: &mut Store, |
325 | 738 | counts: &mut Counts, |
326 | 738 | ) -> Result<(), Reason> { |
327 | 738 | // Update the connection's window |
328 | 738 | self.flow.inc_window(inc)?; |
329 | | |
330 | 737 | self.assign_connection_capacity(inc, store, counts); |
331 | 737 | Ok(()) |
332 | 738 | } |
333 | | |
334 | | /// Reclaim all capacity assigned to the stream and re-assign it to the |
335 | | /// connection |
336 | 577k | pub fn reclaim_all_capacity(&mut self, stream: &mut store::Ptr, counts: &mut Counts) { |
337 | 577k | let available = stream.send_flow.available().as_size(); |
338 | 577k | if available > 0 { |
339 | | // TODO: proper error handling |
340 | 49.8k | let _res = stream.send_flow.claim_capacity(available); |
341 | 49.8k | debug_assert!(_res.is_ok()); |
342 | | // Re-assign all capacity to the connection |
343 | 49.8k | self.assign_connection_capacity(available, stream, counts); |
344 | 527k | } |
345 | 577k | } |
346 | | |
347 | | /// Reclaim just reserved capacity, not buffered capacity, and re-assign |
348 | | /// it to the connection |
349 | 82.7k | pub fn reclaim_reserved_capacity(&mut self, stream: &mut store::Ptr, counts: &mut Counts) { |
350 | 82.7k | // only reclaim reserved capacity that isn't already buffered |
351 | 82.7k | if stream.send_flow.available().as_size() as usize > stream.buffered_send_data { |
352 | 0 | let reserved = |
353 | 0 | stream.send_flow.available().as_size() - stream.buffered_send_data as WindowSize; |
354 | 0 |
|
355 | 0 | // Panic safety: due to how `reserved` is computed it can't be greater |
356 | 0 | // than what's available. |
357 | 0 | stream |
358 | 0 | .send_flow |
359 | 0 | .claim_capacity(reserved) |
360 | 0 | .expect("window size should be greater than reserved"); |
361 | 0 |
|
362 | 0 | self.assign_connection_capacity(reserved, stream, counts); |
363 | 82.7k | } |
364 | 82.7k | } |
365 | | |
366 | 17.4k | pub fn clear_pending_capacity(&mut self, store: &mut Store, counts: &mut Counts) { |
367 | 17.4k | let span = tracing::trace_span!("clear_pending_capacity"); |
368 | 17.4k | let _e = span.enter(); |
369 | 30.6k | while let Some(stream) = self.pending_capacity.pop(store) { |
370 | 13.1k | counts.transition(stream, |_, stream| { |
371 | 13.1k | tracing::trace!(?stream.id, "clear_pending_capacity"); |
372 | 13.1k | }) |
373 | | } |
374 | 17.4k | } |
375 | | |
376 | 52.4k | pub fn assign_connection_capacity<R>( |
377 | 52.4k | &mut self, |
378 | 52.4k | inc: WindowSize, |
379 | 52.4k | store: &mut R, |
380 | 52.4k | counts: &mut Counts, |
381 | 52.4k | ) where |
382 | 52.4k | R: Resolve, |
383 | 52.4k | { |
384 | 52.4k | let span = tracing::trace_span!("assign_connection_capacity", inc); |
385 | 52.4k | let _e = span.enter(); |
386 | 52.4k | |
387 | 52.4k | // TODO: proper error handling |
388 | 52.4k | let _res = self.flow.assign_capacity(inc); |
389 | 52.4k | debug_assert!(_res.is_ok()); |
390 | | |
391 | | // Assign newly acquired capacity to streams pending capacity. |
392 | 273k | while self.flow.available() > 0 { |
393 | 243k | let stream = match self.pending_capacity.pop(store) { |
394 | 221k | Some(stream) => stream, |
395 | 22.5k | None => return, |
396 | | }; |
397 | | |
398 | | // Streams pending capacity may have been reset before capacity |
399 | | // became available. In that case, the stream won't want any |
400 | | // capacity, and so we shouldn't "transition" on it, but just evict |
401 | | // it and continue the loop. |
402 | 221k | if !(stream.state.is_send_streaming() || stream.buffered_send_data > 0) { |
403 | 133k | continue; |
404 | 87.3k | } |
405 | 87.3k | |
406 | 87.3k | counts.transition(stream, |_, stream| { |
407 | 87.3k | // Try to assign capacity to the stream. This will also re-queue the |
408 | 87.3k | // stream if there isn't enough connection level capacity to fulfill |
409 | 87.3k | // the capacity request. |
410 | 87.3k | self.try_assign_capacity(stream); |
411 | 87.3k | }) <h2::proto::streams::prioritize::Prioritize>::assign_connection_capacity::<h2::proto::streams::store::Ptr>::{closure#0} Line | Count | Source | 406 | 30.6k | counts.transition(stream, |_, stream| { | 407 | 30.6k | // Try to assign capacity to the stream. This will also re-queue the | 408 | 30.6k | // stream if there isn't enough connection level capacity to fulfill | 409 | 30.6k | // the capacity request. | 410 | 30.6k | self.try_assign_capacity(stream); | 411 | 30.6k | }) |
<h2::proto::streams::prioritize::Prioritize>::assign_connection_capacity::<h2::proto::streams::store::Store>::{closure#0} Line | Count | Source | 406 | 56.6k | counts.transition(stream, |_, stream| { | 407 | 56.6k | // Try to assign capacity to the stream. This will also re-queue the | 408 | 56.6k | // stream if there isn't enough connection level capacity to fulfill | 409 | 56.6k | // the capacity request. | 410 | 56.6k | self.try_assign_capacity(stream); | 411 | 56.6k | }) |
|
412 | | } |
413 | 52.4k | } <h2::proto::streams::prioritize::Prioritize>::assign_connection_capacity::<h2::proto::streams::store::Ptr> Line | Count | Source | 376 | 49.8k | pub fn assign_connection_capacity<R>( | 377 | 49.8k | &mut self, | 378 | 49.8k | inc: WindowSize, | 379 | 49.8k | store: &mut R, | 380 | 49.8k | counts: &mut Counts, | 381 | 49.8k | ) where | 382 | 49.8k | R: Resolve, | 383 | 49.8k | { | 384 | 49.8k | let span = tracing::trace_span!("assign_connection_capacity", inc); | 385 | 49.8k | let _e = span.enter(); | 386 | 49.8k | | 387 | 49.8k | // TODO: proper error handling | 388 | 49.8k | let _res = self.flow.assign_capacity(inc); | 389 | 49.8k | debug_assert!(_res.is_ok()); | 390 | | | 391 | | // Assign newly acquired capacity to streams pending capacity. | 392 | 213k | while self.flow.available() > 0 { | 393 | 184k | let stream = match self.pending_capacity.pop(store) { | 394 | 163k | Some(stream) => stream, | 395 | 20.7k | None => return, | 396 | | }; | 397 | | | 398 | | // Streams pending capacity may have been reset before capacity | 399 | | // became available. In that case, the stream won't want any | 400 | | // capacity, and so we shouldn't "transition" on it, but just evict | 401 | | // it and continue the loop. | 402 | 163k | if !(stream.state.is_send_streaming() || stream.buffered_send_data > 0) { | 403 | 133k | continue; | 404 | 30.6k | } | 405 | 30.6k | | 406 | 30.6k | counts.transition(stream, |_, stream| { | 407 | | // Try to assign capacity to the stream. This will also re-queue the | 408 | | // stream if there isn't enough connection level capacity to fulfill | 409 | | // the capacity request. | 410 | | self.try_assign_capacity(stream); | 411 | 30.6k | }) | 412 | | } | 413 | 49.8k | } |
<h2::proto::streams::prioritize::Prioritize>::assign_connection_capacity::<h2::proto::streams::store::Store> Line | Count | Source | 376 | 2.60k | pub fn assign_connection_capacity<R>( | 377 | 2.60k | &mut self, | 378 | 2.60k | inc: WindowSize, | 379 | 2.60k | store: &mut R, | 380 | 2.60k | counts: &mut Counts, | 381 | 2.60k | ) where | 382 | 2.60k | R: Resolve, | 383 | 2.60k | { | 384 | 2.60k | let span = tracing::trace_span!("assign_connection_capacity", inc); | 385 | 2.60k | let _e = span.enter(); | 386 | 2.60k | | 387 | 2.60k | // TODO: proper error handling | 388 | 2.60k | let _res = self.flow.assign_capacity(inc); | 389 | 2.60k | debug_assert!(_res.is_ok()); | 390 | | | 391 | | // Assign newly acquired capacity to streams pending capacity. | 392 | 59.8k | while self.flow.available() > 0 { | 393 | 58.9k | let stream = match self.pending_capacity.pop(store) { | 394 | 57.2k | Some(stream) => stream, | 395 | 1.74k | None => return, | 396 | | }; | 397 | | | 398 | | // Streams pending capacity may have been reset before capacity | 399 | | // became available. In that case, the stream won't want any | 400 | | // capacity, and so we shouldn't "transition" on it, but just evict | 401 | | // it and continue the loop. | 402 | 57.2k | if !(stream.state.is_send_streaming() || stream.buffered_send_data > 0) { | 403 | 552 | continue; | 404 | 56.6k | } | 405 | 56.6k | | 406 | 56.6k | counts.transition(stream, |_, stream| { | 407 | | // Try to assign capacity to the stream. This will also re-queue the | 408 | | // stream if there isn't enough connection level capacity to fulfill | 409 | | // the capacity request. | 410 | | self.try_assign_capacity(stream); | 411 | 56.6k | }) | 412 | | } | 413 | 2.60k | } |
|
414 | | |
415 | | /// Request capacity to send data |
416 | 394k | fn try_assign_capacity(&mut self, stream: &mut store::Ptr) { |
417 | 394k | let total_requested = stream.requested_send_capacity; |
418 | 394k | |
419 | 394k | // Total requested should never go below actual assigned |
420 | 394k | // (Note: the window size can go lower than assigned) |
421 | 394k | debug_assert!(stream.send_flow.available() <= total_requested as usize); |
422 | | |
423 | | // The amount of additional capacity that the stream requests. |
424 | | // Don't assign more than the window has available! |
425 | 394k | let additional = cmp::min( |
426 | 394k | total_requested - stream.send_flow.available().as_size(), |
427 | 394k | // Can't assign more than what is available |
428 | 394k | stream.send_flow.window_size() - stream.send_flow.available().as_size(), |
429 | 394k | ); |
430 | 394k | let span = tracing::trace_span!("try_assign_capacity", ?stream.id); |
431 | 394k | let _e = span.enter(); |
432 | 394k | tracing::trace!( |
433 | | requested = total_requested, |
434 | | additional, |
435 | 0 | buffered = stream.buffered_send_data, |
436 | 0 | window = stream.send_flow.window_size(), |
437 | 0 | conn = %self.flow.available() |
438 | | ); |
439 | | |
440 | 394k | if additional == 0 { |
441 | | // Nothing more to do |
442 | 39.4k | return; |
443 | 354k | } |
444 | 354k | |
445 | 354k | // If the stream has requested capacity, then it must be in the |
446 | 354k | // streaming state (more data could be sent) or there is buffered data |
447 | 354k | // waiting to be sent. |
448 | 354k | debug_assert!( |
449 | 0 | stream.state.is_send_streaming() || stream.buffered_send_data > 0, |
450 | 0 | "state={:?}", |
451 | 0 | stream.state |
452 | | ); |
453 | | |
454 | | // The amount of currently available capacity on the connection |
455 | 354k | let conn_available = self.flow.available().as_size(); |
456 | 354k | |
457 | 354k | // First check if capacity is immediately available |
458 | 354k | if conn_available > 0 { |
459 | | // The amount of capacity to assign to the stream |
460 | | // TODO: Should prioritization factor into this? |
461 | 84.4k | let assign = cmp::min(conn_available, additional); |
462 | 84.4k | |
463 | 84.4k | tracing::trace!(capacity = assign, "assigning"); |
464 | | |
465 | | // Assign the capacity to the stream |
466 | 84.4k | stream.assign_capacity(assign, self.max_buffer_size); |
467 | 84.4k | |
468 | 84.4k | // Claim the capacity from the connection |
469 | 84.4k | // TODO: proper error handling |
470 | 84.4k | let _res = self.flow.claim_capacity(assign); |
471 | 84.4k | debug_assert!(_res.is_ok()); |
472 | 270k | } |
473 | | |
474 | 354k | tracing::trace!( |
475 | 0 | available = %stream.send_flow.available(), |
476 | 0 | requested = stream.requested_send_capacity, |
477 | 0 | buffered = stream.buffered_send_data, |
478 | 0 | has_unavailable = %stream.send_flow.has_unavailable() |
479 | | ); |
480 | | |
481 | 354k | if stream.send_flow.available() < stream.requested_send_capacity as usize |
482 | 331k | && stream.send_flow.has_unavailable() |
483 | 292k | { |
484 | 292k | // The stream requires additional capacity and the stream's |
485 | 292k | // window has available capacity, but the connection window |
486 | 292k | // does not. |
487 | 292k | // |
488 | 292k | // In this case, the stream needs to be queued up for when the |
489 | 292k | // connection has more capacity. |
490 | 292k | self.pending_capacity.push(stream); |
491 | 292k | } |
492 | | |
493 | | // If data is buffered and the stream is send ready, then |
494 | | // schedule the stream for execution |
495 | 354k | if stream.buffered_send_data > 0 && stream.is_send_ready() { |
496 | 257k | // TODO: This assertion isn't *exactly* correct. There can still be |
497 | 257k | // buffered send data while the stream's pending send queue is |
498 | 257k | // empty. This can happen when a large data frame is in the process |
499 | 257k | // of being **partially** sent. Once the window has been sent, the |
500 | 257k | // data frame will be returned to the prioritization layer to be |
501 | 257k | // re-scheduled. |
502 | 257k | // |
503 | 257k | // That said, it would be nice to figure out how to make this |
504 | 257k | // assertion correctly. |
505 | 257k | // |
506 | 257k | // debug_assert!(!stream.pending_send.is_empty()); |
507 | 257k | |
508 | 257k | self.pending_send.push(stream); |
509 | 257k | } |
510 | 394k | } |
511 | | |
512 | 381k | pub fn poll_complete<T, B>( |
513 | 381k | &mut self, |
514 | 381k | cx: &mut Context, |
515 | 381k | buffer: &mut Buffer<Frame<B>>, |
516 | 381k | store: &mut Store, |
517 | 381k | counts: &mut Counts, |
518 | 381k | dst: &mut Codec<T, Prioritized<B>>, |
519 | 381k | ) -> Poll<io::Result<()>> |
520 | 381k | where |
521 | 381k | T: AsyncWrite + Unpin, |
522 | 381k | B: Buf, |
523 | 381k | { |
524 | | // Ensure codec is ready |
525 | 381k | ready!(dst.poll_ready(cx))?; |
526 | | |
527 | | // Reclaim any frame that has previously been written |
528 | 381k | self.reclaim_frame(buffer, store, dst); |
529 | 381k | |
530 | 381k | // The max frame length |
531 | 381k | let max_frame_len = dst.max_send_frame_size(); |
532 | 381k | |
533 | 381k | tracing::trace!("poll_complete"); |
534 | | |
535 | | loop { |
536 | 584k | if let Some(mut stream) = self.pop_pending_open(store, counts) { |
537 | 184k | self.pending_send.push_front(&mut stream); |
538 | 184k | self.try_assign_capacity(&mut stream); |
539 | 400k | } |
540 | | |
541 | 584k | match self.pop_frame(buffer, store, max_frame_len, counts) { |
542 | 205k | Some(frame) => { |
543 | 205k | tracing::trace!(?frame, "writing"); |
544 | | |
545 | 205k | debug_assert_eq!(self.in_flight_data_frame, InFlightData::Nothing); |
546 | 205k | if let Frame::Data(ref frame) = frame { |
547 | 19.1k | self.in_flight_data_frame = InFlightData::DataFrame(frame.payload().stream); |
548 | 185k | } |
549 | 205k | dst.buffer(frame).expect("invalid frame"); |
550 | | |
551 | | // Ensure the codec is ready to try the loop again. |
552 | 205k | ready!(dst.poll_ready(cx))?; |
553 | | |
554 | | // Because, always try to reclaim... |
555 | 202k | self.reclaim_frame(buffer, store, dst); |
556 | | } |
557 | | None => { |
558 | | // Try to flush the codec. |
559 | 379k | ready!(dst.flush(cx))?; |
560 | | |
561 | | // This might release a data frame... |
562 | 345k | if !self.reclaim_frame(buffer, store, dst) { |
563 | 345k | return Poll::Ready(Ok(())); |
564 | 0 | } |
565 | | |
566 | | // No need to poll ready as poll_complete() does this for |
567 | | // us... |
568 | | } |
569 | | } |
570 | | } |
571 | 381k | } Unexecuted instantiation: <h2::proto::streams::prioritize::Prioritize>::poll_complete::<_, _> <h2::proto::streams::prioritize::Prioritize>::poll_complete::<fuzz_e2e::MockIo, bytes::bytes::Bytes> Line | Count | Source | 512 | 381k | pub fn poll_complete<T, B>( | 513 | 381k | &mut self, | 514 | 381k | cx: &mut Context, | 515 | 381k | buffer: &mut Buffer<Frame<B>>, | 516 | 381k | store: &mut Store, | 517 | 381k | counts: &mut Counts, | 518 | 381k | dst: &mut Codec<T, Prioritized<B>>, | 519 | 381k | ) -> Poll<io::Result<()>> | 520 | 381k | where | 521 | 381k | T: AsyncWrite + Unpin, | 522 | 381k | B: Buf, | 523 | 381k | { | 524 | | // Ensure codec is ready | 525 | 381k | ready!(dst.poll_ready(cx))?; | 526 | | | 527 | | // Reclaim any frame that has previously been written | 528 | 381k | self.reclaim_frame(buffer, store, dst); | 529 | 381k | | 530 | 381k | // The max frame length | 531 | 381k | let max_frame_len = dst.max_send_frame_size(); | 532 | 381k | | 533 | 381k | tracing::trace!("poll_complete"); | 534 | | | 535 | | loop { | 536 | 584k | if let Some(mut stream) = self.pop_pending_open(store, counts) { | 537 | 184k | self.pending_send.push_front(&mut stream); | 538 | 184k | self.try_assign_capacity(&mut stream); | 539 | 400k | } | 540 | | | 541 | 584k | match self.pop_frame(buffer, store, max_frame_len, counts) { | 542 | 205k | Some(frame) => { | 543 | 205k | tracing::trace!(?frame, "writing"); | 544 | | | 545 | 205k | debug_assert_eq!(self.in_flight_data_frame, InFlightData::Nothing); | 546 | 205k | if let Frame::Data(ref frame) = frame { | 547 | 19.1k | self.in_flight_data_frame = InFlightData::DataFrame(frame.payload().stream); | 548 | 185k | } | 549 | 205k | dst.buffer(frame).expect("invalid frame"); | 550 | | | 551 | | // Ensure the codec is ready to try the loop again. | 552 | 205k | ready!(dst.poll_ready(cx))?; | 553 | | | 554 | | // Because, always try to reclaim... | 555 | 202k | self.reclaim_frame(buffer, store, dst); | 556 | | } | 557 | | None => { | 558 | | // Try to flush the codec. | 559 | 379k | ready!(dst.flush(cx))?; | 560 | | | 561 | | // This might release a data frame... | 562 | 345k | if !self.reclaim_frame(buffer, store, dst) { | 563 | 345k | return Poll::Ready(Ok(())); | 564 | 0 | } | 565 | | | 566 | | // No need to poll ready as poll_complete() does this for | 567 | | // us... | 568 | | } | 569 | | } | 570 | | } | 571 | 381k | } |
|
572 | | |
573 | | /// Tries to reclaim a pending data frame from the codec. |
574 | | /// |
575 | | /// Returns true if a frame was reclaimed. |
576 | | /// |
577 | | /// When a data frame is written to the codec, it may not be written in its |
578 | | /// entirety (large chunks are split up into potentially many data frames). |
579 | | /// In this case, the stream needs to be reprioritized. |
580 | 929k | fn reclaim_frame<T, B>( |
581 | 929k | &mut self, |
582 | 929k | buffer: &mut Buffer<Frame<B>>, |
583 | 929k | store: &mut Store, |
584 | 929k | dst: &mut Codec<T, Prioritized<B>>, |
585 | 929k | ) -> bool |
586 | 929k | where |
587 | 929k | B: Buf, |
588 | 929k | { |
589 | 929k | let span = tracing::trace_span!("try_reclaim_frame"); |
590 | 929k | let _e = span.enter(); |
591 | | |
592 | | // First check if there are any data chunks to take back |
593 | 929k | if let Some(frame) = dst.take_last_data_frame() { |
594 | 17.8k | self.reclaim_frame_inner(buffer, store, frame) |
595 | | } else { |
596 | 912k | false |
597 | | } |
598 | 929k | } Unexecuted instantiation: <h2::proto::streams::prioritize::Prioritize>::reclaim_frame::<_, _> <h2::proto::streams::prioritize::Prioritize>::reclaim_frame::<fuzz_e2e::MockIo, bytes::bytes::Bytes> Line | Count | Source | 580 | 929k | fn reclaim_frame<T, B>( | 581 | 929k | &mut self, | 582 | 929k | buffer: &mut Buffer<Frame<B>>, | 583 | 929k | store: &mut Store, | 584 | 929k | dst: &mut Codec<T, Prioritized<B>>, | 585 | 929k | ) -> bool | 586 | 929k | where | 587 | 929k | B: Buf, | 588 | 929k | { | 589 | 929k | let span = tracing::trace_span!("try_reclaim_frame"); | 590 | 929k | let _e = span.enter(); | 591 | | | 592 | | // First check if there are any data chunks to take back | 593 | 929k | if let Some(frame) = dst.take_last_data_frame() { | 594 | 17.8k | self.reclaim_frame_inner(buffer, store, frame) | 595 | | } else { | 596 | 912k | false | 597 | | } | 598 | 929k | } |
|
599 | | |
600 | 17.8k | fn reclaim_frame_inner<B>( |
601 | 17.8k | &mut self, |
602 | 17.8k | buffer: &mut Buffer<Frame<B>>, |
603 | 17.8k | store: &mut Store, |
604 | 17.8k | frame: frame::Data<Prioritized<B>>, |
605 | 17.8k | ) -> bool |
606 | 17.8k | where |
607 | 17.8k | B: Buf, |
608 | 17.8k | { |
609 | 17.8k | tracing::trace!( |
610 | | ?frame, |
611 | 0 | sz = frame.payload().inner.get_ref().remaining(), |
612 | 0 | "reclaimed" |
613 | | ); |
614 | | |
615 | 17.8k | let mut eos = false; |
616 | 17.8k | let key = frame.payload().stream; |
617 | 17.8k | |
618 | 17.8k | match mem::replace(&mut self.in_flight_data_frame, InFlightData::Nothing) { |
619 | 0 | InFlightData::Nothing => panic!("wasn't expecting a frame to reclaim"), |
620 | | InFlightData::Drop => { |
621 | 39 | tracing::trace!("not reclaiming frame for cancelled stream"); |
622 | 39 | return false; |
623 | | } |
624 | 17.7k | InFlightData::DataFrame(k) => { |
625 | 17.7k | debug_assert_eq!(k, key); |
626 | | } |
627 | | } |
628 | | |
629 | 17.7k | let mut frame = frame.map(|prioritized| { |
630 | 17.7k | // TODO: Ensure fully written |
631 | 17.7k | eos = prioritized.end_of_stream; |
632 | 17.7k | prioritized.inner.into_inner() |
633 | 17.7k | }); Unexecuted instantiation: <h2::proto::streams::prioritize::Prioritize>::reclaim_frame_inner::<_>::{closure#0} <h2::proto::streams::prioritize::Prioritize>::reclaim_frame_inner::<bytes::bytes::Bytes>::{closure#0} Line | Count | Source | 629 | 17.7k | let mut frame = frame.map(|prioritized| { | 630 | 17.7k | // TODO: Ensure fully written | 631 | 17.7k | eos = prioritized.end_of_stream; | 632 | 17.7k | prioritized.inner.into_inner() | 633 | 17.7k | }); |
|
634 | 17.7k | |
635 | 17.7k | if frame.payload().has_remaining() { |
636 | 17.1k | let mut stream = store.resolve(key); |
637 | 17.1k | |
638 | 17.1k | if eos { |
639 | 17.1k | frame.set_end_stream(true); |
640 | 17.1k | } |
641 | | |
642 | 17.1k | self.push_back_frame(frame.into(), buffer, &mut stream); |
643 | 17.1k | |
644 | 17.1k | return true; |
645 | 603 | } |
646 | 603 | |
647 | 603 | false |
648 | 17.8k | } Unexecuted instantiation: <h2::proto::streams::prioritize::Prioritize>::reclaim_frame_inner::<_> <h2::proto::streams::prioritize::Prioritize>::reclaim_frame_inner::<bytes::bytes::Bytes> Line | Count | Source | 600 | 17.8k | fn reclaim_frame_inner<B>( | 601 | 17.8k | &mut self, | 602 | 17.8k | buffer: &mut Buffer<Frame<B>>, | 603 | 17.8k | store: &mut Store, | 604 | 17.8k | frame: frame::Data<Prioritized<B>>, | 605 | 17.8k | ) -> bool | 606 | 17.8k | where | 607 | 17.8k | B: Buf, | 608 | 17.8k | { | 609 | 17.8k | tracing::trace!( | 610 | | ?frame, | 611 | 0 | sz = frame.payload().inner.get_ref().remaining(), | 612 | 0 | "reclaimed" | 613 | | ); | 614 | | | 615 | 17.8k | let mut eos = false; | 616 | 17.8k | let key = frame.payload().stream; | 617 | 17.8k | | 618 | 17.8k | match mem::replace(&mut self.in_flight_data_frame, InFlightData::Nothing) { | 619 | 0 | InFlightData::Nothing => panic!("wasn't expecting a frame to reclaim"), | 620 | | InFlightData::Drop => { | 621 | 39 | tracing::trace!("not reclaiming frame for cancelled stream"); | 622 | 39 | return false; | 623 | | } | 624 | 17.7k | InFlightData::DataFrame(k) => { | 625 | 17.7k | debug_assert_eq!(k, key); | 626 | | } | 627 | | } | 628 | | | 629 | 17.7k | let mut frame = frame.map(|prioritized| { | 630 | | // TODO: Ensure fully written | 631 | | eos = prioritized.end_of_stream; | 632 | | prioritized.inner.into_inner() | 633 | 17.7k | }); | 634 | 17.7k | | 635 | 17.7k | if frame.payload().has_remaining() { | 636 | 17.1k | let mut stream = store.resolve(key); | 637 | 17.1k | | 638 | 17.1k | if eos { | 639 | 17.1k | frame.set_end_stream(true); | 640 | 17.1k | } | 641 | | | 642 | 17.1k | self.push_back_frame(frame.into(), buffer, &mut stream); | 643 | 17.1k | | 644 | 17.1k | return true; | 645 | 603 | } | 646 | 603 | | 647 | 603 | false | 648 | 17.8k | } |
|
649 | | |
650 | | /// Push the frame to the front of the stream's deque, scheduling the |
651 | | /// stream if needed. |
652 | 17.1k | fn push_back_frame<B>( |
653 | 17.1k | &mut self, |
654 | 17.1k | frame: Frame<B>, |
655 | 17.1k | buffer: &mut Buffer<Frame<B>>, |
656 | 17.1k | stream: &mut store::Ptr, |
657 | 17.1k | ) { |
658 | 17.1k | // Push the frame to the front of the stream's deque |
659 | 17.1k | stream.pending_send.push_front(buffer, frame); |
660 | 17.1k | |
661 | 17.1k | // If needed, schedule the sender |
662 | 17.1k | if stream.send_flow.available() > 0 { |
663 | 2.43k | debug_assert!(!stream.pending_send.is_empty()); |
664 | 2.43k | self.pending_send.push(stream); |
665 | 14.7k | } |
666 | 17.1k | } Unexecuted instantiation: <h2::proto::streams::prioritize::Prioritize>::push_back_frame::<_> <h2::proto::streams::prioritize::Prioritize>::push_back_frame::<bytes::bytes::Bytes> Line | Count | Source | 652 | 17.1k | fn push_back_frame<B>( | 653 | 17.1k | &mut self, | 654 | 17.1k | frame: Frame<B>, | 655 | 17.1k | buffer: &mut Buffer<Frame<B>>, | 656 | 17.1k | stream: &mut store::Ptr, | 657 | 17.1k | ) { | 658 | 17.1k | // Push the frame to the front of the stream's deque | 659 | 17.1k | stream.pending_send.push_front(buffer, frame); | 660 | 17.1k | | 661 | 17.1k | // If needed, schedule the sender | 662 | 17.1k | if stream.send_flow.available() > 0 { | 663 | 2.43k | debug_assert!(!stream.pending_send.is_empty()); | 664 | 2.43k | self.pending_send.push(stream); | 665 | 14.7k | } | 666 | 17.1k | } |
|
667 | | |
668 | 577k | pub fn clear_queue<B>(&mut self, buffer: &mut Buffer<Frame<B>>, stream: &mut store::Ptr) { |
669 | 577k | let span = tracing::trace_span!("clear_queue", ?stream.id); |
670 | 577k | let _e = span.enter(); |
671 | | |
672 | | // TODO: make this more efficient? |
673 | 1.26M | while let Some(frame) = stream.pending_send.pop_front(buffer) { |
674 | 688k | tracing::trace!(?frame, "dropping"); |
675 | | } |
676 | | |
677 | 577k | stream.buffered_send_data = 0; |
678 | 577k | stream.requested_send_capacity = 0; |
679 | 577k | if let InFlightData::DataFrame(key) = self.in_flight_data_frame { |
680 | 33.7k | if stream.key() == key { |
681 | 1.32k | // This stream could get cleaned up now - don't allow the buffered frame to get reclaimed. |
682 | 1.32k | self.in_flight_data_frame = InFlightData::Drop; |
683 | 32.4k | } |
684 | 543k | } |
685 | 577k | } Unexecuted instantiation: <h2::proto::streams::prioritize::Prioritize>::clear_queue::<_> <h2::proto::streams::prioritize::Prioritize>::clear_queue::<bytes::bytes::Bytes> Line | Count | Source | 668 | 357 | pub fn clear_queue<B>(&mut self, buffer: &mut Buffer<Frame<B>>, stream: &mut store::Ptr) { | 669 | 357 | let span = tracing::trace_span!("clear_queue", ?stream.id); | 670 | 357 | let _e = span.enter(); | 671 | | | 672 | | // TODO: make this more efficient? | 673 | 714 | while let Some(frame) = stream.pending_send.pop_front(buffer) { | 674 | 357 | tracing::trace!(?frame, "dropping"); | 675 | | } | 676 | | | 677 | 357 | stream.buffered_send_data = 0; | 678 | 357 | stream.requested_send_capacity = 0; | 679 | 357 | if let InFlightData::DataFrame(key) = self.in_flight_data_frame { | 680 | 0 | if stream.key() == key { | 681 | 0 | // This stream could get cleaned up now - don't allow the buffered frame to get reclaimed. | 682 | 0 | self.in_flight_data_frame = InFlightData::Drop; | 683 | 0 | } | 684 | 357 | } | 685 | 357 | } |
<h2::proto::streams::prioritize::Prioritize>::clear_queue::<bytes::bytes::Bytes> Line | Count | Source | 668 | 577k | pub fn clear_queue<B>(&mut self, buffer: &mut Buffer<Frame<B>>, stream: &mut store::Ptr) { | 669 | 577k | let span = tracing::trace_span!("clear_queue", ?stream.id); | 670 | 577k | let _e = span.enter(); | 671 | | | 672 | | // TODO: make this more efficient? | 673 | 1.26M | while let Some(frame) = stream.pending_send.pop_front(buffer) { | 674 | 688k | tracing::trace!(?frame, "dropping"); | 675 | | } | 676 | | | 677 | 577k | stream.buffered_send_data = 0; | 678 | 577k | stream.requested_send_capacity = 0; | 679 | 577k | if let InFlightData::DataFrame(key) = self.in_flight_data_frame { | 680 | 33.7k | if stream.key() == key { | 681 | 1.32k | // This stream could get cleaned up now - don't allow the buffered frame to get reclaimed. | 682 | 1.32k | self.in_flight_data_frame = InFlightData::Drop; | 683 | 32.4k | } | 684 | 543k | } | 685 | 577k | } |
|
686 | | |
687 | 17.4k | pub fn clear_pending_send(&mut self, store: &mut Store, counts: &mut Counts) { |
688 | 193k | while let Some(mut stream) = self.pending_send.pop(store) { |
689 | 176k | let is_pending_reset = stream.is_pending_reset_expiration(); |
690 | 176k | if let Some(reason) = stream.state.get_scheduled_reset() { |
691 | 73.8k | stream.set_reset(reason, Initiator::Library); |
692 | 102k | } |
693 | 176k | counts.transition_after(stream, is_pending_reset); |
694 | | } |
695 | 17.4k | } |
696 | | |
697 | 17.4k | pub fn clear_pending_open(&mut self, store: &mut Store, counts: &mut Counts) { |
698 | 258k | while let Some(stream) = self.pending_open.pop(store) { |
699 | 241k | let is_pending_reset = stream.is_pending_reset_expiration(); |
700 | 241k | counts.transition_after(stream, is_pending_reset); |
701 | 241k | } |
702 | 17.4k | } |
703 | | |
704 | 584k | fn pop_frame<B>( |
705 | 584k | &mut self, |
706 | 584k | buffer: &mut Buffer<Frame<B>>, |
707 | 584k | store: &mut Store, |
708 | 584k | max_len: usize, |
709 | 584k | counts: &mut Counts, |
710 | 584k | ) -> Option<Frame<Prioritized<B>>> |
711 | 584k | where |
712 | 584k | B: Buf, |
713 | 584k | { |
714 | 584k | let span = tracing::trace_span!("pop_frame"); |
715 | 584k | let _e = span.enter(); |
716 | | |
717 | | loop { |
718 | 620k | match self.pending_send.pop(store) { |
719 | 240k | Some(mut stream) => { |
720 | 240k | let span = tracing::trace_span!("popped", ?stream.id, ?stream.state); |
721 | 240k | let _e = span.enter(); |
722 | 240k | |
723 | 240k | // It's possible that this stream, besides having data to send, |
724 | 240k | // is also queued to send a reset, and thus is already in the queue |
725 | 240k | // to wait for "some time" after a reset. |
726 | 240k | // |
727 | 240k | // To be safe, we just always ask the stream. |
728 | 240k | let is_pending_reset = stream.is_pending_reset_expiration(); |
729 | 240k | |
730 | 240k | tracing::trace!(is_pending_reset); |
731 | | |
732 | 240k | let frame = match stream.pending_send.pop_front(buffer) { |
733 | 46.5k | Some(Frame::Data(mut frame)) => { |
734 | 46.5k | // Get the amount of capacity remaining for stream's |
735 | 46.5k | // window. |
736 | 46.5k | let stream_capacity = stream.send_flow.available(); |
737 | 46.5k | let sz = frame.payload().remaining(); |
738 | 46.5k | |
739 | 46.5k | tracing::trace!( |
740 | | sz, |
741 | 0 | eos = frame.is_end_stream(), |
742 | 0 | window = %stream_capacity, |
743 | 0 | available = %stream.send_flow.available(), |
744 | 0 | requested = stream.requested_send_capacity, |
745 | 0 | buffered = stream.buffered_send_data, |
746 | 0 | "data frame" |
747 | | ); |
748 | | |
749 | | // Zero length data frames always have capacity to |
750 | | // be sent. |
751 | 46.5k | if sz > 0 && stream_capacity == 0 { |
752 | 27.4k | tracing::trace!("stream capacity is 0"); |
753 | | |
754 | | // Ensure that the stream is waiting for |
755 | | // connection level capacity |
756 | | // |
757 | | // TODO: uncomment |
758 | | // debug_assert!(stream.is_pending_send_capacity); |
759 | | |
760 | | // The stream has no more capacity, this can |
761 | | // happen if the remote reduced the stream |
762 | | // window. In this case, we need to buffer the |
763 | | // frame and wait for a window update... |
764 | 27.4k | stream.pending_send.push_front(buffer, frame.into()); |
765 | 27.4k | |
766 | 27.4k | continue; |
767 | 19.1k | } |
768 | 19.1k | |
769 | 19.1k | // Only send up to the max frame length |
770 | 19.1k | let len = cmp::min(sz, max_len); |
771 | 19.1k | |
772 | 19.1k | // Only send up to the stream's window capacity |
773 | 19.1k | let len = |
774 | 19.1k | cmp::min(len, stream_capacity.as_size() as usize) as WindowSize; |
775 | 19.1k | |
776 | 19.1k | // There *must* be be enough connection level |
777 | 19.1k | // capacity at this point. |
778 | 19.1k | debug_assert!(len <= self.flow.window_size()); |
779 | | |
780 | | // Check if the stream level window the peer knows is available. In some |
781 | | // scenarios, maybe the window we know is available but the window which |
782 | | // peer knows is not. |
783 | 19.1k | if len > 0 && len > stream.send_flow.window_size() { |
784 | 0 | stream.pending_send.push_front(buffer, frame.into()); |
785 | 0 | continue; |
786 | 19.1k | } |
787 | 19.1k | |
788 | 19.1k | tracing::trace!(len, "sending data frame"); |
789 | | |
790 | | // Update the flow control |
791 | 19.1k | tracing::trace_span!("updating stream flow").in_scope(|| { |
792 | 19.1k | stream.send_data(len, self.max_buffer_size); |
793 | 19.1k | |
794 | 19.1k | // Assign the capacity back to the connection that |
795 | 19.1k | // was just consumed from the stream in the previous |
796 | 19.1k | // line. |
797 | 19.1k | // TODO: proper error handling |
798 | 19.1k | let _res = self.flow.assign_capacity(len); |
799 | 19.1k | debug_assert!(_res.is_ok()); |
800 | 19.1k | }); Unexecuted instantiation: <h2::proto::streams::prioritize::Prioritize>::pop_frame::<_>::{closure#0} <h2::proto::streams::prioritize::Prioritize>::pop_frame::<bytes::bytes::Bytes>::{closure#0} Line | Count | Source | 791 | 19.1k | tracing::trace_span!("updating stream flow").in_scope(|| { | 792 | 19.1k | stream.send_data(len, self.max_buffer_size); | 793 | 19.1k | | 794 | 19.1k | // Assign the capacity back to the connection that | 795 | 19.1k | // was just consumed from the stream in the previous | 796 | 19.1k | // line. | 797 | 19.1k | // TODO: proper error handling | 798 | 19.1k | let _res = self.flow.assign_capacity(len); | 799 | 19.1k | debug_assert!(_res.is_ok()); | 800 | 19.1k | }); |
|
801 | | |
802 | 19.1k | let (eos, len) = tracing::trace_span!("updating connection flow") |
803 | 19.1k | .in_scope(|| { |
804 | 19.1k | // TODO: proper error handling |
805 | 19.1k | let _res = self.flow.send_data(len); |
806 | 19.1k | debug_assert!(_res.is_ok()); |
807 | | |
808 | | // Wrap the frame's data payload to ensure that the |
809 | | // correct amount of data gets written. |
810 | | |
811 | 19.1k | let eos = frame.is_end_stream(); |
812 | 19.1k | let len = len as usize; |
813 | 19.1k | |
814 | 19.1k | if frame.payload().remaining() > len { |
815 | 18.5k | frame.set_end_stream(false); |
816 | 18.5k | } |
817 | 19.1k | (eos, len) |
818 | 19.1k | }); Unexecuted instantiation: <h2::proto::streams::prioritize::Prioritize>::pop_frame::<_>::{closure#1} <h2::proto::streams::prioritize::Prioritize>::pop_frame::<bytes::bytes::Bytes>::{closure#1} Line | Count | Source | 803 | 19.1k | .in_scope(|| { | 804 | 19.1k | // TODO: proper error handling | 805 | 19.1k | let _res = self.flow.send_data(len); | 806 | 19.1k | debug_assert!(_res.is_ok()); | 807 | | | 808 | | // Wrap the frame's data payload to ensure that the | 809 | | // correct amount of data gets written. | 810 | | | 811 | 19.1k | let eos = frame.is_end_stream(); | 812 | 19.1k | let len = len as usize; | 813 | 19.1k | | 814 | 19.1k | if frame.payload().remaining() > len { | 815 | 18.5k | frame.set_end_stream(false); | 816 | 18.5k | } | 817 | 19.1k | (eos, len) | 818 | 19.1k | }); |
|
819 | 19.1k | |
820 | 19.1k | Frame::Data(frame.map(|buf| Prioritized { |
821 | 19.1k | inner: buf.take(len), |
822 | 19.1k | end_of_stream: eos, |
823 | 19.1k | stream: stream.key(), |
824 | 19.1k | })) Unexecuted instantiation: <h2::proto::streams::prioritize::Prioritize>::pop_frame::<_>::{closure#2} <h2::proto::streams::prioritize::Prioritize>::pop_frame::<bytes::bytes::Bytes>::{closure#2} Line | Count | Source | 820 | 19.1k | Frame::Data(frame.map(|buf| Prioritized { | 821 | 19.1k | inner: buf.take(len), | 822 | 19.1k | end_of_stream: eos, | 823 | 19.1k | stream: stream.key(), | 824 | 19.1k | })) |
|
825 | | } |
826 | 0 | Some(Frame::PushPromise(pp)) => { |
827 | 0 | let mut pushed = |
828 | 0 | stream.store_mut().find_mut(&pp.promised_id()).unwrap(); |
829 | 0 | pushed.is_pending_push = false; |
830 | 0 | // Transition stream from pending_push to pending_open |
831 | 0 | // if possible |
832 | 0 | if !pushed.pending_send.is_empty() { |
833 | 0 | if counts.can_inc_num_send_streams() { |
834 | 0 | counts.inc_num_send_streams(&mut pushed); |
835 | 0 | self.pending_send.push(&mut pushed); |
836 | 0 | } else { |
837 | 0 | self.queue_open(&mut pushed); |
838 | 0 | } |
839 | 0 | } |
840 | 0 | Frame::PushPromise(pp) |
841 | | } |
842 | 185k | Some(frame) => frame.map(|_| { |
843 | 0 | unreachable!( |
844 | 0 | "Frame::map closure will only be called \ |
845 | 0 | on DATA frames." |
846 | 0 | ) Unexecuted instantiation: <h2::proto::streams::prioritize::Prioritize>::pop_frame::<_>::{closure#3} Unexecuted instantiation: <h2::proto::streams::prioritize::Prioritize>::pop_frame::<bytes::bytes::Bytes>::{closure#3} |
847 | 185k | }), |
848 | | None => { |
849 | 8.40k | if let Some(reason) = stream.state.get_scheduled_reset() { |
850 | 100 | stream.set_reset(reason, Initiator::Library); |
851 | 100 | |
852 | 100 | let frame = frame::Reset::new(stream.id, reason); |
853 | 100 | Frame::Reset(frame) |
854 | | } else { |
855 | | // If the stream receives a RESET from the peer, it may have |
856 | | // had data buffered to be sent, but all the frames are cleared |
857 | | // in clear_queue(). Instead of doing O(N) traversal through queue |
858 | | // to remove, lets just ignore the stream here. |
859 | 8.30k | tracing::trace!("removing dangling stream from pending_send"); |
860 | | // Since this should only happen as a consequence of `clear_queue`, |
861 | | // we must be in a closed state of some kind. |
862 | 8.30k | debug_assert!(stream.state.is_closed()); |
863 | 8.30k | counts.transition_after(stream, is_pending_reset); |
864 | 8.30k | continue; |
865 | | } |
866 | | } |
867 | | }; |
868 | | |
869 | 205k | tracing::trace!("pop_frame; frame={:?}", frame); |
870 | | |
871 | 205k | if cfg!(debug_assertions) && stream.state.is_idle() { |
872 | 0 | debug_assert!(stream.id > self.last_opened_id); |
873 | 0 | self.last_opened_id = stream.id; |
874 | 205k | } |
875 | | |
876 | 205k | if !stream.pending_send.is_empty() || stream.state.is_scheduled_reset() { |
877 | 184k | // TODO: Only requeue the sender IF it is ready to send |
878 | 184k | // the next frame. i.e. don't requeue it if the next |
879 | 184k | // frame is a data frame and the stream does not have |
880 | 184k | // any more capacity. |
881 | 184k | self.pending_send.push(&mut stream); |
882 | 184k | } |
883 | | |
884 | 205k | counts.transition_after(stream, is_pending_reset); |
885 | 205k | |
886 | 205k | return Some(frame); |
887 | | } |
888 | 379k | None => return None, |
889 | | } |
890 | | } |
891 | 584k | } Unexecuted instantiation: <h2::proto::streams::prioritize::Prioritize>::pop_frame::<_> <h2::proto::streams::prioritize::Prioritize>::pop_frame::<bytes::bytes::Bytes> Line | Count | Source | 704 | 584k | fn pop_frame<B>( | 705 | 584k | &mut self, | 706 | 584k | buffer: &mut Buffer<Frame<B>>, | 707 | 584k | store: &mut Store, | 708 | 584k | max_len: usize, | 709 | 584k | counts: &mut Counts, | 710 | 584k | ) -> Option<Frame<Prioritized<B>>> | 711 | 584k | where | 712 | 584k | B: Buf, | 713 | 584k | { | 714 | 584k | let span = tracing::trace_span!("pop_frame"); | 715 | 584k | let _e = span.enter(); | 716 | | | 717 | | loop { | 718 | 620k | match self.pending_send.pop(store) { | 719 | 240k | Some(mut stream) => { | 720 | 240k | let span = tracing::trace_span!("popped", ?stream.id, ?stream.state); | 721 | 240k | let _e = span.enter(); | 722 | 240k | | 723 | 240k | // It's possible that this stream, besides having data to send, | 724 | 240k | // is also queued to send a reset, and thus is already in the queue | 725 | 240k | // to wait for "some time" after a reset. | 726 | 240k | // | 727 | 240k | // To be safe, we just always ask the stream. | 728 | 240k | let is_pending_reset = stream.is_pending_reset_expiration(); | 729 | 240k | | 730 | 240k | tracing::trace!(is_pending_reset); | 731 | | | 732 | 240k | let frame = match stream.pending_send.pop_front(buffer) { | 733 | 46.5k | Some(Frame::Data(mut frame)) => { | 734 | 46.5k | // Get the amount of capacity remaining for stream's | 735 | 46.5k | // window. | 736 | 46.5k | let stream_capacity = stream.send_flow.available(); | 737 | 46.5k | let sz = frame.payload().remaining(); | 738 | 46.5k | | 739 | 46.5k | tracing::trace!( | 740 | | sz, | 741 | 0 | eos = frame.is_end_stream(), | 742 | 0 | window = %stream_capacity, | 743 | 0 | available = %stream.send_flow.available(), | 744 | 0 | requested = stream.requested_send_capacity, | 745 | 0 | buffered = stream.buffered_send_data, | 746 | 0 | "data frame" | 747 | | ); | 748 | | | 749 | | // Zero length data frames always have capacity to | 750 | | // be sent. | 751 | 46.5k | if sz > 0 && stream_capacity == 0 { | 752 | 27.4k | tracing::trace!("stream capacity is 0"); | 753 | | | 754 | | // Ensure that the stream is waiting for | 755 | | // connection level capacity | 756 | | // | 757 | | // TODO: uncomment | 758 | | // debug_assert!(stream.is_pending_send_capacity); | 759 | | | 760 | | // The stream has no more capacity, this can | 761 | | // happen if the remote reduced the stream | 762 | | // window. In this case, we need to buffer the | 763 | | // frame and wait for a window update... | 764 | 27.4k | stream.pending_send.push_front(buffer, frame.into()); | 765 | 27.4k | | 766 | 27.4k | continue; | 767 | 19.1k | } | 768 | 19.1k | | 769 | 19.1k | // Only send up to the max frame length | 770 | 19.1k | let len = cmp::min(sz, max_len); | 771 | 19.1k | | 772 | 19.1k | // Only send up to the stream's window capacity | 773 | 19.1k | let len = | 774 | 19.1k | cmp::min(len, stream_capacity.as_size() as usize) as WindowSize; | 775 | 19.1k | | 776 | 19.1k | // There *must* be be enough connection level | 777 | 19.1k | // capacity at this point. | 778 | 19.1k | debug_assert!(len <= self.flow.window_size()); | 779 | | | 780 | | // Check if the stream level window the peer knows is available. In some | 781 | | // scenarios, maybe the window we know is available but the window which | 782 | | // peer knows is not. | 783 | 19.1k | if len > 0 && len > stream.send_flow.window_size() { | 784 | 0 | stream.pending_send.push_front(buffer, frame.into()); | 785 | 0 | continue; | 786 | 19.1k | } | 787 | 19.1k | | 788 | 19.1k | tracing::trace!(len, "sending data frame"); | 789 | | | 790 | | // Update the flow control | 791 | 19.1k | tracing::trace_span!("updating stream flow").in_scope(|| { | 792 | | stream.send_data(len, self.max_buffer_size); | 793 | | | 794 | | // Assign the capacity back to the connection that | 795 | | // was just consumed from the stream in the previous | 796 | | // line. | 797 | | // TODO: proper error handling | 798 | | let _res = self.flow.assign_capacity(len); | 799 | | debug_assert!(_res.is_ok()); | 800 | 19.1k | }); | 801 | | | 802 | 19.1k | let (eos, len) = tracing::trace_span!("updating connection flow") | 803 | 19.1k | .in_scope(|| { | 804 | | // TODO: proper error handling | 805 | | let _res = self.flow.send_data(len); | 806 | | debug_assert!(_res.is_ok()); | 807 | | | 808 | | // Wrap the frame's data payload to ensure that the | 809 | | // correct amount of data gets written. | 810 | | | 811 | | let eos = frame.is_end_stream(); | 812 | | let len = len as usize; | 813 | | | 814 | | if frame.payload().remaining() > len { | 815 | | frame.set_end_stream(false); | 816 | | } | 817 | | (eos, len) | 818 | 19.1k | }); | 819 | 19.1k | | 820 | 19.1k | Frame::Data(frame.map(|buf| Prioritized { | 821 | | inner: buf.take(len), | 822 | | end_of_stream: eos, | 823 | | stream: stream.key(), | 824 | 19.1k | })) | 825 | | } | 826 | 0 | Some(Frame::PushPromise(pp)) => { | 827 | 0 | let mut pushed = | 828 | 0 | stream.store_mut().find_mut(&pp.promised_id()).unwrap(); | 829 | 0 | pushed.is_pending_push = false; | 830 | 0 | // Transition stream from pending_push to pending_open | 831 | 0 | // if possible | 832 | 0 | if !pushed.pending_send.is_empty() { | 833 | 0 | if counts.can_inc_num_send_streams() { | 834 | 0 | counts.inc_num_send_streams(&mut pushed); | 835 | 0 | self.pending_send.push(&mut pushed); | 836 | 0 | } else { | 837 | 0 | self.queue_open(&mut pushed); | 838 | 0 | } | 839 | 0 | } | 840 | 0 | Frame::PushPromise(pp) | 841 | | } | 842 | 185k | Some(frame) => frame.map(|_| { | 843 | | unreachable!( | 844 | | "Frame::map closure will only be called \ | 845 | | on DATA frames." | 846 | | ) | 847 | 185k | }), | 848 | | None => { | 849 | 8.40k | if let Some(reason) = stream.state.get_scheduled_reset() { | 850 | 100 | stream.set_reset(reason, Initiator::Library); | 851 | 100 | | 852 | 100 | let frame = frame::Reset::new(stream.id, reason); | 853 | 100 | Frame::Reset(frame) | 854 | | } else { | 855 | | // If the stream receives a RESET from the peer, it may have | 856 | | // had data buffered to be sent, but all the frames are cleared | 857 | | // in clear_queue(). Instead of doing O(N) traversal through queue | 858 | | // to remove, lets just ignore the stream here. | 859 | 8.30k | tracing::trace!("removing dangling stream from pending_send"); | 860 | | // Since this should only happen as a consequence of `clear_queue`, | 861 | | // we must be in a closed state of some kind. | 862 | 8.30k | debug_assert!(stream.state.is_closed()); | 863 | 8.30k | counts.transition_after(stream, is_pending_reset); | 864 | 8.30k | continue; | 865 | | } | 866 | | } | 867 | | }; | 868 | | | 869 | 205k | tracing::trace!("pop_frame; frame={:?}", frame); | 870 | | | 871 | 205k | if cfg!(debug_assertions) && stream.state.is_idle() { | 872 | 0 | debug_assert!(stream.id > self.last_opened_id); | 873 | 0 | self.last_opened_id = stream.id; | 874 | 205k | } | 875 | | | 876 | 205k | if !stream.pending_send.is_empty() || stream.state.is_scheduled_reset() { | 877 | 184k | // TODO: Only requeue the sender IF it is ready to send | 878 | 184k | // the next frame. i.e. don't requeue it if the next | 879 | 184k | // frame is a data frame and the stream does not have | 880 | 184k | // any more capacity. | 881 | 184k | self.pending_send.push(&mut stream); | 882 | 184k | } | 883 | | | 884 | 205k | counts.transition_after(stream, is_pending_reset); | 885 | 205k | | 886 | 205k | return Some(frame); | 887 | | } | 888 | 379k | None => return None, | 889 | | } | 890 | | } | 891 | 584k | } |
|
892 | | |
893 | 584k | fn pop_pending_open<'s>( |
894 | 584k | &mut self, |
895 | 584k | store: &'s mut Store, |
896 | 584k | counts: &mut Counts, |
897 | 584k | ) -> Option<store::Ptr<'s>> { |
898 | 584k | tracing::trace!("schedule_pending_open"); |
899 | | // check for any pending open streams |
900 | 584k | if counts.can_inc_num_send_streams() { |
901 | 568k | if let Some(mut stream) = self.pending_open.pop(store) { |
902 | 184k | tracing::trace!("schedule_pending_open; stream={:?}", stream.id); |
903 | | |
904 | 184k | counts.inc_num_send_streams(&mut stream); |
905 | 184k | stream.notify_send(); |
906 | 184k | return Some(stream); |
907 | 384k | } |
908 | 16.0k | } |
909 | | |
910 | 400k | None |
911 | 584k | } |
912 | | } |
913 | | |
914 | | // ===== impl Prioritized ===== |
915 | | |
916 | | impl<B> Buf for Prioritized<B> |
917 | | where |
918 | | B: Buf, |
919 | | { |
920 | 2.72M | fn remaining(&self) -> usize { |
921 | 2.72M | self.inner.remaining() |
922 | 2.72M | } Unexecuted instantiation: <h2::proto::streams::prioritize::Prioritized<_> as bytes::buf::buf_impl::Buf>::remaining Unexecuted instantiation: <h2::proto::streams::prioritize::Prioritized<bytes::bytes::Bytes> as bytes::buf::buf_impl::Buf>::remaining <h2::proto::streams::prioritize::Prioritized<bytes::bytes::Bytes> as bytes::buf::buf_impl::Buf>::remaining Line | Count | Source | 920 | 2.72M | fn remaining(&self) -> usize { | 921 | 2.72M | self.inner.remaining() | 922 | 2.72M | } |
|
923 | | |
924 | 513k | fn chunk(&self) -> &[u8] { |
925 | 513k | self.inner.chunk() |
926 | 513k | } Unexecuted instantiation: <h2::proto::streams::prioritize::Prioritized<_> as bytes::buf::buf_impl::Buf>::chunk Unexecuted instantiation: <h2::proto::streams::prioritize::Prioritized<bytes::bytes::Bytes> as bytes::buf::buf_impl::Buf>::chunk <h2::proto::streams::prioritize::Prioritized<bytes::bytes::Bytes> as bytes::buf::buf_impl::Buf>::chunk Line | Count | Source | 924 | 513k | fn chunk(&self) -> &[u8] { | 925 | 513k | self.inner.chunk() | 926 | 513k | } |
|
927 | | |
928 | 0 | fn chunks_vectored<'a>(&'a self, dst: &mut [std::io::IoSlice<'a>]) -> usize { |
929 | 0 | self.inner.chunks_vectored(dst) |
930 | 0 | } Unexecuted instantiation: <h2::proto::streams::prioritize::Prioritized<_> as bytes::buf::buf_impl::Buf>::chunks_vectored Unexecuted instantiation: <h2::proto::streams::prioritize::Prioritized<bytes::bytes::Bytes> as bytes::buf::buf_impl::Buf>::chunks_vectored |
931 | | |
932 | 21.9k | fn advance(&mut self, cnt: usize) { |
933 | 21.9k | self.inner.advance(cnt) |
934 | 21.9k | } Unexecuted instantiation: <h2::proto::streams::prioritize::Prioritized<_> as bytes::buf::buf_impl::Buf>::advance Unexecuted instantiation: <h2::proto::streams::prioritize::Prioritized<bytes::bytes::Bytes> as bytes::buf::buf_impl::Buf>::advance <h2::proto::streams::prioritize::Prioritized<bytes::bytes::Bytes> as bytes::buf::buf_impl::Buf>::advance Line | Count | Source | 932 | 21.9k | fn advance(&mut self, cnt: usize) { | 933 | 21.9k | self.inner.advance(cnt) | 934 | 21.9k | } |
|
935 | | } |
936 | | |
937 | | impl<B: Buf> fmt::Debug for Prioritized<B> { |
938 | 0 | fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { |
939 | 0 | fmt.debug_struct("Prioritized") |
940 | 0 | .field("remaining", &self.inner.get_ref().remaining()) |
941 | 0 | .field("end_of_stream", &self.end_of_stream) |
942 | 0 | .field("stream", &self.stream) |
943 | 0 | .finish() |
944 | 0 | } |
945 | | } |