/src/h2/src/proto/streams/prioritize.rs
Line | Count | Source |
1 | | use super::store::Resolve; |
2 | | use super::*; |
3 | | |
4 | | use crate::frame::Reason; |
5 | | |
6 | | use crate::codec::UserError; |
7 | | use crate::codec::UserError::*; |
8 | | |
9 | | use bytes::buf::Take; |
10 | | use std::{ |
11 | | cmp::{self, Ordering}, |
12 | | fmt, io, mem, |
13 | | task::{Context, Poll, Waker}, |
14 | | }; |
15 | | |
16 | | /// # Warning |
17 | | /// |
18 | | /// Queued streams are ordered by stream ID, as we need to ensure that |
19 | | /// lower-numbered streams are sent headers before higher-numbered ones. |
20 | | /// This is because "idle" stream IDs – those which have been initiated but |
21 | | /// have yet to receive frames – will be implicitly closed on receipt of a |
22 | | /// frame on a higher stream ID. If these queues were not ordered by stream
23 | | /// IDs, some mechanism would be necessary to ensure that the lowest-numbered
24 | | /// idle stream is opened first. |
25 | | #[derive(Debug)] |
26 | | pub(super) struct Prioritize { |
27 | | /// Queue of streams waiting for socket capacity to send a frame. |
28 | | pending_send: store::Queue<stream::NextSend>, |
29 | | |
30 | | /// Queue of streams waiting for window capacity to produce data. |
31 | | pending_capacity: store::Queue<stream::NextSendCapacity>, |
32 | | |
33 | | /// Streams waiting for capacity due to max concurrency |
34 | | /// |
35 | | /// The `SendRequest` handle is `Clone`. This enables initiating requests |
36 | | /// from many tasks. However, offering this capability while supporting |
37 | | /// backpressure at some level is tricky. If there are many `SendRequest` |
38 | | /// handles and a single stream becomes available, which handle gets |
39 | | /// assigned that stream? Maybe that handle is no longer ready to send a |
40 | | /// request. |
41 | | /// |
42 | | /// The strategy used is to allow each `SendRequest` handle one buffered |
43 | | /// request. A `SendRequest` handle is ready to send a request if it has no |
44 | | /// associated buffered requests. This is the same strategy as `mpsc` in the |
45 | | /// futures library. |
46 | | pending_open: store::Queue<stream::NextOpen>, |
47 | | |
48 | | /// Connection level flow control governing sent data |
49 | | flow: FlowControl, |
50 | | |
51 | | /// Stream ID of the last stream opened. |
52 | | last_opened_id: StreamId, |
53 | | |
54 | | /// What `DATA` frame is currently being sent in the codec. |
55 | | in_flight_data_frame: InFlightData, |
56 | | |
57 | | /// The maximum amount of bytes a stream should buffer. |
58 | | max_buffer_size: usize, |
59 | | } |
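
The `pending_open` queue and its doc comment above are what back `SendRequest::ready()` in h2's public API: each cloned handle gets one buffered request, and `ready()` resolves once that slot is free and a stream may be opened. A hedged usage sketch (hypothetical address, connection driven on a spawned task, error handling kept minimal):

use h2::client;
use http::Request;
use tokio::net::TcpStream;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Hypothetical address; the peer is assumed to speak HTTP/2 with prior knowledge.
    let io = TcpStream::connect("127.0.0.1:8080").await?;
    let (send_request, connection) = client::handshake(io).await?;

    // The connection future owns the socket and must be polled to make progress.
    tokio::spawn(async move {
        let _ = connection.await;
    });

    // `ready()` resolves once this handle may buffer its one request -- the
    // strategy described on `pending_open` above.
    let mut send_request = send_request.ready().await?;

    let request = Request::get("http://example.com/").body(())?;
    let (response, _body_stream) = send_request.send_request(request, true)?;
    let response = response.await?;
    println!("status: {}", response.status());
    Ok(())
}
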
60 | | |
61 | | #[derive(Debug, Eq, PartialEq)] |
62 | | enum InFlightData { |
63 | | /// There is no `DATA` frame in flight. |
64 | | Nothing, |
65 | | /// There is a `DATA` frame in flight belonging to the given stream. |
66 | | DataFrame(store::Key), |
67 | | /// There was a `DATA` frame, but the stream's queue was since cleared. |
68 | | Drop, |
69 | | } |
70 | | |
71 | | pub(crate) struct Prioritized<B> { |
72 | | // The buffer |
73 | | inner: Take<B>, |
74 | | |
75 | | end_of_stream: bool, |
76 | | |
77 | | // The stream that this is associated with |
78 | | stream: store::Key, |
79 | | } |
80 | | |
81 | | // ===== impl Prioritize ===== |
82 | | |
83 | | impl Prioritize { |
84 | 13.4k | pub fn new(config: &Config) -> Prioritize { |
85 | 13.4k | let mut flow = FlowControl::new(); |
86 | | |
87 | 13.4k | flow.inc_window(config.remote_init_window_sz) |
88 | 13.4k | .expect("invalid initial window size"); |
89 | | |
90 | | // TODO: proper error handling |
91 | 13.4k | let _res = flow.assign_capacity(config.remote_init_window_sz); |
92 | 13.4k | debug_assert!(_res.is_ok()); |
93 | | |
94 | 13.4k | tracing::trace!("Prioritize::new; flow={:?}", flow); |
95 | | |
96 | 13.4k | Prioritize { |
97 | 13.4k | pending_send: store::Queue::new(), |
98 | 13.4k | pending_capacity: store::Queue::new(), |
99 | 13.4k | pending_open: store::Queue::new(), |
100 | 13.4k | flow, |
101 | 13.4k | last_opened_id: StreamId::ZERO, |
102 | 13.4k | in_flight_data_frame: InFlightData::Nothing, |
103 | 13.4k | max_buffer_size: config.local_max_buffer_size, |
104 | 13.4k | } |
105 | 13.4k | } |
106 | | |
107 | 0 | pub(crate) fn max_buffer_size(&self) -> usize { |
108 | 0 | self.max_buffer_size |
109 | 0 | } |
110 | | |
111 | | /// Queue a frame to be sent to the remote |
112 | 475k | pub fn queue_frame<B>( |
113 | 475k | &mut self, |
114 | 475k | frame: Frame<B>, |
115 | 475k | buffer: &mut Buffer<Frame<B>>, |
116 | 475k | stream: &mut store::Ptr, |
117 | 475k | task: &mut Option<Waker>, |
118 | 475k | ) { |
119 | 475k | let span = tracing::trace_span!("Prioritize::queue_frame", ?stream.id); |
120 | 475k | let _e = span.enter(); |
121 | | // Queue the frame in the buffer |
122 | 475k | stream.pending_send.push_back(buffer, frame); |
123 | 475k | self.schedule_send(stream, task); |
124 | 475k | }
  |       | Instantiations: <h2::proto::streams::prioritize::Prioritize>::queue_frame::<bytes::bytes::Bytes> (589); <h2::proto::streams::prioritize::Prioritize>::queue_frame::<bytes::bytes::Bytes> (475k)
125 | | |
126 | 555k | pub fn schedule_send(&mut self, stream: &mut store::Ptr, task: &mut Option<Waker>) { |
127 | | // If the stream is waiting to be opened, nothing more to do. |
128 | 555k | if stream.is_send_ready() { |
129 | 121k | tracing::trace!(?stream.id, "schedule_send"); |
130 | | // Queue the stream |
131 | 121k | self.pending_send.push(stream); |
132 | | |
133 | | // Notify the connection. |
134 | 121k | if let Some(task) = task.take() { |
135 | 60 | task.wake(); |
136 | 121k | } |
137 | 433k | } |
138 | 555k | } |
139 | | |
140 | 424k | pub fn queue_open(&mut self, stream: &mut store::Ptr) { |
141 | 424k | self.pending_open.push(stream); |
142 | 424k | } |
143 | | |
144 | | /// Send a data frame |
145 | 423k | pub fn send_data<B>( |
146 | 423k | &mut self, |
147 | 423k | frame: frame::Data<B>, |
148 | 423k | buffer: &mut Buffer<Frame<B>>, |
149 | 423k | stream: &mut store::Ptr, |
150 | 423k | counts: &mut Counts, |
151 | 423k | task: &mut Option<Waker>, |
152 | 423k | ) -> Result<(), UserError> |
153 | 423k | where |
154 | 423k | B: Buf, |
155 | | { |
156 | 423k | let sz = frame.payload().remaining(); |
157 | | |
158 | 423k | if sz > MAX_WINDOW_SIZE as usize { |
159 | 0 | return Err(UserError::PayloadTooBig); |
160 | 423k | } |
161 | | |
162 | 423k | let sz = sz as WindowSize; |
163 | | |
164 | 423k | if !stream.state.is_send_streaming() { |
165 | 0 | if stream.state.is_closed() { |
166 | 0 | return Err(InactiveStreamId); |
167 | | } else { |
168 | 0 | return Err(UnexpectedFrameType); |
169 | | } |
170 | 423k | } |
171 | | |
172 | | // Update the buffered data counter |
173 | 423k | stream.buffered_send_data += sz as usize; |
174 | | |
175 | 423k | let span = |
176 | 423k | tracing::trace_span!("send_data", sz, requested = stream.requested_send_capacity); |
177 | 423k | let _e = span.enter(); |
178 | 423k | tracing::trace!(buffered = stream.buffered_send_data); |
179 | | |
180 | | // Implicitly request more send capacity if not enough has been |
181 | | // requested yet. |
182 | 423k | if (stream.requested_send_capacity as usize) < stream.buffered_send_data { |
183 | | // Update the target requested capacity |
184 | 423k | stream.requested_send_capacity = |
185 | 423k | cmp::min(stream.buffered_send_data, WindowSize::MAX as usize) as WindowSize; |
186 | | |
187 | | // `try_assign_capacity` will queue the stream to `pending_capacity` if the capacity
188 | | // cannot be assigned at the time it is called. |
189 | | // |
190 | | // Streams over the max concurrent count will still call `send_data`, so we should be
191 | | // careful not to put them into `pending_capacity`, as that would starve the connection
192 | | // capacity for other streams.
193 | 423k | if !stream.is_pending_open { |
194 | 0 | self.try_assign_capacity(stream); |
195 | 423k | } |
196 | 0 | } |
197 | | |
198 | 423k | if frame.is_end_stream() { |
199 | 423k | stream.state.send_close(); |
200 | 423k | self.reserve_capacity(0, stream, counts); |
201 | 423k | } |
202 | | |
203 | 423k | tracing::trace!( |
204 | 0 | available = %stream.send_flow.available(), |
205 | 0 | buffered = stream.buffered_send_data, |
206 | | ); |
207 | | |
208 | | // The `stream.buffered_send_data == 0` check is here so that, if a zero |
209 | | // length data frame is queued to the front (there is no previously |
210 | | // queued data), it gets sent out immediately even if there is no |
211 | | // available send window. |
212 | | // |
213 | | // Sending out zero length data frames can be done to signal |
214 | | // end-of-stream. |
215 | | // |
216 | 423k | if stream.send_flow.available() > 0 || stream.buffered_send_data == 0 { |
217 | 0 | // The stream currently has capacity to send the data frame, so |
218 | 0 | // queue it up and notify the connection task. |
219 | 0 | self.queue_frame(frame.into(), buffer, stream, task); |
220 | 423k | } else { |
221 | 423k | // The stream has no capacity to send the frame now, save it but |
222 | 423k | // don't notify the connection task. Once additional capacity |
223 | 423k | // becomes available, the frame will be flushed. |
224 | 423k | stream.pending_send.push_back(buffer, frame.into()); |
225 | 423k | } |
226 | | |
227 | 423k | Ok(()) |
228 | 423k | } |
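
A minimal standalone model (plain fields, not h2's internal `FlowControl` or `store::Ptr` types) of the decision `send_data` makes above: grow the requested capacity to cover everything buffered, then either hand the frame to the connection task immediately (capacity available, or a zero-length end-of-stream frame) or leave it buffered until a window update arrives:

use std::cmp;

/// Toy stand-in for the per-stream bookkeeping used above (field names mirror
/// the real ones, the types do not).
struct StreamModel {
    buffered_send_data: usize,
    requested_send_capacity: u32,
    available: u32, // capacity already assigned to this stream
}

impl StreamModel {
    /// Returns true if the frame can be queued for the connection task right away.
    fn send_data(&mut self, frame_len: usize) -> bool {
        // Count the new payload as buffered.
        self.buffered_send_data += frame_len;

        // Implicitly request more send capacity if not enough has been requested yet.
        if (self.requested_send_capacity as usize) < self.buffered_send_data {
            self.requested_send_capacity =
                cmp::min(self.buffered_send_data, u32::MAX as usize) as u32;
        }

        // Zero-length frames (end-of-stream markers) go out even without window.
        self.available > 0 || self.buffered_send_data == 0
    }
}

fn main() {
    let mut s = StreamModel { buffered_send_data: 0, requested_send_capacity: 0, available: 0 };
    assert!(s.send_data(0));   // empty end-of-stream frame: sent immediately
    assert!(!s.send_data(10)); // no window yet: buffered until a WINDOW_UPDATE
}
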
229 | | |
230 | | /// Request capacity to send data |
231 | 423k | pub fn reserve_capacity( |
232 | 423k | &mut self, |
233 | 423k | capacity: WindowSize, |
234 | 423k | stream: &mut store::Ptr, |
235 | 423k | counts: &mut Counts, |
236 | 423k | ) { |
237 | 423k | let span = tracing::trace_span!( |
238 | | "reserve_capacity", |
239 | | ?stream.id, |
240 | | requested = capacity, |
241 | 0 | effective = (capacity as usize) + stream.buffered_send_data, |
242 | 0 | curr = stream.requested_send_capacity |
243 | | ); |
244 | 423k | let _e = span.enter(); |
245 | | |
246 | | // Actual capacity is `capacity` + the current amount of buffered data. |
247 | | // If it were less, then we could never send out the buffered data. |
248 | 423k | let capacity = (capacity as usize) + stream.buffered_send_data; |
249 | | |
250 | 423k | match capacity.cmp(&(stream.requested_send_capacity as usize)) { |
251 | 423k | Ordering::Equal => { |
252 | 423k | // Nothing to do |
253 | 423k | } |
254 | | Ordering::Less => { |
255 | | // Update the target requested capacity |
256 | 0 | stream.requested_send_capacity = capacity as WindowSize; |
257 | | |
258 | | // Currently available capacity assigned to the stream |
259 | 0 | let available = stream.send_flow.available().as_size(); |
260 | | |
261 | | // If the stream has more assigned capacity than requested, reclaim |
262 | | // some for the connection |
263 | 0 | if available as usize > capacity { |
264 | 0 | let diff = available - capacity as WindowSize; |
265 | | |
266 | | // TODO: proper error handling |
267 | 0 | let _res = stream.send_flow.claim_capacity(diff); |
268 | 0 | debug_assert!(_res.is_ok()); |
269 | | |
270 | 0 | self.assign_connection_capacity(diff, stream, counts); |
271 | 0 | } |
272 | | } |
273 | | Ordering::Greater => { |
274 | | // If trying to *add* capacity, but the stream send side is closed, |
275 | | // there's nothing to be done. |
276 | 0 | if stream.state.is_send_closed() { |
277 | 0 | return; |
278 | 0 | } |
279 | | |
280 | | // Update the target requested capacity |
281 | 0 | stream.requested_send_capacity = |
282 | 0 | cmp::min(capacity, WindowSize::MAX as usize) as WindowSize; |
283 | | |
284 | | // Try to assign additional capacity to the stream. If none is |
285 | | // currently available, the stream will be queued to receive some |
286 | | // when more becomes available. |
287 | 0 | self.try_assign_capacity(stream); |
288 | | } |
289 | | } |
290 | 423k | } |
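
From the caller's side, `reserve_capacity` is reached through h2's public `SendStream` API. A hedged sketch (assuming the `SendStream` was obtained from `send_request` and the `Connection` is being driven elsewhere) of reserving capacity, waiting for it to be assigned, and writing a body chunk by chunk:

use bytes::Bytes;
use std::future::poll_fn;

// Hedged sketch: `stream` is assumed to come from `SendRequest::send_request`,
// with the `Connection` polled on another task.
async fn send_body(mut stream: h2::SendStream<Bytes>, mut body: Bytes) -> Result<(), h2::Error> {
    if body.is_empty() {
        return stream.send_data(body, true);
    }
    while !body.is_empty() {
        // Maps to `reserve_capacity` above: the requested amount is on top of
        // whatever is already buffered for this stream.
        stream.reserve_capacity(body.len());

        // Wait until the prioritizer actually assigns some of that capacity.
        let granted = poll_fn(|cx| stream.poll_capacity(cx))
            .await
            .expect("stream closed before capacity was assigned")?;

        let chunk = body.split_to(granted.min(body.len()));
        let end_of_stream = body.is_empty();
        stream.send_data(chunk, end_of_stream)?;
    }
    Ok(())
}
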
291 | | |
292 | 143k | pub fn recv_stream_window_update( |
293 | 143k | &mut self, |
294 | 143k | inc: WindowSize, |
295 | 143k | stream: &mut store::Ptr, |
296 | 143k | ) -> Result<(), Reason> { |
297 | 143k | let span = tracing::trace_span!( |
298 | | "recv_stream_window_update", |
299 | | ?stream.id, |
300 | | ?stream.state, |
301 | | inc, |
302 | 0 | flow = ?stream.send_flow |
303 | | ); |
304 | 143k | let _e = span.enter(); |
305 | | |
306 | 143k | if stream.state.is_send_closed() && stream.buffered_send_data == 0 { |
307 | | // We can't send any data, so don't bother doing anything else. |
308 | 7.78k | return Ok(()); |
309 | 136k | } |
310 | | |
311 | | // Update the stream level flow control. |
312 | 136k | stream.send_flow.inc_window(inc)?; |
313 | | |
314 | | // If the stream is waiting on additional capacity, then this will |
315 | | // assign it (if available on the connection) and notify the producer |
316 | 136k | self.try_assign_capacity(stream); |
317 | | |
318 | 136k | Ok(()) |
319 | 143k | } |
320 | | |
321 | 667 | pub fn recv_connection_window_update( |
322 | 667 | &mut self, |
323 | 667 | inc: WindowSize, |
324 | 667 | store: &mut Store, |
325 | 667 | counts: &mut Counts, |
326 | 667 | ) -> Result<(), Reason> { |
327 | | // Update the connection's window |
328 | 667 | self.flow.inc_window(inc)?; |
329 | | |
330 | 665 | self.assign_connection_capacity(inc, store, counts); |
331 | 665 | Ok(()) |
332 | 667 | } |
333 | | |
334 | | /// Reclaim all capacity assigned to the stream and re-assign it to the |
335 | | /// connection |
336 | 727k | pub fn reclaim_all_capacity(&mut self, stream: &mut store::Ptr, counts: &mut Counts) { |
337 | 727k | let available = stream.send_flow.available().as_size(); |
338 | 727k | if available > 0 { |
339 | | // TODO: proper error handling |
340 | 48.5k | let _res = stream.send_flow.claim_capacity(available); |
341 | 48.5k | debug_assert!(_res.is_ok()); |
342 | | // Re-assign all capacity to the connection |
343 | 48.5k | self.assign_connection_capacity(available, stream, counts); |
344 | 678k | } |
345 | 727k | } |
346 | | |
347 | | /// Reclaim just reserved capacity, not buffered capacity, and re-assign |
348 | | /// it to the connection |
349 | 80.0k | pub fn reclaim_reserved_capacity(&mut self, stream: &mut store::Ptr, counts: &mut Counts) { |
350 | | // only reclaim reserved capacity that isn't already buffered |
351 | 80.0k | if stream.send_flow.available().as_size() as usize > stream.buffered_send_data { |
352 | 0 | let reserved = |
353 | 0 | stream.send_flow.available().as_size() - stream.buffered_send_data as WindowSize; |
354 | 0 |
355 | 0 | // Panic safety: due to how `reserved` is computed it can't be greater |
356 | 0 | // than what's available. |
357 | 0 | stream |
358 | 0 | .send_flow |
359 | 0 | .claim_capacity(reserved) |
360 | 0 | .expect("window size should be greater than reserved"); |
361 | 0 |
362 | 0 | self.assign_connection_capacity(reserved, stream, counts); |
363 | 80.0k | } |
364 | 80.0k | } |
365 | | |
366 | 18.1k | pub fn clear_pending_capacity(&mut self, store: &mut Store, counts: &mut Counts) { |
367 | 18.1k | let span = tracing::trace_span!("clear_pending_capacity"); |
368 | 18.1k | let _e = span.enter(); |
369 | 30.9k | while let Some(stream) = self.pending_capacity.pop(store) { |
370 | 12.8k | counts.transition(stream, |_, stream| { |
371 | 12.8k | tracing::trace!(?stream.id, "clear_pending_capacity"); |
372 | 12.8k | }) |
373 | | } |
374 | 18.1k | } |
375 | | |
376 | 51.1k | pub fn assign_connection_capacity<R>( |
377 | 51.1k | &mut self, |
378 | 51.1k | inc: WindowSize, |
379 | 51.1k | store: &mut R, |
380 | 51.1k | counts: &mut Counts, |
381 | 51.1k | ) where |
382 | 51.1k | R: Resolve, |
383 | | { |
384 | 51.1k | let span = tracing::trace_span!("assign_connection_capacity", inc); |
385 | 51.1k | let _e = span.enter(); |
386 | | |
387 | | // TODO: proper error handling |
388 | 51.1k | let _res = self.flow.assign_capacity(inc); |
389 | 51.1k | debug_assert!(_res.is_ok()); |
390 | | |
391 | | // Assign newly acquired capacity to streams pending capacity. |
392 | 288k | while self.flow.available() > 0 { |
393 | 258k | let stream = match self.pending_capacity.pop(store) { |
394 | 237k | Some(stream) => stream, |
395 | 20.3k | None => return, |
396 | | }; |
397 | | |
398 | | // Streams pending capacity may have been reset before capacity |
399 | | // became available. In that case, the stream won't want any |
400 | | // capacity, and so we shouldn't "transition" on it, but just evict |
401 | | // it and continue the loop. |
402 | 237k | if !(stream.state.is_send_streaming() || stream.buffered_send_data > 0) { |
403 | 134k | continue; |
404 | 103k | } |
405 | | |
406 | 103k | counts.transition(stream, |_, stream| { |
407 | | // Try to assign capacity to the stream. This will also re-queue the |
408 | | // stream if there isn't enough connection level capacity to fulfill |
409 | | // the capacity request. |
410 | 103k | self.try_assign_capacity(stream); |
411 | 103k | })
  |       | Closure instantiations: assign_connection_capacity::<store::Ptr>::{closure#0} (31.6k); assign_connection_capacity::<store::Store>::{closure#0} (71.4k)
412 | | }
413 | 51.1k | }
  |       | Instantiations: assign_connection_capacity::<h2::proto::streams::store::Ptr> (48.5k); assign_connection_capacity::<h2::proto::streams::store::Store> (2.60k)
414 | | |
415 | | /// Request capacity to send data |
416 | 421k | fn try_assign_capacity(&mut self, stream: &mut store::Ptr) { |
417 | 421k | let total_requested = stream.requested_send_capacity; |
418 | | |
419 | | // Total requested should never go below actual assigned |
420 | | // (Note: the window size can go lower than assigned) |
421 | 421k | debug_assert!(stream.send_flow.available() <= total_requested as usize); |
422 | | |
423 | | // The amount of additional capacity that the stream requests. |
424 | | // Don't assign more than the window has available! |
425 | 421k | let additional = cmp::min( |
426 | 421k | total_requested - stream.send_flow.available().as_size(), |
427 | | // Can't assign more than what is available |
428 | 421k | stream.send_flow.window_size() - stream.send_flow.available().as_size(), |
429 | | ); |
430 | 421k | let span = tracing::trace_span!("try_assign_capacity", ?stream.id); |
431 | 421k | let _e = span.enter(); |
432 | 421k | tracing::trace!( |
433 | | requested = total_requested, |
434 | | additional, |
435 | 0 | buffered = stream.buffered_send_data, |
436 | 0 | window = stream.send_flow.window_size(), |
437 | 0 | conn = %self.flow.available() |
438 | | ); |
439 | | |
440 | 421k | if additional == 0 { |
441 | | // Nothing more to do |
442 | 52.5k | return; |
443 | 368k | } |
444 | | |
445 | | // If the stream has requested capacity, then it must be in the |
446 | | // streaming state (more data could be sent) or there is buffered data |
447 | | // waiting to be sent. |
448 | 368k | debug_assert!( |
449 | 0 | stream.state.is_send_streaming() || stream.buffered_send_data > 0, |
450 | 0 | "state={:?}", |
451 | 0 | stream.state |
452 | | ); |
453 | | |
454 | | // The amount of currently available capacity on the connection |
455 | 368k | let conn_available = self.flow.available().as_size(); |
456 | | |
457 | | // First check if capacity is immediately available |
458 | 368k | if conn_available > 0 { |
459 | | // The amount of capacity to assign to the stream |
460 | | // TODO: Should prioritization factor into this? |
461 | 88.7k | let assign = cmp::min(conn_available, additional); |
462 | | |
463 | 88.7k | tracing::trace!(capacity = assign, "assigning"); |
464 | | |
465 | | // Assign the capacity to the stream |
466 | 88.7k | stream.assign_capacity(assign, self.max_buffer_size); |
467 | | |
468 | | // Claim the capacity from the connection |
469 | | // TODO: proper error handling |
470 | 88.7k | let _res = self.flow.claim_capacity(assign); |
471 | 88.7k | debug_assert!(_res.is_ok()); |
472 | 279k | } |
473 | | |
474 | 368k | tracing::trace!( |
475 | 0 | available = %stream.send_flow.available(), |
476 | 0 | requested = stream.requested_send_capacity, |
477 | 0 | buffered = stream.buffered_send_data, |
478 | 0 | has_unavailable = %stream.send_flow.has_unavailable() |
479 | | ); |
480 | | |
481 | 368k | if stream.send_flow.available() < stream.requested_send_capacity as usize |
482 | 348k | && stream.send_flow.has_unavailable() |
483 | 302k | { |
484 | 302k | // The stream requires additional capacity and the stream's |
485 | 302k | // window has available capacity, but the connection window |
486 | 302k | // does not. |
487 | 302k | // |
488 | 302k | // In this case, the stream needs to be queued up for when the |
489 | 302k | // connection has more capacity. |
490 | 302k | self.pending_capacity.push(stream); |
491 | 302k | } |
492 | | |
493 | | // If data is buffered and the stream is send ready, then |
494 | | // schedule the stream for execution |
495 | 368k | if stream.buffered_send_data > 0 && stream.is_send_ready() { |
496 | 279k | // TODO: This assertion isn't *exactly* correct. There can still be |
497 | 279k | // buffered send data while the stream's pending send queue is |
498 | 279k | // empty. This can happen when a large data frame is in the process |
499 | 279k | // of being **partially** sent. Once the window has been sent, the |
500 | 279k | // data frame will be returned to the prioritization layer to be |
501 | 279k | // re-scheduled. |
502 | 279k | // |
503 | 279k | // That said, it would be nice to figure out how to make this |
504 | 279k | // assertion correct.
505 | 279k | // |
506 | 279k | // debug_assert!(!stream.pending_send.is_empty()); |
507 | 279k | |
508 | 279k | self.pending_send.push(stream); |
509 | 279k | } |
510 | 421k | } |
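
A minimal standalone model (plain integers rather than h2's `FlowControl`) of the arithmetic in `try_assign_capacity`: a stream can be assigned at most what it still wants, capped by its own send window, and capped again by what the connection currently has available; any shortfall waits in `pending_capacity`:

use std::cmp;

/// Returns (capacity assigned to the stream now, connection capacity left over).
fn try_assign_capacity(
    requested: u32,      // stream.requested_send_capacity
    assigned: u32,       // capacity already available to the stream
    stream_window: u32,  // stream-level send window
    conn_available: u32, // connection-level capacity not yet handed out
) -> (u32, u32) {
    // How much more the stream wants, but never more than its window allows.
    let additional = cmp::min(requested - assigned, stream_window - assigned);
    if additional == 0 {
        return (0, conn_available);
    }

    // Hand out at most what the connection has on hand; the rest keeps the
    // stream queued in `pending_capacity` until a connection WINDOW_UPDATE arrives.
    let assign = cmp::min(conn_available, additional);
    (assign, conn_available - assign)
}

fn main() {
    // The stream wants 100 more bytes and its window would allow it, but the
    // connection only has 40 spare: 40 are assigned now, 60 stay pending.
    assert_eq!(try_assign_capacity(100, 0, 65_535, 40), (40, 0));
}
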
511 | | |
512 | 140k | pub fn poll_complete<T, B>( |
513 | 140k | &mut self, |
514 | 140k | cx: &mut Context, |
515 | 140k | buffer: &mut Buffer<Frame<B>>, |
516 | 140k | store: &mut Store, |
517 | 140k | counts: &mut Counts, |
518 | 140k | dst: &mut Codec<T, Prioritized<B>>, |
519 | 140k | ) -> Poll<io::Result<()>> |
520 | 140k | where |
521 | 140k | T: AsyncWrite + Unpin, |
522 | 140k | B: Buf, |
523 | | { |
524 | | // Ensure codec is ready |
525 | 140k | ready!(dst.poll_ready(cx))?; |
526 | | |
527 | | // Reclaim any frame that has previously been written |
528 | 140k | self.reclaim_frame(buffer, store, dst); |
529 | | |
530 | | // The max frame length |
531 | 140k | let max_frame_len = dst.max_send_frame_size(); |
532 | | |
533 | 140k | tracing::trace!("poll_complete"); |
534 | | |
535 | | loop { |
536 | 341k | if let Some(mut stream) = self.pop_pending_open(store, counts) { |
537 | 181k | self.pending_send.push_front(&mut stream); |
538 | 181k | self.try_assign_capacity(&mut stream); |
539 | 181k | } |
540 | | |
541 | 341k | match self.pop_frame(buffer, store, max_frame_len, counts) { |
542 | 202k | Some(frame) => { |
543 | 202k | tracing::trace!(?frame, "writing"); |
544 | | |
545 | 202k | debug_assert_eq!(self.in_flight_data_frame, InFlightData::Nothing); |
546 | 202k | if let Frame::Data(ref frame) = frame { |
547 | 19.4k | self.in_flight_data_frame = InFlightData::DataFrame(frame.payload().stream); |
548 | 183k | } |
549 | 202k | dst.buffer(frame).expect("invalid frame"); |
550 | | |
551 | | // Ensure the codec is ready to try the loop again. |
552 | 202k | ready!(dst.poll_ready(cx))?; |
553 | | |
554 | | // After writing a frame, always try to reclaim...
555 | 200k | self.reclaim_frame(buffer, store, dst); |
556 | | } |
557 | | None => { |
558 | | // Try to flush the codec. |
559 | 138k | ready!(dst.flush(cx))?; |
560 | | |
561 | | // This might release a data frame... |
562 | 135k | if !self.reclaim_frame(buffer, store, dst) { |
563 | 135k | return Poll::Ready(Ok(())); |
564 | 0 | } |
565 | | |
566 | | // No need to poll ready as poll_complete() does this for |
567 | | // us... |
568 | | } |
569 | | } |
570 | | } |
571 | 140k | } |
572 | | |
573 | | /// Tries to reclaim a pending data frame from the codec. |
574 | | /// |
575 | | /// Returns true if a frame was reclaimed. |
576 | | /// |
577 | | /// When a data frame is written to the codec, it may not be written in its |
578 | | /// entirety (large chunks are split up into potentially many data frames). |
579 | | /// In this case, the stream needs to be reprioritized. |
580 | 476k | fn reclaim_frame<T, B>( |
581 | 476k | &mut self, |
582 | 476k | buffer: &mut Buffer<Frame<B>>, |
583 | 476k | store: &mut Store, |
584 | 476k | dst: &mut Codec<T, Prioritized<B>>, |
585 | 476k | ) -> bool |
586 | 476k | where |
587 | 476k | B: Buf, |
588 | | { |
589 | 476k | let span = tracing::trace_span!("try_reclaim_frame"); |
590 | 476k | let _e = span.enter(); |
591 | | |
592 | | // First check if there are any data chunks to take back |
593 | 476k | if let Some(frame) = dst.take_last_data_frame() { |
594 | 18.0k | self.reclaim_frame_inner(buffer, store, frame) |
595 | | } else { |
596 | 458k | false |
597 | | } |
598 | 476k | } |
599 | | |
600 | 18.0k | fn reclaim_frame_inner<B>( |
601 | 18.0k | &mut self, |
602 | 18.0k | buffer: &mut Buffer<Frame<B>>, |
603 | 18.0k | store: &mut Store, |
604 | 18.0k | frame: frame::Data<Prioritized<B>>, |
605 | 18.0k | ) -> bool |
606 | 18.0k | where |
607 | 18.0k | B: Buf, |
608 | | { |
609 | 18.0k | tracing::trace!( |
610 | | ?frame, |
611 | 0 | sz = frame.payload().inner.get_ref().remaining(), |
612 | 0 | "reclaimed" |
613 | | ); |
614 | | |
615 | 18.0k | let mut eos = false; |
616 | 18.0k | let key = frame.payload().stream; |
617 | | |
618 | 18.0k | match mem::replace(&mut self.in_flight_data_frame, InFlightData::Nothing) { |
619 | 0 | InFlightData::Nothing => panic!("wasn't expecting a frame to reclaim"), |
620 | | InFlightData::Drop => { |
621 | 35 | tracing::trace!("not reclaiming frame for cancelled stream"); |
622 | 35 | return false; |
623 | | } |
624 | 18.0k | InFlightData::DataFrame(k) => { |
625 | 18.0k | debug_assert_eq!(k, key); |
626 | | } |
627 | | } |
628 | | |
629 | 18.0k | let mut frame = frame.map(|prioritized| { |
630 | | // TODO: Ensure fully written |
631 | 18.0k | eos = prioritized.end_of_stream; |
632 | 18.0k | prioritized.inner.into_inner() |
633 | 18.0k | }); |
634 | | |
635 | 18.0k | if frame.payload().has_remaining() { |
636 | 17.4k | let mut stream = store.resolve(key); |
637 | | |
638 | 17.4k | if eos { |
639 | 17.4k | frame.set_end_stream(true); |
640 | 17.4k | } |
641 | | |
642 | 17.4k | self.push_back_frame(frame.into(), buffer, &mut stream); |
643 | | |
644 | 17.4k | return true; |
645 | 563 | } |
646 | | |
647 | 563 | false |
648 | 18.0k | } |
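
The reclaim path relies on outgoing DATA payloads being wrapped in `bytes::buf::Take` (see `Prioritized` below), so the codec can only consume the window-limited prefix; the leftover bytes are recovered with `into_inner` and pushed back onto the stream's queue. A standalone sketch using only the `bytes` crate:

use bytes::{Buf, Bytes};

fn main() {
    let payload = Bytes::from_static(b"hello world, this is a large body");

    // Limit the write to the flow-control window (say, 11 bytes), mirroring
    // `Prioritized { inner: buf.take(len), .. }` built in `pop_frame` below.
    let mut limited = payload.clone().take(11);

    // The codec can only see `remaining()` bytes -- the window-limited prefix.
    let n = limited.remaining();
    let written = limited.copy_to_bytes(n);
    assert_eq!(&written[..], &b"hello world"[..]);

    // "Reclaim" the rest, as `reclaim_frame_inner` does with `into_inner()`;
    // it can then be pushed back to the front of the stream's send queue.
    let rest = limited.into_inner();
    assert_eq!(rest.remaining(), payload.len() - 11);
}
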
649 | | |
650 | | /// Push the frame to the front of the stream's deque, scheduling the |
651 | | /// stream if needed. |
652 | 17.4k | fn push_back_frame<B>( |
653 | 17.4k | &mut self, |
654 | 17.4k | frame: Frame<B>, |
655 | 17.4k | buffer: &mut Buffer<Frame<B>>, |
656 | 17.4k | stream: &mut store::Ptr, |
657 | 17.4k | ) { |
658 | | // Push the frame to the front of the stream's deque |
659 | 17.4k | stream.pending_send.push_front(buffer, frame); |
660 | | |
661 | | // If needed, schedule the sender |
662 | 17.4k | if stream.send_flow.available() > 0 { |
663 | 1.69k | debug_assert!(!stream.pending_send.is_empty()); |
664 | 1.69k | self.pending_send.push(stream); |
665 | 15.7k | } |
666 | 17.4k | } |
667 | | |
668 | 727k | pub fn clear_queue<B>(&mut self, buffer: &mut Buffer<Frame<B>>, stream: &mut store::Ptr) { |
669 | 727k | let span = tracing::trace_span!("clear_queue", ?stream.id); |
670 | 727k | let _e = span.enter(); |
671 | | |
672 | | // TODO: make this more efficient? |
673 | 1.44M | while let Some(frame) = stream.pending_send.pop_front(buffer) { |
674 | 714k | tracing::trace!(?frame, "dropping"); |
675 | | } |
676 | | |
677 | 727k | stream.buffered_send_data = 0; |
678 | 727k | stream.requested_send_capacity = 0; |
679 | 727k | if let InFlightData::DataFrame(key) = self.in_flight_data_frame { |
680 | 81.8k | if stream.key() == key { |
681 | 1.28k | // This stream could get cleaned up now - don't allow the buffered frame to get reclaimed. |
682 | 1.28k | self.in_flight_data_frame = InFlightData::Drop; |
683 | 80.6k | } |
684 | 645k | } |
685 | 727k | }
  |       | Instantiations: <h2::proto::streams::prioritize::Prioritize>::clear_queue::<bytes::bytes::Bytes> (589); <h2::proto::streams::prioritize::Prioritize>::clear_queue::<bytes::bytes::Bytes> (726k)
686 | | |
687 | 18.1k | pub fn clear_pending_send(&mut self, store: &mut Store, counts: &mut Counts) { |
688 | 209k | while let Some(mut stream) = self.pending_send.pop(store) { |
689 | 191k | let is_pending_reset = stream.is_pending_reset_expiration(); |
690 | 191k | if let Some(reason) = stream.state.get_scheduled_reset() { |
691 | 70.4k | stream.set_reset(reason, Initiator::Library); |
692 | 120k | } |
693 | 191k | counts.transition_after(stream, is_pending_reset); |
694 | | } |
695 | 18.1k | } |
696 | | |
697 | 18.1k | pub fn clear_pending_open(&mut self, store: &mut Store, counts: &mut Counts) { |
698 | 260k | while let Some(stream) = self.pending_open.pop(store) { |
699 | 242k | let is_pending_reset = stream.is_pending_reset_expiration(); |
700 | 242k | counts.transition_after(stream, is_pending_reset); |
701 | 242k | } |
702 | 18.1k | } |
703 | | |
704 | 341k | fn pop_frame<B>( |
705 | 341k | &mut self, |
706 | 341k | buffer: &mut Buffer<Frame<B>>, |
707 | 341k | store: &mut Store, |
708 | 341k | max_len: usize, |
709 | 341k | counts: &mut Counts, |
710 | 341k | ) -> Option<Frame<Prioritized<B>>> |
711 | 341k | where |
712 | 341k | B: Buf, |
713 | | { |
714 | 341k | let span = tracing::trace_span!("pop_frame"); |
715 | 341k | let _e = span.enter(); |
716 | | |
717 | | loop { |
718 | 381k | match self.pending_send.pop(store) { |
719 | 242k | Some(mut stream) => { |
720 | 242k | let span = tracing::trace_span!("popped", ?stream.id, ?stream.state); |
721 | 242k | let _e = span.enter(); |
722 | | |
723 | | // It's possible that this stream, besides having data to send, |
724 | | // is also queued to send a reset, and thus is already in the queue |
725 | | // to wait for "some time" after a reset. |
726 | | // |
727 | | // To be safe, we just always ask the stream. |
728 | 242k | let is_pending_reset = stream.is_pending_reset_expiration(); |
729 | | |
730 | 242k | tracing::trace!(is_pending_reset); |
731 | | |
732 | 242k | let frame = match stream.pending_send.pop_front(buffer) { |
733 | 42.5k | Some(Frame::Data(mut frame)) => { |
734 | | // Get the amount of capacity remaining for stream's |
735 | | // window. |
736 | 42.5k | let stream_capacity = stream.send_flow.available(); |
737 | 42.5k | let sz = frame.payload().remaining(); |
738 | | |
739 | 42.5k | tracing::trace!( |
740 | | sz, |
741 | 0 | eos = frame.is_end_stream(), |
742 | | window = %stream_capacity, |
743 | 0 | available = %stream.send_flow.available(), |
744 | 0 | requested = stream.requested_send_capacity, |
745 | 0 | buffered = stream.buffered_send_data, |
746 | 0 | "data frame" |
747 | | ); |
748 | | |
749 | | // Zero length data frames always have capacity to |
750 | | // be sent. |
751 | 42.5k | if sz > 0 && stream_capacity == 0 { |
752 | 23.1k | tracing::trace!("stream capacity is 0"); |
753 | | |
754 | | // Ensure that the stream is waiting for |
755 | | // connection level capacity |
756 | | // |
757 | | // TODO: uncomment |
758 | | // debug_assert!(stream.is_pending_send_capacity); |
759 | | |
760 | | // The stream has no more capacity; this can
761 | | // happen if the remote reduced the stream |
762 | | // window. In this case, we need to buffer the |
763 | | // frame and wait for a window update... |
764 | 23.1k | stream.pending_send.push_front(buffer, frame.into()); |
765 | | |
766 | 23.1k | continue; |
767 | 19.4k | } |
768 | | |
769 | | // Only send up to the max frame length |
770 | 19.4k | let len = cmp::min(sz, max_len); |
771 | | |
772 | | // Only send up to the stream's window capacity |
773 | 19.4k | let len = |
774 | 19.4k | cmp::min(len, stream_capacity.as_size() as usize) as WindowSize; |
775 | | |
776 | | // There *must* be enough connection level
777 | | // capacity at this point. |
778 | 19.4k | debug_assert!(len <= self.flow.window_size()); |
779 | | |
780 | | // Check that the stream-level window as the peer knows it can cover this
781 | | // frame. In some scenarios, capacity we consider available locally may not
782 | | // yet be reflected in the window the peer knows about.
783 | 19.4k | if len > 0 && len > stream.send_flow.window_size() { |
784 | 0 | stream.pending_send.push_front(buffer, frame.into()); |
785 | 0 | continue; |
786 | 19.4k | } |
787 | | |
788 | 19.4k | tracing::trace!(len, "sending data frame"); |
789 | | |
790 | | // Update the flow control |
791 | 19.4k | tracing::trace_span!("updating stream flow").in_scope(|| { |
792 | 19.4k | stream.send_data(len, self.max_buffer_size); |
793 | | |
794 | | // Assign the capacity back to the connection that |
795 | | // was just consumed from the stream in the previous |
796 | | // line. |
797 | | // TODO: proper error handling |
798 | 19.4k | let _res = self.flow.assign_capacity(len); |
799 | 19.4k | debug_assert!(_res.is_ok()); |
800 | 19.4k | }); |
801 | | |
802 | 19.4k | let (eos, len) = tracing::trace_span!("updating connection flow") |
803 | 19.4k | .in_scope(|| { |
804 | | // TODO: proper error handling |
805 | 19.4k | let _res = self.flow.send_data(len); |
806 | 19.4k | debug_assert!(_res.is_ok()); |
807 | | |
808 | | // Wrap the frame's data payload to ensure that the |
809 | | // correct amount of data gets written. |
810 | | |
811 | 19.4k | let eos = frame.is_end_stream(); |
812 | 19.4k | let len = len as usize; |
813 | | |
814 | 19.4k | if frame.payload().remaining() > len { |
815 | 18.7k | frame.set_end_stream(false); |
816 | 18.7k | } |
817 | 19.4k | (eos, len) |
818 | 19.4k | }); |
819 | | |
820 | 19.4k | Frame::Data(frame.map(|buf| Prioritized { |
821 | 19.4k | inner: buf.take(len), |
822 | 19.4k | end_of_stream: eos, |
823 | 19.4k | stream: stream.key(), |
824 | 19.4k | })) |
825 | | } |
826 | 0 | Some(Frame::PushPromise(pp)) => { |
827 | 0 | let mut pushed = |
828 | 0 | stream.store_mut().find_mut(&pp.promised_id()).unwrap(); |
829 | 0 | pushed.is_pending_push = false; |
830 | | // Transition stream from pending_push to pending_open |
831 | | // if possible |
832 | 0 | if !pushed.pending_send.is_empty() { |
833 | 0 | if counts.can_inc_num_send_streams() { |
834 | 0 | counts.inc_num_send_streams(&mut pushed); |
835 | 0 | self.pending_send.push(&mut pushed); |
836 | 0 | } else { |
837 | 0 | self.queue_open(&mut pushed); |
838 | 0 | } |
839 | 0 | } |
840 | 0 | Frame::PushPromise(pp) |
841 | | } |
842 | 183k | Some(frame) => frame.map(|_| { |
843 | 0 | unreachable!( |
844 | | "Frame::map closure will only be called \ |
845 | | on DATA frames." |
846 | | ) |
847 | | }), |
848 | | None => { |
849 | 16.7k | if let Some(reason) = stream.state.get_scheduled_reset() { |
850 | 68 | stream.set_reset(reason, Initiator::Library); |
851 | | |
852 | 68 | let frame = frame::Reset::new(stream.id, reason); |
853 | 68 | Frame::Reset(frame) |
854 | | } else { |
855 | | // If the stream receives a RESET from the peer, it may have |
856 | | // had data buffered to be sent, but all the frames are cleared |
857 | | // in clear_queue(). Instead of doing an O(N) traversal through the queue
858 | | // to remove it, let's just ignore the stream here.
859 | 16.6k | tracing::trace!("removing dangling stream from pending_send"); |
860 | | // Since this should only happen as a consequence of `clear_queue`, |
861 | | // we must be in a closed state of some kind. |
862 | 16.6k | debug_assert!(stream.state.is_closed()); |
863 | 16.6k | counts.transition_after(stream, is_pending_reset); |
864 | 16.6k | continue; |
865 | | } |
866 | | } |
867 | | }; |
868 | | |
869 | 202k | tracing::trace!("pop_frame; frame={:?}", frame); |
870 | | |
871 | 202k | if cfg!(debug_assertions) && stream.state.is_idle() { |
872 | 0 | debug_assert!(stream.id > self.last_opened_id); |
873 | 0 | self.last_opened_id = stream.id; |
874 | 202k | } |
875 | | |
876 | 202k | if !stream.pending_send.is_empty() || stream.state.is_scheduled_reset() { |
877 | 181k | // TODO: Only requeue the sender IF it is ready to send |
878 | 181k | // the next frame. i.e. don't requeue it if the next |
879 | 181k | // frame is a data frame and the stream does not have |
880 | 181k | // any more capacity. |
881 | 181k | self.pending_send.push(&mut stream); |
882 | 181k | } |
883 | | |
884 | 202k | counts.transition_after(stream, is_pending_reset); |
885 | | |
886 | 202k | return Some(frame); |
887 | | } |
888 | 138k | None => return None, |
889 | | } |
890 | | } |
891 | 341k | } |
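
A minimal standalone sketch of the length computation `pop_frame` performs for a DATA frame: the payload is trimmed to the max frame size and to the capacity currently assigned to the stream (connection-level capacity was already accounted for when that capacity was assigned):

use std::cmp;

/// How many payload bytes of a queued DATA frame go out in the next frame.
fn next_data_frame_len(payload_len: usize, max_frame_len: usize, stream_capacity: u32) -> usize {
    // Only send up to the max frame length...
    let len = cmp::min(payload_len, max_frame_len);
    // ...and up to the capacity currently assigned to the stream.
    cmp::min(len, stream_capacity as usize)
}

fn main() {
    // A 100_000-byte body with the default 16_384-byte max frame size and
    // 50_000 bytes of stream capacity goes out 16_384 bytes at a time.
    assert_eq!(next_data_frame_len(100_000, 16_384, 50_000), 16_384);
    // With only 1_000 bytes of assigned capacity the frame is trimmed further.
    assert_eq!(next_data_frame_len(100_000, 16_384, 1_000), 1_000);
}
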
892 | | |
893 | 341k | fn pop_pending_open<'s>( |
894 | 341k | &mut self, |
895 | 341k | store: &'s mut Store, |
896 | 341k | counts: &mut Counts, |
897 | 341k | ) -> Option<store::Ptr<'s>> { |
898 | 341k | tracing::trace!("schedule_pending_open"); |
899 | | // check for any pending open streams |
900 | 341k | if counts.can_inc_num_send_streams() { |
901 | 339k | if let Some(mut stream) = self.pending_open.pop(store) { |
902 | 181k | tracing::trace!("schedule_pending_open; stream={:?}", stream.id); |
903 | | |
904 | 181k | counts.inc_num_send_streams(&mut stream); |
905 | 181k | stream.notify_send(); |
906 | 181k | return Some(stream); |
907 | 157k | } |
908 | 2.42k | } |
909 | | |
910 | 159k | None |
911 | 341k | } |
912 | | } |
913 | | |
914 | | // ===== impl Prioritized ===== |
915 | | |
916 | | impl<B> Buf for Prioritized<B> |
917 | | where |
918 | | B: Buf, |
919 | | { |
920 | 698k | fn remaining(&self) -> usize { |
921 | 698k | self.inner.remaining() |
922 | 698k | }
  |       | Instantiations: <Prioritized<Bytes> as Buf>::remaining (one unexecuted; one with 698k calls)
923 | | |
924 | 278k | fn chunk(&self) -> &[u8] { |
925 | 278k | self.inner.chunk() |
926 | 278k | }
  |       | Instantiations: <Prioritized<Bytes> as Buf>::chunk (one unexecuted; one with 278k calls)
927 | | |
928 | 0 | fn chunks_vectored<'a>(&'a self, dst: &mut [std::io::IoSlice<'a>]) -> usize { |
929 | 0 | self.inner.chunks_vectored(dst) |
930 | 0 | } |
931 | | |
932 | 21.3k | fn advance(&mut self, cnt: usize) { |
933 | 21.3k | self.inner.advance(cnt) |
934 | 21.3k | }
  |       | Instantiations: <Prioritized<Bytes> as Buf>::advance (one unexecuted; one with 21.3k calls)
935 | | } |
936 | | |
937 | | impl<B: Buf> fmt::Debug for Prioritized<B> { |
938 | | fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { |
939 | | fmt.debug_struct("Prioritized") |
940 | | .field("remaining", &self.inner.get_ref().remaining()) |
941 | | .field("end_of_stream", &self.end_of_stream) |
942 | | .field("stream", &self.stream) |
943 | | .finish() |
944 | | } |
945 | | } |