/src/h2/src/proto/streams/prioritize.rs
Line | Count | Source |
1 | | use super::store::Resolve; |
2 | | use super::*; |
3 | | |
4 | | use crate::frame::Reason; |
5 | | |
6 | | use crate::codec::UserError; |
7 | | use crate::codec::UserError::*; |
8 | | |
9 | | use bytes::buf::Take; |
10 | | use std::{ |
11 | | cmp::{self, Ordering}, |
12 | | fmt, io, mem, |
13 | | task::{Context, Poll, Waker}, |
14 | | }; |
15 | | |
16 | | /// # Warning |
17 | | /// |
18 | | /// Queued streams are ordered by stream ID, as we need to ensure that |
19 | | /// lower-numbered streams are sent headers before higher-numbered ones. |
20 | | /// This is because "idle" stream IDs – those which have been initiated but |
21 | | /// have yet to receive frames – will be implicitly closed on receipt of a |
22 | | /// frame on a higher stream ID. If these queues were not ordered by stream |
23 | | /// IDs, some mechanism would be necessary to ensure that the lowest-numbered |
24 | | /// idle stream is opened first. |
25 | | #[derive(Debug)] |
26 | | pub(super) struct Prioritize { |
27 | | /// Queue of streams waiting for socket capacity to send a frame. |
28 | | pending_send: store::Queue<stream::NextSend>, |
29 | | |
30 | | /// Queue of streams waiting for window capacity to produce data. |
31 | | pending_capacity: store::Queue<stream::NextSendCapacity>, |
32 | | |
33 | | /// Streams waiting for capacity due to max concurrency |
34 | | /// |
35 | | /// The `SendRequest` handle is `Clone`. This enables initiating requests |
36 | | /// from many tasks. However, offering this capability while supporting |
37 | | /// backpressure at some level is tricky. If there are many `SendRequest` |
38 | | /// handles and a single stream becomes available, which handle gets |
39 | | /// assigned that stream? Maybe that handle is no longer ready to send a |
40 | | /// request. |
41 | | /// |
42 | | /// The strategy used is to allow each `SendRequest` handle one buffered |
43 | | /// request. A `SendRequest` handle is ready to send a request if it has no |
44 | | /// associated buffered requests. This is the same strategy as `mpsc` in the |
45 | | /// futures library. |
46 | | pending_open: store::Queue<stream::NextOpen>, |
47 | | |
48 | | /// Connection level flow control governing sent data |
49 | | flow: FlowControl, |
50 | | |
51 | | /// Stream ID of the last stream opened. |
52 | | last_opened_id: StreamId, |
53 | | |
54 | | /// What `DATA` frame is currently being sent in the codec. |
55 | | in_flight_data_frame: InFlightData, |
56 | | |
57 | | /// The maximum amount of bytes a stream should buffer. |
58 | | max_buffer_size: usize, |
59 | | } |
60 | | |
61 | | #[derive(Debug, Eq, PartialEq)] |
62 | | enum InFlightData { |
63 | | /// There is no `DATA` frame in flight. |
64 | | Nothing, |
65 | | /// There is a `DATA` frame in flight belonging to the given stream. |
66 | | DataFrame(store::Key), |
67 | | /// There was a `DATA` frame, but the stream's queue was since cleared. |
68 | | Drop, |
69 | | } |
70 | | |
71 | | pub(crate) struct Prioritized<B> { |
72 | | // The buffer |
73 | | inner: Take<B>, |
74 | | |
75 | | end_of_stream: bool, |
76 | | |
77 | | // The stream that this is associated with |
78 | | stream: store::Key, |
79 | | } |
80 | | |
81 | | // ===== impl Prioritize ===== |
82 | | |
83 | | impl Prioritize { |
84 | 14.5k | pub fn new(config: &Config) -> Prioritize { |
85 | 14.5k | let mut flow = FlowControl::new(); |
86 | | |
87 | 14.5k | flow.inc_window(config.remote_init_window_sz) |
88 | 14.5k | .expect("invalid initial window size"); |
89 | | |
90 | | // TODO: proper error handling |
91 | 14.5k | let _res = flow.assign_capacity(config.remote_init_window_sz); |
92 | 14.5k | debug_assert!(_res.is_ok()); |
93 | | |
94 | 14.5k | tracing::trace!("Prioritize::new; flow={:?}", flow); |
95 | | |
96 | 14.5k | Prioritize { |
97 | 14.5k | pending_send: store::Queue::new(), |
98 | 14.5k | pending_capacity: store::Queue::new(), |
99 | 14.5k | pending_open: store::Queue::new(), |
100 | 14.5k | flow, |
101 | 14.5k | last_opened_id: StreamId::ZERO, |
102 | 14.5k | in_flight_data_frame: InFlightData::Nothing, |
103 | 14.5k | max_buffer_size: config.local_max_buffer_size, |
104 | 14.5k | } |
105 | 14.5k | } |
106 | | |
107 | 0 | pub(crate) fn max_buffer_size(&self) -> usize { |
108 | 0 | self.max_buffer_size |
109 | 0 | } |
110 | | |
111 | | /// Queue a frame to be sent to the remote |
112 | 517k | pub fn queue_frame<B>( |
113 | 517k | &mut self, |
114 | 517k | frame: Frame<B>, |
115 | 517k | buffer: &mut Buffer<Frame<B>>, |
116 | 517k | stream: &mut store::Ptr, |
117 | 517k | task: &mut Option<Waker>, |
118 | 517k | ) { |
119 | 517k | let span = tracing::trace_span!("Prioritize::queue_frame", ?stream.id); |
120 | 517k | let _e = span.enter(); |
121 | | // Queue the frame in the buffer |
122 | 517k | stream.pending_send.push_back(buffer, frame); |
123 | 517k | self.schedule_send(stream, task); |
124 | 517k | } Unexecuted instantiation: <h2::proto::streams::prioritize::Prioritize>::queue_frame::<_> <h2::proto::streams::prioritize::Prioritize>::queue_frame::<bytes::bytes::Bytes> Line | Count | Source | 112 | 708 | pub fn queue_frame<B>( | 113 | 708 | &mut self, | 114 | 708 | frame: Frame<B>, | 115 | 708 | buffer: &mut Buffer<Frame<B>>, | 116 | 708 | stream: &mut store::Ptr, | 117 | 708 | task: &mut Option<Waker>, | 118 | 708 | ) { | 119 | 708 | let span = tracing::trace_span!("Prioritize::queue_frame", ?stream.id); | 120 | 708 | let _e = span.enter(); | 121 | | // Queue the frame in the buffer | 122 | 708 | stream.pending_send.push_back(buffer, frame); | 123 | 708 | self.schedule_send(stream, task); | 124 | 708 | } |
<h2::proto::streams::prioritize::Prioritize>::queue_frame::<bytes::bytes::Bytes> Line | Count | Source | 112 | 516k | pub fn queue_frame<B>( | 113 | 516k | &mut self, | 114 | 516k | frame: Frame<B>, | 115 | 516k | buffer: &mut Buffer<Frame<B>>, | 116 | 516k | stream: &mut store::Ptr, | 117 | 516k | task: &mut Option<Waker>, | 118 | 516k | ) { | 119 | 516k | let span = tracing::trace_span!("Prioritize::queue_frame", ?stream.id); | 120 | 516k | let _e = span.enter(); | 121 | | // Queue the frame in the buffer | 122 | 516k | stream.pending_send.push_back(buffer, frame); | 123 | 516k | self.schedule_send(stream, task); | 124 | 516k | } |
|
125 | | |
126 | 604k | pub fn schedule_send(&mut self, stream: &mut store::Ptr, task: &mut Option<Waker>) { |
127 | | // If the stream is waiting to be opened, nothing more to do. |
128 | 604k | if stream.is_send_ready() { |
129 | 106k | tracing::trace!(?stream.id, "schedule_send"); |
130 | | // Queue the stream |
131 | 106k | self.pending_send.push(stream); |
132 | | |
133 | | // Notify the connection. |
134 | 106k | if let Some(task) = task.take() { |
135 | 91 | task.wake(); |
136 | 105k | } |
137 | 498k | } |
138 | 604k | } |
139 | | |
140 | 488k | pub fn queue_open(&mut self, stream: &mut store::Ptr) { |
141 | 488k | self.pending_open.push(stream); |
142 | 488k | } |
143 | | |
144 | | /// Send a data frame |
145 | 487k | pub fn send_data<B>( |
146 | 487k | &mut self, |
147 | 487k | frame: frame::Data<B>, |
148 | 487k | buffer: &mut Buffer<Frame<B>>, |
149 | 487k | stream: &mut store::Ptr, |
150 | 487k | counts: &mut Counts, |
151 | 487k | task: &mut Option<Waker>, |
152 | 487k | ) -> Result<(), UserError> |
153 | 487k | where |
154 | 487k | B: Buf, |
155 | | { |
156 | 487k | let sz = frame.payload().remaining(); |
157 | | |
158 | 487k | if sz > MAX_WINDOW_SIZE as usize { |
159 | 0 | return Err(UserError::PayloadTooBig); |
160 | 487k | } |
161 | | |
162 | 487k | let sz = sz as WindowSize; |
163 | | |
164 | 487k | if !stream.state.is_send_streaming() { |
165 | 0 | if stream.state.is_closed() { |
166 | 0 | return Err(InactiveStreamId); |
167 | | } else { |
168 | 0 | return Err(UnexpectedFrameType); |
169 | | } |
170 | 487k | } |
171 | | |
172 | | // Update the buffered data counter |
173 | 487k | stream.buffered_send_data += sz as usize; |
174 | | |
175 | 487k | let span = |
176 | 487k | tracing::trace_span!("send_data", sz, requested = stream.requested_send_capacity); |
177 | 487k | let _e = span.enter(); |
178 | 487k | tracing::trace!(buffered = stream.buffered_send_data); |
179 | | |
180 | | // Implicitly request more send capacity if not enough has been |
181 | | // requested yet. |
182 | 487k | if (stream.requested_send_capacity as usize) < stream.buffered_send_data { |
183 | 487k | // Update the target requested capacity |
184 | 487k | stream.requested_send_capacity = |
185 | 487k | cmp::min(stream.buffered_send_data, WindowSize::MAX as usize) as WindowSize; |
186 | 487k | |
187 | 487k | // `try_assign_capacity` will queue the stream to `pending_capacity` if the capcaity |
187 | 487k | // `try_assign_capacity` will queue the stream to `pending_capacity` if the capacity |
189 | 487k | self.try_assign_capacity(stream); |
190 | 487k | } |
191 | | |
192 | 487k | if frame.is_end_stream() { |
193 | 487k | stream.state.send_close(); |
194 | 487k | self.reserve_capacity(0, stream, counts); |
195 | 487k | } |
196 | | |
197 | 487k | tracing::trace!( |
198 | 0 | available = %stream.send_flow.available(), |
199 | 0 | buffered = stream.buffered_send_data, |
200 | | ); |
201 | | |
202 | | // The `stream.buffered_send_data == 0` check is here so that, if a zero |
203 | | // length data frame is queued to the front (there is no previously |
204 | | // queued data), it gets sent out immediately even if there is no |
205 | | // available send window. |
206 | | // |
207 | | // Sending out zero length data frames can be done to signal |
208 | | // end-of-stream. |
209 | | // |
210 | 487k | if stream.send_flow.available() > 0 || stream.buffered_send_data == 0 { |
211 | 0 | // The stream currently has capacity to send the data frame, so |
212 | 0 | // queue it up and notify the connection task. |
213 | 0 | self.queue_frame(frame.into(), buffer, stream, task); |
214 | 487k | } else { |
215 | 487k | // The stream has no capacity to send the frame now, save it but |
216 | 487k | // don't notify the connection task. Once additional capacity |
217 | 487k | // becomes available, the frame will be flushed. |
218 | 487k | stream.pending_send.push_back(buffer, frame.into()); |
219 | 487k | } |
220 | | |
221 | 487k | Ok(()) |
222 | 487k | } Unexecuted instantiation: <h2::proto::streams::prioritize::Prioritize>::send_data::<_> <h2::proto::streams::prioritize::Prioritize>::send_data::<bytes::bytes::Bytes> Line | Count | Source | 145 | 487k | pub fn send_data<B>( | 146 | 487k | &mut self, | 147 | 487k | frame: frame::Data<B>, | 148 | 487k | buffer: &mut Buffer<Frame<B>>, | 149 | 487k | stream: &mut store::Ptr, | 150 | 487k | counts: &mut Counts, | 151 | 487k | task: &mut Option<Waker>, | 152 | 487k | ) -> Result<(), UserError> | 153 | 487k | where | 154 | 487k | B: Buf, | 155 | | { | 156 | 487k | let sz = frame.payload().remaining(); | 157 | | | 158 | 487k | if sz > MAX_WINDOW_SIZE as usize { | 159 | 0 | return Err(UserError::PayloadTooBig); | 160 | 487k | } | 161 | | | 162 | 487k | let sz = sz as WindowSize; | 163 | | | 164 | 487k | if !stream.state.is_send_streaming() { | 165 | 0 | if stream.state.is_closed() { | 166 | 0 | return Err(InactiveStreamId); | 167 | | } else { | 168 | 0 | return Err(UnexpectedFrameType); | 169 | | } | 170 | 487k | } | 171 | | | 172 | | // Update the buffered data counter | 173 | 487k | stream.buffered_send_data += sz as usize; | 174 | | | 175 | 487k | let span = | 176 | 487k | tracing::trace_span!("send_data", sz, requested = stream.requested_send_capacity); | 177 | 487k | let _e = span.enter(); | 178 | 487k | tracing::trace!(buffered = stream.buffered_send_data); | 179 | | | 180 | | // Implicitly request more send capacity if not enough has been | 181 | | // requested yet. | 182 | 487k | if (stream.requested_send_capacity as usize) < stream.buffered_send_data { | 183 | 487k | // Update the target requested capacity | 184 | 487k | stream.requested_send_capacity = | 185 | 487k | cmp::min(stream.buffered_send_data, WindowSize::MAX as usize) as WindowSize; | 186 | 487k | | 187 | 487k | // `try_assign_capacity` will queue the stream to `pending_capacity` if the capcaity | 188 | 487k | // cannot be assigned at the time it is called. 
| 189 | 487k | self.try_assign_capacity(stream); | 190 | 487k | } | 191 | | | 192 | 487k | if frame.is_end_stream() { | 193 | 487k | stream.state.send_close(); | 194 | 487k | self.reserve_capacity(0, stream, counts); | 195 | 487k | } | 196 | | | 197 | 487k | tracing::trace!( | 198 | 0 | available = %stream.send_flow.available(), | 199 | 0 | buffered = stream.buffered_send_data, | 200 | | ); | 201 | | | 202 | | // The `stream.buffered_send_data == 0` check is here so that, if a zero | 203 | | // length data frame is queued to the front (there is no previously | 204 | | // queued data), it gets sent out immediately even if there is no | 205 | | // available send window. | 206 | | // | 207 | | // Sending out zero length data frames can be done to signal | 208 | | // end-of-stream. | 209 | | // | 210 | 487k | if stream.send_flow.available() > 0 || stream.buffered_send_data == 0 { | 211 | 0 | // The stream currently has capacity to send the data frame, so | 212 | 0 | // queue it up and notify the connection task. | 213 | 0 | self.queue_frame(frame.into(), buffer, stream, task); | 214 | 487k | } else { | 215 | 487k | // The stream has no capacity to send the frame now, save it but | 216 | 487k | // don't notify the connection task. Once additional capacity | 217 | 487k | // becomes available, the frame will be flushed. | 218 | 487k | stream.pending_send.push_back(buffer, frame.into()); | 219 | 487k | } | 220 | | | 221 | 487k | Ok(()) | 222 | 487k | } |
|
223 | | |
224 | | /// Request capacity to send data |
225 | 487k | pub fn reserve_capacity( |
226 | 487k | &mut self, |
227 | 487k | capacity: WindowSize, |
228 | 487k | stream: &mut store::Ptr, |
229 | 487k | counts: &mut Counts, |
230 | 487k | ) { |
231 | 487k | let span = tracing::trace_span!( |
232 | | "reserve_capacity", |
233 | | ?stream.id, |
234 | | requested = capacity, |
235 | 0 | effective = (capacity as usize) + stream.buffered_send_data, |
236 | 0 | curr = stream.requested_send_capacity |
237 | | ); |
238 | 487k | let _e = span.enter(); |
239 | | |
240 | | // Actual capacity is `capacity` + the current amount of buffered data. |
241 | | // If it were less, then we could never send out the buffered data. |
242 | 487k | let capacity = (capacity as usize) + stream.buffered_send_data; |
243 | | |
244 | 487k | match capacity.cmp(&(stream.requested_send_capacity as usize)) { |
245 | 487k | Ordering::Equal => { |
246 | 487k | // Nothing to do |
247 | 487k | } |
248 | | Ordering::Less => { |
249 | | // Update the target requested capacity |
250 | 0 | stream.requested_send_capacity = capacity as WindowSize; |
251 | | |
252 | | // Currently available capacity assigned to the stream |
253 | 0 | let available = stream.send_flow.available().as_size(); |
254 | | |
255 | | // If the stream has more assigned capacity than requested, reclaim |
256 | | // some for the connection |
257 | 0 | if available as usize > capacity { |
258 | 0 | let diff = available - capacity as WindowSize; |
259 | | |
260 | | // TODO: proper error handling |
261 | 0 | let _res = stream.send_flow.claim_capacity(diff); |
262 | 0 | debug_assert!(_res.is_ok()); |
263 | | |
264 | 0 | self.assign_connection_capacity(diff, stream, counts); |
265 | 0 | } |
266 | | } |
267 | | Ordering::Greater => { |
268 | | // If trying to *add* capacity, but the stream send side is closed, |
269 | | // there's nothing to be done. |
270 | 0 | if stream.state.is_send_closed() { |
271 | 0 | return; |
272 | 0 | } |
273 | | |
274 | | // Update the target requested capacity |
275 | 0 | stream.requested_send_capacity = |
276 | 0 | cmp::min(capacity, WindowSize::MAX as usize) as WindowSize; |
277 | | |
278 | | // Try to assign additional capacity to the stream. If none is |
279 | | // currently available, the stream will be queued to receive some |
280 | | // when more becomes available. |
281 | 0 | self.try_assign_capacity(stream); |
282 | | } |
283 | | } |
284 | 487k | } |
285 | | |
286 | 99.8k | pub fn recv_stream_window_update( |
287 | 99.8k | &mut self, |
288 | 99.8k | inc: WindowSize, |
289 | 99.8k | stream: &mut store::Ptr, |
290 | 99.8k | ) -> Result<(), Reason> { |
291 | 99.8k | let span = tracing::trace_span!( |
292 | | "recv_stream_window_update", |
293 | | ?stream.id, |
294 | | ?stream.state, |
295 | | inc, |
296 | 0 | flow = ?stream.send_flow |
297 | | ); |
298 | 99.8k | let _e = span.enter(); |
299 | | |
300 | 99.8k | if stream.state.is_send_closed() && stream.buffered_send_data == 0 { |
301 | | // We can't send any data, so don't bother doing anything else. |
302 | 5.20k | return Ok(()); |
303 | 94.6k | } |
304 | | |
305 | | // Update the stream level flow control. |
306 | 94.6k | stream.send_flow.inc_window(inc)?; |
307 | | |
308 | | // If the stream is waiting on additional capacity, then this will |
309 | | // assign it (if available on the connection) and notify the producer |
310 | 94.5k | self.try_assign_capacity(stream); |
311 | | |
312 | 94.5k | Ok(()) |
313 | 99.8k | } |
314 | | |
315 | 829 | pub fn recv_connection_window_update( |
316 | 829 | &mut self, |
317 | 829 | inc: WindowSize, |
318 | 829 | store: &mut Store, |
319 | 829 | counts: &mut Counts, |
320 | 829 | ) -> Result<(), Reason> { |
321 | | // Update the connection's window |
322 | 829 | self.flow.inc_window(inc)?; |
323 | | |
324 | 828 | self.assign_connection_capacity(inc, store, counts); |
325 | 828 | Ok(()) |
326 | 829 | } |
327 | | |
328 | | /// Reclaim all capacity assigned to the stream and re-assign it to the |
329 | | /// connection |
330 | 670k | pub fn reclaim_all_capacity(&mut self, stream: &mut store::Ptr, counts: &mut Counts) { |
331 | 670k | let available = stream.send_flow.available().as_size(); |
332 | 670k | if available > 0 { |
333 | | // TODO: proper error handling |
334 | 53.4k | let _res = stream.send_flow.claim_capacity(available); |
335 | 53.4k | debug_assert!(_res.is_ok()); |
336 | | // Re-assign all capacity to the connection |
337 | 53.4k | self.assign_connection_capacity(available, stream, counts); |
338 | 617k | } |
339 | 670k | } |
340 | | |
341 | | /// Reclaim just reserved capacity, not buffered capacity, and re-assign |
342 | | /// it to the connection |
343 | 86.7k | pub fn reclaim_reserved_capacity(&mut self, stream: &mut store::Ptr, counts: &mut Counts) { |
344 | | // only reclaim reserved capacity that isn't already buffered |
345 | 86.7k | if stream.send_flow.available().as_size() as usize > stream.buffered_send_data { |
346 | 0 | let reserved = |
347 | 0 | stream.send_flow.available().as_size() - stream.buffered_send_data as WindowSize; |
348 | 0 |
|
349 | 0 | // Panic safety: due to how `reserved` is computed it can't be greater |
350 | 0 | // than what's available. |
351 | 0 | stream |
352 | 0 | .send_flow |
353 | 0 | .claim_capacity(reserved) |
354 | 0 | .expect("window size should be greater than reserved"); |
355 | 0 |
|
356 | 0 | self.assign_connection_capacity(reserved, stream, counts); |
357 | 86.7k | } |
358 | 86.7k | } |
359 | | |
360 | 19.7k | pub fn clear_pending_capacity(&mut self, store: &mut Store, counts: &mut Counts) { |
361 | 19.7k | let span = tracing::trace_span!("clear_pending_capacity"); |
362 | 19.7k | let _e = span.enter(); |
363 | 35.3k | while let Some(stream) = self.pending_capacity.pop(store) { |
364 | 15.6k | counts.transition(stream, |_, stream| { |
365 | 15.6k | tracing::trace!(?stream.id, "clear_pending_capacity"); |
366 | 15.6k | }) |
367 | | } |
368 | 19.7k | } |
369 | | |
370 | 55.7k | pub fn assign_connection_capacity<R>( |
371 | 55.7k | &mut self, |
372 | 55.7k | inc: WindowSize, |
373 | 55.7k | store: &mut R, |
374 | 55.7k | counts: &mut Counts, |
375 | 55.7k | ) where |
376 | 55.7k | R: Resolve, |
377 | | { |
378 | 55.7k | let span = tracing::trace_span!("assign_connection_capacity", inc); |
379 | 55.7k | let _e = span.enter(); |
380 | | |
381 | | // TODO: proper error handling |
382 | 55.7k | let _res = self.flow.assign_capacity(inc); |
383 | 55.7k | debug_assert!(_res.is_ok()); |
384 | | |
385 | | // Assign newly acquired capacity to streams pending capacity. |
386 | 266k | while self.flow.available() > 0 { |
387 | 235k | let stream = match self.pending_capacity.pop(store) { |
388 | 210k | Some(stream) => stream, |
389 | 25.4k | None => return, |
390 | | }; |
391 | | |
392 | | // Streams pending capacity may have been reset before capacity |
393 | | // became available. In that case, the stream won't want any |
394 | | // capacity, and so we shouldn't "transition" on it, but just evict |
395 | | // it and continue the loop. |
396 | 210k | if !(stream.state.is_send_streaming() || stream.buffered_send_data > 0) { |
397 | 141k | continue; |
398 | 68.9k | } |
399 | | |
400 | 68.9k | counts.transition(stream, |_, stream| { |
401 | | // Try to assign capacity to the stream. This will also re-queue the |
402 | | // stream if there isn't enough connection level capacity to fulfill |
403 | | // the capacity request. |
404 | 68.9k | self.try_assign_capacity(stream); |
405 | 68.9k | }) <h2::proto::streams::prioritize::Prioritize>::assign_connection_capacity::<h2::proto::streams::store::Ptr>::{closure#0}Line | Count | Source | 400 | 31.7k | counts.transition(stream, |_, stream| { | 401 | | // Try to assign capacity to the stream. This will also re-queue the | 402 | | // stream if there isn't enough connection level capacity to fulfill | 403 | | // the capacity request. | 404 | 31.7k | self.try_assign_capacity(stream); | 405 | 31.7k | }) |
<h2::proto::streams::prioritize::Prioritize>::assign_connection_capacity::<h2::proto::streams::store::Store>::{closure#0}Line | Count | Source | 400 | 37.2k | counts.transition(stream, |_, stream| { | 401 | | // Try to assign capacity to the stream. This will also re-queue the | 402 | | // stream if there isn't enough connection level capacity to fulfill | 403 | | // the capacity request. | 404 | 37.2k | self.try_assign_capacity(stream); | 405 | 37.2k | }) |
|
406 | | } |
407 | 55.7k | } <h2::proto::streams::prioritize::Prioritize>::assign_connection_capacity::<h2::proto::streams::store::Ptr> Line | Count | Source | 370 | 53.4k | pub fn assign_connection_capacity<R>( | 371 | 53.4k | &mut self, | 372 | 53.4k | inc: WindowSize, | 373 | 53.4k | store: &mut R, | 374 | 53.4k | counts: &mut Counts, | 375 | 53.4k | ) where | 376 | 53.4k | R: Resolve, | 377 | | { | 378 | 53.4k | let span = tracing::trace_span!("assign_connection_capacity", inc); | 379 | 53.4k | let _e = span.enter(); | 380 | | | 381 | | // TODO: proper error handling | 382 | 53.4k | let _res = self.flow.assign_capacity(inc); | 383 | 53.4k | debug_assert!(_res.is_ok()); | 384 | | | 385 | | // Assign newly acquired capacity to streams pending capacity. | 386 | 225k | while self.flow.available() > 0 { | 387 | 196k | let stream = match self.pending_capacity.pop(store) { | 388 | 172k | Some(stream) => stream, | 389 | 23.5k | None => return, | 390 | | }; | 391 | | | 392 | | // Streams pending capacity may have been reset before capacity | 393 | | // became available. In that case, the stream won't want any | 394 | | // capacity, and so we shouldn't "transition" on it, but just evict | 395 | | // it and continue the loop. | 396 | 172k | if !(stream.state.is_send_streaming() || stream.buffered_send_data > 0) { | 397 | 140k | continue; | 398 | 31.7k | } | 399 | | | 400 | 31.7k | counts.transition(stream, |_, stream| { | 401 | | // Try to assign capacity to the stream. This will also re-queue the | 402 | | // stream if there isn't enough connection level capacity to fulfill | 403 | | // the capacity request. | 404 | | self.try_assign_capacity(stream); | 405 | | }) | 406 | | } | 407 | 53.4k | } |
<h2::proto::streams::prioritize::Prioritize>::assign_connection_capacity::<h2::proto::streams::store::Store> Line | Count | Source | 370 | 2.33k | pub fn assign_connection_capacity<R>( | 371 | 2.33k | &mut self, | 372 | 2.33k | inc: WindowSize, | 373 | 2.33k | store: &mut R, | 374 | 2.33k | counts: &mut Counts, | 375 | 2.33k | ) where | 376 | 2.33k | R: Resolve, | 377 | | { | 378 | 2.33k | let span = tracing::trace_span!("assign_connection_capacity", inc); | 379 | 2.33k | let _e = span.enter(); | 380 | | | 381 | | // TODO: proper error handling | 382 | 2.33k | let _res = self.flow.assign_capacity(inc); | 383 | 2.33k | debug_assert!(_res.is_ok()); | 384 | | | 385 | | // Assign newly acquired capacity to streams pending capacity. | 386 | 40.1k | while self.flow.available() > 0 { | 387 | 39.6k | let stream = match self.pending_capacity.pop(store) { | 388 | 37.7k | Some(stream) => stream, | 389 | 1.84k | None => return, | 390 | | }; | 391 | | | 392 | | // Streams pending capacity may have been reset before capacity | 393 | | // became available. In that case, the stream won't want any | 394 | | // capacity, and so we shouldn't "transition" on it, but just evict | 395 | | // it and continue the loop. | 396 | 37.7k | if !(stream.state.is_send_streaming() || stream.buffered_send_data > 0) { | 397 | 555 | continue; | 398 | 37.2k | } | 399 | | | 400 | 37.2k | counts.transition(stream, |_, stream| { | 401 | | // Try to assign capacity to the stream. This will also re-queue the | 402 | | // stream if there isn't enough connection level capacity to fulfill | 403 | | // the capacity request. | 404 | | self.try_assign_capacity(stream); | 405 | | }) | 406 | | } | 407 | 2.33k | } |
|
408 | | |
409 | | /// Request capacity to send data |
410 | 860k | fn try_assign_capacity(&mut self, stream: &mut store::Ptr) { |
411 | | // Streams over the max concurrent count should not have capacity assign to avoid starving the connection |
412 | | // capacity for open streams |
413 | 860k | if stream.is_pending_open { |
414 | 522k | return; |
415 | 337k | } |
416 | | |
417 | 337k | let total_requested = stream.requested_send_capacity; |
418 | | |
419 | | // Total requested should never go below actual assigned |
420 | | // (Note: the window size can go lower than assigned) |
421 | 337k | debug_assert!(stream.send_flow.available() <= total_requested as usize); |
422 | | |
423 | | // The amount of additional capacity that the stream requests. |
424 | | // Don't assign more than the window has available! |
425 | 337k | let additional = cmp::min( |
426 | 337k | total_requested - stream.send_flow.available().as_size(), |
427 | | // Can't assign more than what is available |
428 | 337k | stream.send_flow.window_size() - stream.send_flow.available().as_size(), |
429 | | ); |
430 | 337k | let span = tracing::trace_span!("try_assign_capacity", ?stream.id); |
431 | 337k | let _e = span.enter(); |
432 | 337k | tracing::trace!( |
433 | | requested = total_requested, |
434 | | additional, |
435 | 0 | buffered = stream.buffered_send_data, |
436 | 0 | window = stream.send_flow.window_size(), |
437 | 0 | conn = %self.flow.available() |
438 | | ); |
439 | | |
440 | 337k | if additional == 0 { |
441 | | // Nothing more to do |
442 | 21.7k | return; |
443 | 315k | } |
444 | | |
445 | | // If the stream has requested capacity, then it must be in the |
446 | | // streaming state (more data could be sent) or there is buffered data |
447 | | // waiting to be sent. |
448 | 315k | debug_assert!( |
449 | 0 | stream.state.is_send_streaming() || stream.buffered_send_data > 0, |
450 | 0 | "state={:?}", |
451 | 0 | stream.state |
452 | | ); |
453 | | |
454 | | // The amount of currently available capacity on the connection |
455 | 315k | let conn_available = self.flow.available().as_size(); |
456 | | |
457 | | // First check if capacity is immediately available |
458 | 315k | if conn_available > 0 { |
459 | | // The amount of capacity to assign to the stream |
460 | | // TODO: Should prioritization factor into this? |
461 | 86.1k | let assign = cmp::min(conn_available, additional); |
462 | | |
463 | 86.1k | tracing::trace!(capacity = assign, "assigning"); |
464 | | |
465 | | // Assign the capacity to the stream |
466 | 86.1k | stream.assign_capacity(assign, self.max_buffer_size); |
467 | | |
468 | | // Claim the capacity from the connection |
469 | | // TODO: proper error handling |
470 | 86.1k | let _res = self.flow.claim_capacity(assign); |
471 | 86.1k | debug_assert!(_res.is_ok()); |
472 | 229k | } |
473 | | |
474 | 315k | tracing::trace!( |
475 | 0 | available = %stream.send_flow.available(), |
476 | 0 | requested = stream.requested_send_capacity, |
477 | 0 | buffered = stream.buffered_send_data, |
478 | 0 | has_unavailable = %stream.send_flow.has_unavailable() |
479 | | ); |
480 | | |
481 | 315k | if stream.send_flow.available() < stream.requested_send_capacity as usize |
482 | 290k | && stream.send_flow.has_unavailable() |
483 | 252k | { |
484 | 252k | // The stream requires additional capacity and the stream's |
485 | 252k | // window has available capacity, but the connection window |
486 | 252k | // does not. |
487 | 252k | // |
488 | 252k | // In this case, the stream needs to be queued up for when the |
489 | 252k | // connection has more capacity. |
490 | 252k | self.pending_capacity.push(stream); |
491 | 252k | } |
492 | | |
493 | | // If data is buffered and the stream is send ready, then |
494 | | // schedule the stream for execution |
495 | 315k | if stream.buffered_send_data > 0 && stream.is_send_ready() { |
496 | 315k | // TODO: This assertion isn't *exactly* correct. There can still be |
497 | 315k | // buffered send data while the stream's pending send queue is |
498 | 315k | // empty. This can happen when a large data frame is in the process |
499 | 315k | // of being **partially** sent. Once the window has been sent, the |
500 | 315k | // data frame will be returned to the prioritization layer to be |
501 | 315k | // re-scheduled. |
502 | 315k | // |
503 | 315k | // That said, it would be nice to figure out how to make this |
504 | 315k | // assertion correctly. |
505 | 315k | // |
506 | 315k | // debug_assert!(!stream.pending_send.is_empty()); |
507 | 315k | |
508 | 315k | self.pending_send.push(stream); |
509 | 315k | } |
510 | 860k | } |
511 | | |
512 | 152k | pub fn poll_complete<T, B>( |
513 | 152k | &mut self, |
514 | 152k | cx: &mut Context, |
515 | 152k | buffer: &mut Buffer<Frame<B>>, |
516 | 152k | store: &mut Store, |
517 | 152k | counts: &mut Counts, |
518 | 152k | dst: &mut Codec<T, Prioritized<B>>, |
519 | 152k | ) -> Poll<io::Result<()>> |
520 | 152k | where |
521 | 152k | T: AsyncWrite + Unpin, |
522 | 152k | B: Buf, |
523 | | { |
524 | | // Ensure codec is ready |
525 | 152k | ready!(dst.poll_ready(cx))?; |
526 | | |
527 | | // Reclaim any frame that has previously been written |
528 | 152k | self.reclaim_frame(buffer, store, dst); |
529 | | |
530 | | // The max frame length |
531 | 152k | let max_frame_len = dst.max_send_frame_size(); |
532 | | |
533 | 152k | tracing::trace!("poll_complete"); |
534 | | |
535 | | loop { |
536 | 383k | if let Some(mut stream) = self.pop_pending_open(store, counts) { |
537 | 208k | self.pending_send.push_front(&mut stream); |
538 | 208k | self.try_assign_capacity(&mut stream); |
539 | 208k | } |
540 | | |
541 | 383k | match self.pop_frame(buffer, store, max_frame_len, counts) { |
542 | 232k | Some(frame) => { |
543 | 232k | tracing::trace!(?frame, "writing"); |
544 | | |
545 | 232k | debug_assert_eq!(self.in_flight_data_frame, InFlightData::Nothing); |
546 | 232k | if let Frame::Data(ref frame) = frame { |
547 | 20.6k | self.in_flight_data_frame = InFlightData::DataFrame(frame.payload().stream); |
548 | 212k | } |
549 | 232k | dst.buffer(frame).expect("invalid frame"); |
550 | | |
551 | | // Ensure the codec is ready to try the loop again. |
552 | 232k | ready!(dst.poll_ready(cx))?; |
553 | | |
554 | | // Because, always try to reclaim... |
555 | 230k | self.reclaim_frame(buffer, store, dst); |
556 | | } |
557 | | None => { |
558 | | // Try to flush the codec. |
559 | 150k | ready!(dst.flush(cx))?; |
560 | | |
561 | | // This might release a data frame... |
562 | 142k | if !self.reclaim_frame(buffer, store, dst) { |
563 | 142k | return Poll::Ready(Ok(())); |
564 | 0 | } |
565 | | |
566 | | // No need to poll ready as poll_complete() does this for |
567 | | // us... |
568 | | } |
569 | | } |
570 | | } |
571 | 152k | } Unexecuted instantiation: <h2::proto::streams::prioritize::Prioritize>::poll_complete::<_, _> <h2::proto::streams::prioritize::Prioritize>::poll_complete::<fuzz_e2e::MockIo, bytes::bytes::Bytes> Line | Count | Source | 512 | 152k | pub fn poll_complete<T, B>( | 513 | 152k | &mut self, | 514 | 152k | cx: &mut Context, | 515 | 152k | buffer: &mut Buffer<Frame<B>>, | 516 | 152k | store: &mut Store, | 517 | 152k | counts: &mut Counts, | 518 | 152k | dst: &mut Codec<T, Prioritized<B>>, | 519 | 152k | ) -> Poll<io::Result<()>> | 520 | 152k | where | 521 | 152k | T: AsyncWrite + Unpin, | 522 | 152k | B: Buf, | 523 | | { | 524 | | // Ensure codec is ready | 525 | 152k | ready!(dst.poll_ready(cx))?; | 526 | | | 527 | | // Reclaim any frame that has previously been written | 528 | 152k | self.reclaim_frame(buffer, store, dst); | 529 | | | 530 | | // The max frame length | 531 | 152k | let max_frame_len = dst.max_send_frame_size(); | 532 | | | 533 | 152k | tracing::trace!("poll_complete"); | 534 | | | 535 | | loop { | 536 | 383k | if let Some(mut stream) = self.pop_pending_open(store, counts) { | 537 | 208k | self.pending_send.push_front(&mut stream); | 538 | 208k | self.try_assign_capacity(&mut stream); | 539 | 208k | } | 540 | | | 541 | 383k | match self.pop_frame(buffer, store, max_frame_len, counts) { | 542 | 232k | Some(frame) => { | 543 | 232k | tracing::trace!(?frame, "writing"); | 544 | | | 545 | 232k | debug_assert_eq!(self.in_flight_data_frame, InFlightData::Nothing); | 546 | 232k | if let Frame::Data(ref frame) = frame { | 547 | 20.6k | self.in_flight_data_frame = InFlightData::DataFrame(frame.payload().stream); | 548 | 212k | } | 549 | 232k | dst.buffer(frame).expect("invalid frame"); | 550 | | | 551 | | // Ensure the codec is ready to try the loop again. | 552 | 232k | ready!(dst.poll_ready(cx))?; | 553 | | | 554 | | // Because, always try to reclaim... 
| 555 | 230k | self.reclaim_frame(buffer, store, dst); | 556 | | } | 557 | | None => { | 558 | | // Try to flush the codec. | 559 | 150k | ready!(dst.flush(cx))?; | 560 | | | 561 | | // This might release a data frame... | 562 | 142k | if !self.reclaim_frame(buffer, store, dst) { | 563 | 142k | return Poll::Ready(Ok(())); | 564 | 0 | } | 565 | | | 566 | | // No need to poll ready as poll_complete() does this for | 567 | | // us... | 568 | | } | 569 | | } | 570 | | } | 571 | 152k | } |
|
572 | | |
573 | | /// Tries to reclaim a pending data frame from the codec. |
574 | | /// |
575 | | /// Returns true if a frame was reclaimed. |
576 | | /// |
577 | | /// When a data frame is written to the codec, it may not be written in its |
578 | | /// entirety (large chunks are split up into potentially many data frames). |
579 | | /// In this case, the stream needs to be reprioritized. |
580 | 526k | fn reclaim_frame<T, B>( |
581 | 526k | &mut self, |
582 | 526k | buffer: &mut Buffer<Frame<B>>, |
583 | 526k | store: &mut Store, |
584 | 526k | dst: &mut Codec<T, Prioritized<B>>, |
585 | 526k | ) -> bool |
586 | 526k | where |
587 | 526k | B: Buf, |
588 | | { |
589 | 526k | let span = tracing::trace_span!("try_reclaim_frame"); |
590 | 526k | let _e = span.enter(); |
591 | | |
592 | | // First check if there are any data chunks to take back |
593 | 526k | if let Some(frame) = dst.take_last_data_frame() { |
594 | 19.1k | self.reclaim_frame_inner(buffer, store, frame) |
595 | | } else { |
596 | 507k | false |
597 | | } |
598 | 526k | } Unexecuted instantiation: <h2::proto::streams::prioritize::Prioritize>::reclaim_frame::<_, _> <h2::proto::streams::prioritize::Prioritize>::reclaim_frame::<fuzz_e2e::MockIo, bytes::bytes::Bytes> Line | Count | Source | 580 | 526k | fn reclaim_frame<T, B>( | 581 | 526k | &mut self, | 582 | 526k | buffer: &mut Buffer<Frame<B>>, | 583 | 526k | store: &mut Store, | 584 | 526k | dst: &mut Codec<T, Prioritized<B>>, | 585 | 526k | ) -> bool | 586 | 526k | where | 587 | 526k | B: Buf, | 588 | | { | 589 | 526k | let span = tracing::trace_span!("try_reclaim_frame"); | 590 | 526k | let _e = span.enter(); | 591 | | | 592 | | // First check if there are any data chunks to take back | 593 | 526k | if let Some(frame) = dst.take_last_data_frame() { | 594 | 19.1k | self.reclaim_frame_inner(buffer, store, frame) | 595 | | } else { | 596 | 507k | false | 597 | | } | 598 | 526k | } |
|
599 | | |
600 | 19.1k | fn reclaim_frame_inner<B>( |
601 | 19.1k | &mut self, |
602 | 19.1k | buffer: &mut Buffer<Frame<B>>, |
603 | 19.1k | store: &mut Store, |
604 | 19.1k | frame: frame::Data<Prioritized<B>>, |
605 | 19.1k | ) -> bool |
606 | 19.1k | where |
607 | 19.1k | B: Buf, |
608 | | { |
609 | 19.1k | tracing::trace!( |
610 | | ?frame, |
611 | 0 | sz = frame.payload().inner.get_ref().remaining(), |
612 | 0 | "reclaimed" |
613 | | ); |
614 | | |
615 | 19.1k | let mut eos = false; |
616 | 19.1k | let key = frame.payload().stream; |
617 | | |
618 | 19.1k | match mem::replace(&mut self.in_flight_data_frame, InFlightData::Nothing) { |
619 | 0 | InFlightData::Nothing => panic!("wasn't expecting a frame to reclaim"), |
620 | | InFlightData::Drop => { |
621 | 49 | tracing::trace!("not reclaiming frame for cancelled stream"); |
622 | 49 | return false; |
623 | | } |
624 | 19.1k | InFlightData::DataFrame(k) => { |
625 | 19.1k | debug_assert_eq!(k, key); |
626 | | } |
627 | | } |
628 | | |
629 | 19.1k | let mut frame = frame.map(|prioritized| { |
630 | | // TODO: Ensure fully written |
631 | 19.1k | eos = prioritized.end_of_stream; |
632 | 19.1k | prioritized.inner.into_inner() |
633 | 19.1k | }); Unexecuted instantiation: <h2::proto::streams::prioritize::Prioritize>::reclaim_frame_inner::<_>::{closure#0}<h2::proto::streams::prioritize::Prioritize>::reclaim_frame_inner::<bytes::bytes::Bytes>::{closure#0}Line | Count | Source | 629 | 19.1k | let mut frame = frame.map(|prioritized| { | 630 | | // TODO: Ensure fully written | 631 | 19.1k | eos = prioritized.end_of_stream; | 632 | 19.1k | prioritized.inner.into_inner() | 633 | 19.1k | }); |
|
634 | | |
635 | 19.1k | if frame.payload().has_remaining() { |
636 | 18.5k | let mut stream = store.resolve(key); |
637 | | |
638 | 18.5k | if eos { |
639 | 18.5k | frame.set_end_stream(true); |
640 | 18.5k | } |
641 | | |
642 | 18.5k | self.push_back_frame(frame.into(), buffer, &mut stream); |
643 | | |
644 | 18.5k | return true; |
645 | 628 | } |
646 | | |
647 | 628 | false |
648 | 19.1k | } Unexecuted instantiation: <h2::proto::streams::prioritize::Prioritize>::reclaim_frame_inner::<_> <h2::proto::streams::prioritize::Prioritize>::reclaim_frame_inner::<bytes::bytes::Bytes> Line | Count | Source | 600 | 19.1k | fn reclaim_frame_inner<B>( | 601 | 19.1k | &mut self, | 602 | 19.1k | buffer: &mut Buffer<Frame<B>>, | 603 | 19.1k | store: &mut Store, | 604 | 19.1k | frame: frame::Data<Prioritized<B>>, | 605 | 19.1k | ) -> bool | 606 | 19.1k | where | 607 | 19.1k | B: Buf, | 608 | | { | 609 | 19.1k | tracing::trace!( | 610 | | ?frame, | 611 | 0 | sz = frame.payload().inner.get_ref().remaining(), | 612 | 0 | "reclaimed" | 613 | | ); | 614 | | | 615 | 19.1k | let mut eos = false; | 616 | 19.1k | let key = frame.payload().stream; | 617 | | | 618 | 19.1k | match mem::replace(&mut self.in_flight_data_frame, InFlightData::Nothing) { | 619 | 0 | InFlightData::Nothing => panic!("wasn't expecting a frame to reclaim"), | 620 | | InFlightData::Drop => { | 621 | 49 | tracing::trace!("not reclaiming frame for cancelled stream"); | 622 | 49 | return false; | 623 | | } | 624 | 19.1k | InFlightData::DataFrame(k) => { | 625 | 19.1k | debug_assert_eq!(k, key); | 626 | | } | 627 | | } | 628 | | | 629 | 19.1k | let mut frame = frame.map(|prioritized| { | 630 | | // TODO: Ensure fully written | 631 | | eos = prioritized.end_of_stream; | 632 | | prioritized.inner.into_inner() | 633 | | }); | 634 | | | 635 | 19.1k | if frame.payload().has_remaining() { | 636 | 18.5k | let mut stream = store.resolve(key); | 637 | | | 638 | 18.5k | if eos { | 639 | 18.5k | frame.set_end_stream(true); | 640 | 18.5k | } | 641 | | | 642 | 18.5k | self.push_back_frame(frame.into(), buffer, &mut stream); | 643 | | | 644 | 18.5k | return true; | 645 | 628 | } | 646 | | | 647 | 628 | false | 648 | 19.1k | } |
|
649 | | |
650 | | /// Push the frame to the front of the stream's deque, scheduling the |
651 | | /// stream if needed. |
652 | 18.5k | fn push_back_frame<B>( |
653 | 18.5k | &mut self, |
654 | 18.5k | frame: Frame<B>, |
655 | 18.5k | buffer: &mut Buffer<Frame<B>>, |
656 | 18.5k | stream: &mut store::Ptr, |
657 | 18.5k | ) { |
658 | | // Push the frame to the front of the stream's deque |
659 | 18.5k | stream.pending_send.push_front(buffer, frame); |
660 | | |
661 | | // If needed, schedule the sender |
662 | 18.5k | if stream.send_flow.available() > 0 { |
663 | 2.19k | debug_assert!(!stream.pending_send.is_empty()); |
664 | 2.19k | self.pending_send.push(stream); |
665 | 16.3k | } |
666 | 18.5k | } Unexecuted instantiation: <h2::proto::streams::prioritize::Prioritize>::push_back_frame::<_> <h2::proto::streams::prioritize::Prioritize>::push_back_frame::<bytes::bytes::Bytes> Line | Count | Source | 652 | 18.5k | fn push_back_frame<B>( | 653 | 18.5k | &mut self, | 654 | 18.5k | frame: Frame<B>, | 655 | 18.5k | buffer: &mut Buffer<Frame<B>>, | 656 | 18.5k | stream: &mut store::Ptr, | 657 | 18.5k | ) { | 658 | | // Push the frame to the front of the stream's deque | 659 | 18.5k | stream.pending_send.push_front(buffer, frame); | 660 | | | 661 | | // If needed, schedule the sender | 662 | 18.5k | if stream.send_flow.available() > 0 { | 663 | 2.19k | debug_assert!(!stream.pending_send.is_empty()); | 664 | 2.19k | self.pending_send.push(stream); | 665 | 16.3k | } | 666 | 18.5k | } |
|
667 | | |
668 | 670k | pub fn clear_queue<B>(&mut self, buffer: &mut Buffer<Frame<B>>, stream: &mut store::Ptr) { |
669 | 670k | let span = tracing::trace_span!("clear_queue", ?stream.id); |
670 | 670k | let _e = span.enter(); |
671 | | |
672 | | // TODO: make this more efficient? |
673 | 1.46M | while let Some(frame) = stream.pending_send.pop_front(buffer) { |
674 | 790k | tracing::trace!(?frame, "dropping"); |
675 | | } |
676 | | |
677 | 670k | stream.buffered_send_data = 0; |
678 | 670k | stream.requested_send_capacity = 0; |
679 | 670k | if let InFlightData::DataFrame(key) = self.in_flight_data_frame { |
680 | 38.2k | if stream.key() == key { |
681 | 1.48k | // This stream could get cleaned up now - don't allow the buffered frame to get reclaimed. |
682 | 1.48k | self.in_flight_data_frame = InFlightData::Drop; |
683 | 36.7k | } |
684 | 631k | } |
685 | 670k | } Unexecuted instantiation: <h2::proto::streams::prioritize::Prioritize>::clear_queue::<_> <h2::proto::streams::prioritize::Prioritize>::clear_queue::<bytes::bytes::Bytes> Line | Count | Source | 668 | 708 | pub fn clear_queue<B>(&mut self, buffer: &mut Buffer<Frame<B>>, stream: &mut store::Ptr) { | 669 | 708 | let span = tracing::trace_span!("clear_queue", ?stream.id); | 670 | 708 | let _e = span.enter(); | 671 | | | 672 | | // TODO: make this more efficient? | 673 | 1.41k | while let Some(frame) = stream.pending_send.pop_front(buffer) { | 674 | 708 | tracing::trace!(?frame, "dropping"); | 675 | | } | 676 | | | 677 | 708 | stream.buffered_send_data = 0; | 678 | 708 | stream.requested_send_capacity = 0; | 679 | 708 | if let InFlightData::DataFrame(key) = self.in_flight_data_frame { | 680 | 0 | if stream.key() == key { | 681 | 0 | // This stream could get cleaned up now - don't allow the buffered frame to get reclaimed. | 682 | 0 | self.in_flight_data_frame = InFlightData::Drop; | 683 | 0 | } | 684 | 708 | } | 685 | 708 | } |
<h2::proto::streams::prioritize::Prioritize>::clear_queue::<bytes::bytes::Bytes> Line | Count | Source | 668 | 669k | pub fn clear_queue<B>(&mut self, buffer: &mut Buffer<Frame<B>>, stream: &mut store::Ptr) { | 669 | 669k | let span = tracing::trace_span!("clear_queue", ?stream.id); | 670 | 669k | let _e = span.enter(); | 671 | | | 672 | | // TODO: make this more efficient? | 673 | 1.45M | while let Some(frame) = stream.pending_send.pop_front(buffer) { | 674 | 790k | tracing::trace!(?frame, "dropping"); | 675 | | } | 676 | | | 677 | 669k | stream.buffered_send_data = 0; | 678 | 669k | stream.requested_send_capacity = 0; | 679 | 669k | if let InFlightData::DataFrame(key) = self.in_flight_data_frame { | 680 | 38.2k | if stream.key() == key { | 681 | 1.48k | // This stream could get cleaned up now - don't allow the buffered frame to get reclaimed. | 682 | 1.48k | self.in_flight_data_frame = InFlightData::Drop; | 683 | 36.7k | } | 684 | 631k | } | 685 | 669k | } |
|
686 | | |
687 | 19.7k | pub fn clear_pending_send(&mut self, store: &mut Store, counts: &mut Counts) { |
688 | 219k | while let Some(mut stream) = self.pending_send.pop(store) { |
689 | 200k | let is_pending_reset = stream.is_pending_reset_expiration(); |
690 | 200k | if let Some(reason) = stream.state.get_scheduled_reset() { |
691 | 77.1k | stream.set_reset(reason, Initiator::Library); |
692 | 123k | } |
693 | 200k | counts.transition_after(stream, is_pending_reset); |
694 | | } |
695 | 19.7k | } |
696 | | |
697 | 19.7k | pub fn clear_pending_open(&mut self, store: &mut Store, counts: &mut Counts) { |
698 | 299k | while let Some(stream) = self.pending_open.pop(store) { |
699 | 279k | let is_pending_reset = stream.is_pending_reset_expiration(); |
700 | 279k | counts.transition_after(stream, is_pending_reset); |
701 | 279k | } |
702 | 19.7k | } |
703 | | |
704 | 383k | fn pop_frame<B>( |
705 | 383k | &mut self, |
706 | 383k | buffer: &mut Buffer<Frame<B>>, |
707 | 383k | store: &mut Store, |
708 | 383k | max_len: usize, |
709 | 383k | counts: &mut Counts, |
710 | 383k | ) -> Option<Frame<Prioritized<B>>> |
711 | 383k | where |
712 | 383k | B: Buf, |
713 | | { |
714 | 383k | let span = tracing::trace_span!("pop_frame"); |
715 | 383k | let _e = span.enter(); |
716 | | |
717 | | loop { |
718 | 422k | match self.pending_send.pop(store) { |
719 | 272k | Some(mut stream) => { |
720 | 272k | let span = tracing::trace_span!("popped", ?stream.id, ?stream.state); |
721 | 272k | let _e = span.enter(); |
722 | | |
723 | | // It's possible that this stream, besides having data to send, |
724 | | // is also queued to send a reset, and thus is already in the queue |
725 | | // to wait for "some time" after a reset. |
726 | | // |
727 | | // To be safe, we just always ask the stream. |
728 | 272k | let is_pending_reset = stream.is_pending_reset_expiration(); |
729 | | |
730 | 272k | tracing::trace!(is_pending_reset); |
731 | | |
732 | 272k | let frame = match stream.pending_send.pop_front(buffer) { |
733 | 52.3k | Some(Frame::Data(mut frame)) => { |
734 | | // Get the amount of capacity remaining for stream's |
735 | | // window. |
736 | 52.3k | let stream_capacity = stream.send_flow.available(); |
737 | 52.3k | let sz = frame.payload().remaining(); |
738 | | |
739 | 52.3k | tracing::trace!( |
740 | | sz, |
741 | 0 | eos = frame.is_end_stream(), |
742 | | window = %stream_capacity, |
743 | 0 | available = %stream.send_flow.available(), |
744 | 0 | requested = stream.requested_send_capacity, |
745 | 0 | buffered = stream.buffered_send_data, |
746 | 0 | "data frame" |
747 | | ); |
748 | | |
749 | | // Zero length data frames always have capacity to |
750 | | // be sent. |
751 | 52.3k | if sz > 0 && stream_capacity == 0 { |
752 | 31.6k | tracing::trace!("stream capacity is 0"); |
753 | | |
754 | | // Ensure that the stream is waiting for |
755 | | // connection level capacity |
756 | | // |
757 | | // TODO: uncomment |
758 | | // debug_assert!(stream.is_pending_send_capacity); |
759 | | |
760 | | // The stream has no more capacity, this can |
761 | | // happen if the remote reduced the stream |
762 | | // window. In this case, we need to buffer the |
763 | | // frame and wait for a window update... |
764 | 31.6k | stream.pending_send.push_front(buffer, frame.into()); |
765 | | |
766 | 31.6k | continue; |
767 | 20.6k | } |
768 | | |
769 | | // Only send up to the max frame length |
770 | 20.6k | let len = cmp::min(sz, max_len); |
771 | | |
772 | | // Only send up to the stream's window capacity |
773 | 20.6k | let len = |
774 | 20.6k | cmp::min(len, stream_capacity.as_size() as usize) as WindowSize; |
775 | | |
776 | | // There *must* be be enough connection level |
777 | | // capacity at this point. |
778 | 20.6k | debug_assert!(len <= self.flow.window_size()); |
779 | | |
780 | | // Check if the stream level window the peer knows is available. In some |
781 | | // scenarios, maybe the window we know is available but the window which |
782 | | // peer knows is not. |
783 | 20.6k | if len > 0 && len > stream.send_flow.window_size() { |
784 | 0 | stream.pending_send.push_front(buffer, frame.into()); |
785 | 0 | continue; |
786 | 20.6k | } |
787 | | |
788 | 20.6k | tracing::trace!(len, "sending data frame"); |
789 | | |
790 | | // Update the flow control |
791 | 20.6k | tracing::trace_span!("updating stream flow").in_scope(|| { |
792 | 20.6k | stream.send_data(len, self.max_buffer_size); |
793 | | |
794 | | // Assign the capacity back to the connection that |
795 | | // was just consumed from the stream in the previous |
796 | | // line. |
797 | | // TODO: proper error handling |
798 | 20.6k | let _res = self.flow.assign_capacity(len); |
799 | 20.6k | debug_assert!(_res.is_ok()); |
800 | 20.6k | }); Unexecuted instantiation: <h2::proto::streams::prioritize::Prioritize>::pop_frame::<_>::{closure#0}<h2::proto::streams::prioritize::Prioritize>::pop_frame::<bytes::bytes::Bytes>::{closure#0}Line | Count | Source | 791 | 20.6k | tracing::trace_span!("updating stream flow").in_scope(|| { | 792 | 20.6k | stream.send_data(len, self.max_buffer_size); | 793 | | | 794 | | // Assign the capacity back to the connection that | 795 | | // was just consumed from the stream in the previous | 796 | | // line. | 797 | | // TODO: proper error handling | 798 | 20.6k | let _res = self.flow.assign_capacity(len); | 799 | 20.6k | debug_assert!(_res.is_ok()); | 800 | 20.6k | }); |
|
801 | | |
802 | 20.6k | let (eos, len) = tracing::trace_span!("updating connection flow") |
803 | 20.6k | .in_scope(|| { |
804 | | // TODO: proper error handling |
805 | 20.6k | let _res = self.flow.send_data(len); |
806 | 20.6k | debug_assert!(_res.is_ok()); |
807 | | |
808 | | // Wrap the frame's data payload to ensure that the |
809 | | // correct amount of data gets written. |
810 | | |
811 | 20.6k | let eos = frame.is_end_stream(); |
812 | 20.6k | let len = len as usize; |
813 | | |
814 | 20.6k | if frame.payload().remaining() > len { |
815 | 19.9k | frame.set_end_stream(false); |
816 | 19.9k | } |
817 | 20.6k | (eos, len) |
818 | 20.6k | }); Unexecuted instantiation: <h2::proto::streams::prioritize::Prioritize>::pop_frame::<_>::{closure#1}<h2::proto::streams::prioritize::Prioritize>::pop_frame::<bytes::bytes::Bytes>::{closure#1}Line | Count | Source | 803 | 20.6k | .in_scope(|| { | 804 | | // TODO: proper error handling | 805 | 20.6k | let _res = self.flow.send_data(len); | 806 | 20.6k | debug_assert!(_res.is_ok()); | 807 | | | 808 | | // Wrap the frame's data payload to ensure that the | 809 | | // correct amount of data gets written. | 810 | | | 811 | 20.6k | let eos = frame.is_end_stream(); | 812 | 20.6k | let len = len as usize; | 813 | | | 814 | 20.6k | if frame.payload().remaining() > len { | 815 | 19.9k | frame.set_end_stream(false); | 816 | 19.9k | } | 817 | 20.6k | (eos, len) | 818 | 20.6k | }); |
|
819 | | |
820 | 20.6k | Frame::Data(frame.map(|buf| Prioritized { |
821 | 20.6k | inner: buf.take(len), |
822 | 20.6k | end_of_stream: eos, |
823 | 20.6k | stream: stream.key(), |
824 | 20.6k | })) Unexecuted instantiation: <h2::proto::streams::prioritize::Prioritize>::pop_frame::<_>::{closure#2}<h2::proto::streams::prioritize::Prioritize>::pop_frame::<bytes::bytes::Bytes>::{closure#2}Line | Count | Source | 821 | 20.6k | inner: buf.take(len), | 822 | 20.6k | end_of_stream: eos, | 823 | 20.6k | stream: stream.key(), | 824 | 20.6k | })) |
|
825 | | } |
826 | 0 | Some(Frame::PushPromise(pp)) => { |
827 | 0 | let mut pushed = |
828 | 0 | stream.store_mut().find_mut(&pp.promised_id()).unwrap(); |
829 | 0 | pushed.is_pending_push = false; |
830 | | // Transition stream from pending_push to pending_open |
831 | | // if possible |
832 | 0 | if !pushed.pending_send.is_empty() { |
833 | 0 | if counts.can_inc_num_send_streams() { |
834 | 0 | counts.inc_num_send_streams(&mut pushed); |
835 | 0 | self.pending_send.push(&mut pushed); |
836 | 0 | } else { |
837 | 0 | self.queue_open(&mut pushed); |
838 | 0 | } |
839 | 0 | } |
840 | 0 | Frame::PushPromise(pp) |
841 | | } |
842 | 212k | Some(frame) => frame.map(|_| { |
843 | 0 | unreachable!( Unexecuted instantiation: <h2::proto::streams::prioritize::Prioritize>::pop_frame::<_>::{closure#3}Unexecuted instantiation: <h2::proto::streams::prioritize::Prioritize>::pop_frame::<bytes::bytes::Bytes>::{closure#3} |
844 | | "Frame::map closure will only be called \ |
845 | | on DATA frames." |
846 | | ) |
847 | | }), |
848 | | None => { |
849 | 7.59k | if let Some(reason) = stream.state.get_scheduled_reset() { |
850 | 118 | stream.set_reset(reason, Initiator::Library); |
851 | | |
852 | 118 | let frame = frame::Reset::new(stream.id, reason); |
853 | 118 | Frame::Reset(frame) |
854 | | } else { |
855 | | // If the stream receives a RESET from the peer, it may have |
856 | | // had data buffered to be sent, but all the frames are cleared |
857 | | // in clear_queue(). Instead of doing O(N) traversal through queue |
858 | | // to remove, lets just ignore the stream here. |
859 | 7.47k | tracing::trace!("removing dangling stream from pending_send"); |
860 | | // Since this should only happen as a consequence of `clear_queue`, |
861 | | // we must be in a closed state of some kind. |
862 | 7.47k | debug_assert!(stream.state.is_closed()); |
863 | 7.47k | counts.transition_after(stream, is_pending_reset); |
864 | 7.47k | continue; |
865 | | } |
866 | | } |
867 | | }; |
868 | | |
869 | 232k | tracing::trace!("pop_frame; frame={:?}", frame); |
870 | | |
871 | 232k | if cfg!(debug_assertions) && stream.state.is_idle() { |
872 | 0 | debug_assert!(stream.id > self.last_opened_id); |
873 | 0 | self.last_opened_id = stream.id; |
874 | 232k | } |
875 | | |
876 | 232k | if !stream.pending_send.is_empty() || stream.state.is_scheduled_reset() { |
877 | 208k | // TODO: Only requeue the sender IF it is ready to send |
878 | 208k | // the next frame. i.e. don't requeue it if the next |
879 | 208k | // frame is a data frame and the stream does not have |
880 | 208k | // any more capacity. |
881 | 208k | self.pending_send.push(&mut stream); |
882 | 208k | } |
883 | | |
884 | 232k | counts.transition_after(stream, is_pending_reset); |
885 | | |
886 | 232k | return Some(frame); |
887 | | } |
888 | 150k | None => return None, |
889 | | } |
890 | | } |
891 | 383k | } Unexecuted instantiation: <h2::proto::streams::prioritize::Prioritize>::pop_frame::<_> <h2::proto::streams::prioritize::Prioritize>::pop_frame::<bytes::bytes::Bytes> Line | Count | Source | 704 | 383k | fn pop_frame<B>( | 705 | 383k | &mut self, | 706 | 383k | buffer: &mut Buffer<Frame<B>>, | 707 | 383k | store: &mut Store, | 708 | 383k | max_len: usize, | 709 | 383k | counts: &mut Counts, | 710 | 383k | ) -> Option<Frame<Prioritized<B>>> | 711 | 383k | where | 712 | 383k | B: Buf, | 713 | | { | 714 | 383k | let span = tracing::trace_span!("pop_frame"); | 715 | 383k | let _e = span.enter(); | 716 | | | 717 | | loop { | 718 | 422k | match self.pending_send.pop(store) { | 719 | 272k | Some(mut stream) => { | 720 | 272k | let span = tracing::trace_span!("popped", ?stream.id, ?stream.state); | 721 | 272k | let _e = span.enter(); | 722 | | | 723 | | // It's possible that this stream, besides having data to send, | 724 | | // is also queued to send a reset, and thus is already in the queue | 725 | | // to wait for "some time" after a reset. | 726 | | // | 727 | | // To be safe, we just always ask the stream. | 728 | 272k | let is_pending_reset = stream.is_pending_reset_expiration(); | 729 | | | 730 | 272k | tracing::trace!(is_pending_reset); | 731 | | | 732 | 272k | let frame = match stream.pending_send.pop_front(buffer) { | 733 | 52.3k | Some(Frame::Data(mut frame)) => { | 734 | | // Get the amount of capacity remaining for stream's | 735 | | // window. 
| 736 | 52.3k | let stream_capacity = stream.send_flow.available(); | 737 | 52.3k | let sz = frame.payload().remaining(); | 738 | | | 739 | 52.3k | tracing::trace!( | 740 | | sz, | 741 | 0 | eos = frame.is_end_stream(), | 742 | | window = %stream_capacity, | 743 | 0 | available = %stream.send_flow.available(), | 744 | 0 | requested = stream.requested_send_capacity, | 745 | 0 | buffered = stream.buffered_send_data, | 746 | 0 | "data frame" | 747 | | ); | 748 | | | 749 | | // Zero length data frames always have capacity to | 750 | | // be sent. | 751 | 52.3k | if sz > 0 && stream_capacity == 0 { | 752 | 31.6k | tracing::trace!("stream capacity is 0"); | 753 | | | 754 | | // Ensure that the stream is waiting for | 755 | | // connection level capacity | 756 | | // | 757 | | // TODO: uncomment | 758 | | // debug_assert!(stream.is_pending_send_capacity); | 759 | | | 760 | | // The stream has no more capacity, this can | 761 | | // happen if the remote reduced the stream | 762 | | // window. In this case, we need to buffer the | 763 | | // frame and wait for a window update... | 764 | 31.6k | stream.pending_send.push_front(buffer, frame.into()); | 765 | | | 766 | 31.6k | continue; | 767 | 20.6k | } | 768 | | | 769 | | // Only send up to the max frame length | 770 | 20.6k | let len = cmp::min(sz, max_len); | 771 | | | 772 | | // Only send up to the stream's window capacity | 773 | 20.6k | let len = | 774 | 20.6k | cmp::min(len, stream_capacity.as_size() as usize) as WindowSize; | 775 | | | 776 | | // There *must* be be enough connection level | 777 | | // capacity at this point. | 778 | 20.6k | debug_assert!(len <= self.flow.window_size()); | 779 | | | 780 | | // Check if the stream level window the peer knows is available. In some | 781 | | // scenarios, maybe the window we know is available but the window which | 782 | | // peer knows is not. 
| 783 | 20.6k | if len > 0 && len > stream.send_flow.window_size() { | 784 | 0 | stream.pending_send.push_front(buffer, frame.into()); | 785 | 0 | continue; | 786 | 20.6k | } | 787 | | | 788 | 20.6k | tracing::trace!(len, "sending data frame"); | 789 | | | 790 | | // Update the flow control | 791 | 20.6k | tracing::trace_span!("updating stream flow").in_scope(|| { | 792 | | stream.send_data(len, self.max_buffer_size); | 793 | | | 794 | | // Assign the capacity back to the connection that | 795 | | // was just consumed from the stream in the previous | 796 | | // line. | 797 | | // TODO: proper error handling | 798 | | let _res = self.flow.assign_capacity(len); | 799 | | debug_assert!(_res.is_ok()); | 800 | | }); | 801 | | | 802 | 20.6k | let (eos, len) = tracing::trace_span!("updating connection flow") | 803 | 20.6k | .in_scope(|| { | 804 | | // TODO: proper error handling | 805 | | let _res = self.flow.send_data(len); | 806 | | debug_assert!(_res.is_ok()); | 807 | | | 808 | | // Wrap the frame's data payload to ensure that the | 809 | | // correct amount of data gets written. 
| 810 | | | 811 | | let eos = frame.is_end_stream(); | 812 | | let len = len as usize; | 813 | | | 814 | | if frame.payload().remaining() > len { | 815 | | frame.set_end_stream(false); | 816 | | } | 817 | | (eos, len) | 818 | | }); | 819 | | | 820 | 20.6k | Frame::Data(frame.map(|buf| Prioritized { | 821 | | inner: buf.take(len), | 822 | | end_of_stream: eos, | 823 | | stream: stream.key(), | 824 | | })) | 825 | | } | 826 | 0 | Some(Frame::PushPromise(pp)) => { | 827 | 0 | let mut pushed = | 828 | 0 | stream.store_mut().find_mut(&pp.promised_id()).unwrap(); | 829 | 0 | pushed.is_pending_push = false; | 830 | | // Transition stream from pending_push to pending_open | 831 | | // if possible | 832 | 0 | if !pushed.pending_send.is_empty() { | 833 | 0 | if counts.can_inc_num_send_streams() { | 834 | 0 | counts.inc_num_send_streams(&mut pushed); | 835 | 0 | self.pending_send.push(&mut pushed); | 836 | 0 | } else { | 837 | 0 | self.queue_open(&mut pushed); | 838 | 0 | } | 839 | 0 | } | 840 | 0 | Frame::PushPromise(pp) | 841 | | } | 842 | 212k | Some(frame) => frame.map(|_| { | 843 | | unreachable!( | 844 | | "Frame::map closure will only be called \ | 845 | | on DATA frames." | 846 | | ) | 847 | | }), | 848 | | None => { | 849 | 7.59k | if let Some(reason) = stream.state.get_scheduled_reset() { | 850 | 118 | stream.set_reset(reason, Initiator::Library); | 851 | | | 852 | 118 | let frame = frame::Reset::new(stream.id, reason); | 853 | 118 | Frame::Reset(frame) | 854 | | } else { | 855 | | // If the stream receives a RESET from the peer, it may have | 856 | | // had data buffered to be sent, but all the frames are cleared | 857 | | // in clear_queue(). Instead of doing O(N) traversal through queue | 858 | | // to remove, lets just ignore the stream here. | 859 | 7.47k | tracing::trace!("removing dangling stream from pending_send"); | 860 | | // Since this should only happen as a consequence of `clear_queue`, | 861 | | // we must be in a closed state of some kind. 
| 862 | 7.47k | debug_assert!(stream.state.is_closed()); | 863 | 7.47k | counts.transition_after(stream, is_pending_reset); | 864 | 7.47k | continue; | 865 | | } | 866 | | } | 867 | | }; | 868 | | | 869 | 232k | tracing::trace!("pop_frame; frame={:?}", frame); | 870 | | | 871 | 232k | if cfg!(debug_assertions) && stream.state.is_idle() { | 872 | 0 | debug_assert!(stream.id > self.last_opened_id); | 873 | 0 | self.last_opened_id = stream.id; | 874 | 232k | } | 875 | | | 876 | 232k | if !stream.pending_send.is_empty() || stream.state.is_scheduled_reset() { | 877 | 208k | // TODO: Only requeue the sender IF it is ready to send | 878 | 208k | // the next frame. i.e. don't requeue it if the next | 879 | 208k | // frame is a data frame and the stream does not have | 880 | 208k | // any more capacity. | 881 | 208k | self.pending_send.push(&mut stream); | 882 | 208k | } | 883 | | | 884 | 232k | counts.transition_after(stream, is_pending_reset); | 885 | | | 886 | 232k | return Some(frame); | 887 | | } | 888 | 150k | None => return None, | 889 | | } | 890 | | } | 891 | 383k | } |
|
892 | | |
893 | 383k | fn pop_pending_open<'s>( |
894 | 383k | &mut self, |
895 | 383k | store: &'s mut Store, |
896 | 383k | counts: &mut Counts, |
897 | 383k | ) -> Option<store::Ptr<'s>> { |
898 | 383k | tracing::trace!("schedule_pending_open"); |
899 | | // check for any pending open streams |
900 | 383k | if counts.can_inc_num_send_streams() { |
901 | 380k | if let Some(mut stream) = self.pending_open.pop(store) { |
902 | 208k | tracing::trace!("schedule_pending_open; stream={:?}", stream.id); |
903 | | |
904 | 208k | counts.inc_num_send_streams(&mut stream); |
905 | 208k | stream.notify_send(); |
906 | 208k | return Some(stream); |
907 | 172k | } |
908 | 2.61k | } |
909 | | |
910 | 174k | None |
911 | 383k | } |
912 | | } |
913 | | |
914 | | // ===== impl Prioritized ===== |
915 | | |
916 | | impl<B> Buf for Prioritized<B> |
917 | | where |
918 | | B: Buf, |
919 | | { |
920 | 760k | fn remaining(&self) -> usize { |
921 | 760k | self.inner.remaining() |
922 | 760k | } Unexecuted instantiation: <h2::proto::streams::prioritize::Prioritized<_> as bytes::buf::buf_impl::Buf>::remaining Unexecuted instantiation: <h2::proto::streams::prioritize::Prioritized<bytes::bytes::Bytes> as bytes::buf::buf_impl::Buf>::remaining <h2::proto::streams::prioritize::Prioritized<bytes::bytes::Bytes> as bytes::buf::buf_impl::Buf>::remaining Line | Count | Source | 920 | 760k | fn remaining(&self) -> usize { | 921 | 760k | self.inner.remaining() | 922 | 760k | } |
|
923 | | |
924 | 27.1k | fn chunk(&self) -> &[u8] { |
925 | 27.1k | self.inner.chunk() |
926 | 27.1k | } Unexecuted instantiation: <h2::proto::streams::prioritize::Prioritized<_> as bytes::buf::buf_impl::Buf>::chunk Unexecuted instantiation: <h2::proto::streams::prioritize::Prioritized<bytes::bytes::Bytes> as bytes::buf::buf_impl::Buf>::chunk <h2::proto::streams::prioritize::Prioritized<bytes::bytes::Bytes> as bytes::buf::buf_impl::Buf>::chunk Line | Count | Source | 924 | 27.1k | fn chunk(&self) -> &[u8] { | 925 | 27.1k | self.inner.chunk() | 926 | 27.1k | } |
|
927 | | |
928 | 0 | fn chunks_vectored<'a>(&'a self, dst: &mut [std::io::IoSlice<'a>]) -> usize { |
929 | 0 | self.inner.chunks_vectored(dst) |
930 | 0 | } Unexecuted instantiation: <h2::proto::streams::prioritize::Prioritized<_> as bytes::buf::buf_impl::Buf>::chunks_vectored Unexecuted instantiation: <h2::proto::streams::prioritize::Prioritized<bytes::bytes::Bytes> as bytes::buf::buf_impl::Buf>::chunks_vectored |
931 | | |
932 | 23.3k | fn advance(&mut self, cnt: usize) { |
933 | 23.3k | self.inner.advance(cnt) |
934 | 23.3k | } Unexecuted instantiation: <h2::proto::streams::prioritize::Prioritized<_> as bytes::buf::buf_impl::Buf>::advance Unexecuted instantiation: <h2::proto::streams::prioritize::Prioritized<bytes::bytes::Bytes> as bytes::buf::buf_impl::Buf>::advance <h2::proto::streams::prioritize::Prioritized<bytes::bytes::Bytes> as bytes::buf::buf_impl::Buf>::advance Line | Count | Source | 932 | 23.3k | fn advance(&mut self, cnt: usize) { | 933 | 23.3k | self.inner.advance(cnt) | 934 | 23.3k | } |
|
935 | | } |
936 | | |
937 | | impl<B: Buf> fmt::Debug for Prioritized<B> { |
938 | 0 | fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { |
939 | 0 | fmt.debug_struct("Prioritized") |
940 | 0 | .field("remaining", &self.inner.get_ref().remaining()) |
941 | 0 | .field("end_of_stream", &self.end_of_stream) |
942 | 0 | .field("stream", &self.stream) |
943 | 0 | .finish() |
944 | 0 | } |
945 | | } |