Coverage Report

Created: 2025-08-26 07:09

/src/h2/src/proto/streams/prioritize.rs
Line
Count
Source (jump to first uncovered line)
1
use super::store::Resolve;
2
use super::*;
3
4
use crate::frame::Reason;
5
6
use crate::codec::UserError;
7
use crate::codec::UserError::*;
8
9
use bytes::buf::Take;
10
use std::{
11
    cmp::{self, Ordering},
12
    fmt, io, mem,
13
    task::{Context, Poll, Waker},
14
};
15
16
/// # Warning
17
///
18
/// Queued streams are ordered by stream ID, as we need to ensure that
19
/// lower-numbered streams are sent headers before higher-numbered ones.
20
/// This is because "idle" stream IDs – those which have been initiated but
21
/// have yet to receive frames – will be implicitly closed on receipt of a
22
/// frame on a higher stream ID. If these queues was not ordered by stream
23
/// IDs, some mechanism would be necessary to ensure that the lowest-numbered]
24
/// idle stream is opened first.
25
#[derive(Debug)]
26
pub(super) struct Prioritize {
27
    /// Queue of streams waiting for socket capacity to send a frame.
28
    pending_send: store::Queue<stream::NextSend>,
29
30
    /// Queue of streams waiting for window capacity to produce data.
31
    pending_capacity: store::Queue<stream::NextSendCapacity>,
32
33
    /// Streams waiting for capacity due to max concurrency
34
    ///
35
    /// The `SendRequest` handle is `Clone`. This enables initiating requests
36
    /// from many tasks. However, offering this capability while supporting
37
    /// backpressure at some level is tricky. If there are many `SendRequest`
38
    /// handles and a single stream becomes available, which handle gets
39
    /// assigned that stream? Maybe that handle is no longer ready to send a
40
    /// request.
41
    ///
42
    /// The strategy used is to allow each `SendRequest` handle one buffered
43
    /// request. A `SendRequest` handle is ready to send a request if it has no
44
    /// associated buffered requests. This is the same strategy as `mpsc` in the
45
    /// futures library.
46
    pending_open: store::Queue<stream::NextOpen>,
47
48
    /// Connection level flow control governing sent data
49
    flow: FlowControl,
50
51
    /// Stream ID of the last stream opened.
52
    last_opened_id: StreamId,
53
54
    /// What `DATA` frame is currently being sent in the codec.
55
    in_flight_data_frame: InFlightData,
56
57
    /// The maximum amount of bytes a stream should buffer.
58
    max_buffer_size: usize,
59
}
60
61
#[derive(Debug, Eq, PartialEq)]
62
enum InFlightData {
63
    /// There is no `DATA` frame in flight.
64
    Nothing,
65
    /// There is a `DATA` frame in flight belonging to the given stream.
66
    DataFrame(store::Key),
67
    /// There was a `DATA` frame, but the stream's queue was since cleared.
68
    Drop,
69
}
70
71
pub(crate) struct Prioritized<B> {
72
    // The buffer
73
    inner: Take<B>,
74
75
    end_of_stream: bool,
76
77
    // The stream that this is associated with
78
    stream: store::Key,
79
}
80
81
// ===== impl Prioritize =====
82
83
impl Prioritize {
84
12.7k
    pub fn new(config: &Config) -> Prioritize {
85
12.7k
        let mut flow = FlowControl::new();
86
12.7k
87
12.7k
        flow.inc_window(config.remote_init_window_sz)
88
12.7k
            .expect("invalid initial window size");
89
12.7k
90
12.7k
        // TODO: proper error handling
91
12.7k
        let _res = flow.assign_capacity(config.remote_init_window_sz);
92
12.7k
        debug_assert!(_res.is_ok());
93
94
12.7k
        tracing::trace!("Prioritize::new; flow={:?}", flow);
95
96
12.7k
        Prioritize {
97
12.7k
            pending_send: store::Queue::new(),
98
12.7k
            pending_capacity: store::Queue::new(),
99
12.7k
            pending_open: store::Queue::new(),
100
12.7k
            flow,
101
12.7k
            last_opened_id: StreamId::ZERO,
102
12.7k
            in_flight_data_frame: InFlightData::Nothing,
103
12.7k
            max_buffer_size: config.local_max_buffer_size,
104
12.7k
        }
105
12.7k
    }
106
107
0
    pub(crate) fn max_buffer_size(&self) -> usize {
108
0
        self.max_buffer_size
109
0
    }
110
111
    /// Queue a frame to be sent to the remote
112
450k
    pub fn queue_frame<B>(
113
450k
        &mut self,
114
450k
        frame: Frame<B>,
115
450k
        buffer: &mut Buffer<Frame<B>>,
116
450k
        stream: &mut store::Ptr,
117
450k
        task: &mut Option<Waker>,
118
450k
    ) {
119
450k
        let span = tracing::trace_span!("Prioritize::queue_frame", ?stream.id);
120
450k
        let _e = span.enter();
121
450k
        // Queue the frame in the buffer
122
450k
        stream.pending_send.push_back(buffer, frame);
123
450k
        self.schedule_send(stream, task);
124
450k
    }
Unexecuted instantiation: <h2::proto::streams::prioritize::Prioritize>::queue_frame::<_>
<h2::proto::streams::prioritize::Prioritize>::queue_frame::<bytes::bytes::Bytes>
Line
Count
Source
112
357
    pub fn queue_frame<B>(
113
357
        &mut self,
114
357
        frame: Frame<B>,
115
357
        buffer: &mut Buffer<Frame<B>>,
116
357
        stream: &mut store::Ptr,
117
357
        task: &mut Option<Waker>,
118
357
    ) {
119
357
        let span = tracing::trace_span!("Prioritize::queue_frame", ?stream.id);
120
357
        let _e = span.enter();
121
357
        // Queue the frame in the buffer
122
357
        stream.pending_send.push_back(buffer, frame);
123
357
        self.schedule_send(stream, task);
124
357
    }
<h2::proto::streams::prioritize::Prioritize>::queue_frame::<bytes::bytes::Bytes>
Line
Count
Source
112
450k
    pub fn queue_frame<B>(
113
450k
        &mut self,
114
450k
        frame: Frame<B>,
115
450k
        buffer: &mut Buffer<Frame<B>>,
116
450k
        stream: &mut store::Ptr,
117
450k
        task: &mut Option<Waker>,
118
450k
    ) {
119
450k
        let span = tracing::trace_span!("Prioritize::queue_frame", ?stream.id);
120
450k
        let _e = span.enter();
121
450k
        // Queue the frame in the buffer
122
450k
        stream.pending_send.push_back(buffer, frame);
123
450k
        self.schedule_send(stream, task);
124
450k
    }
125
126
533k
    pub fn schedule_send(&mut self, stream: &mut store::Ptr, task: &mut Option<Waker>) {
127
533k
        // If the stream is waiting to be opened, nothing more to do.
128
533k
        if stream.is_send_ready() {
129
98.4k
            tracing::trace!(?stream.id, "schedule_send");
130
            // Queue the stream
131
98.4k
            self.pending_send.push(stream);
132
133
            // Notify the connection.
134
98.4k
            if let Some(task) = task.take() {
135
54
                task.wake();
136
98.4k
            }
137
435k
        }
138
533k
    }
139
140
426k
    pub fn queue_open(&mut self, stream: &mut store::Ptr) {
141
426k
        self.pending_open.push(stream);
142
426k
    }
143
144
    /// Send a data frame
145
425k
    pub fn send_data<B>(
146
425k
        &mut self,
147
425k
        frame: frame::Data<B>,
148
425k
        buffer: &mut Buffer<Frame<B>>,
149
425k
        stream: &mut store::Ptr,
150
425k
        counts: &mut Counts,
151
425k
        task: &mut Option<Waker>,
152
425k
    ) -> Result<(), UserError>
153
425k
    where
154
425k
        B: Buf,
155
425k
    {
156
425k
        let sz = frame.payload().remaining();
157
425k
158
425k
        if sz > MAX_WINDOW_SIZE as usize {
159
0
            return Err(UserError::PayloadTooBig);
160
425k
        }
161
425k
162
425k
        let sz = sz as WindowSize;
163
425k
164
425k
        if !stream.state.is_send_streaming() {
165
0
            if stream.state.is_closed() {
166
0
                return Err(InactiveStreamId);
167
            } else {
168
0
                return Err(UnexpectedFrameType);
169
            }
170
425k
        }
171
425k
172
425k
        // Update the buffered data counter
173
425k
        stream.buffered_send_data += sz as usize;
174
175
425k
        let span =
176
425k
            tracing::trace_span!("send_data", sz, requested = stream.requested_send_capacity);
177
425k
        let _e = span.enter();
178
425k
        tracing::trace!(buffered = stream.buffered_send_data);
179
180
        // Implicitly request more send capacity if not enough has been
181
        // requested yet.
182
425k
        if (stream.requested_send_capacity as usize) < stream.buffered_send_data {
183
            // Update the target requested capacity
184
425k
            stream.requested_send_capacity =
185
425k
                cmp::min(stream.buffered_send_data, WindowSize::MAX as usize) as WindowSize;
186
425k
187
425k
            // `try_assign_capacity` will queue the stream to `pending_capacity` if the capcaity
188
425k
            // cannot be assigned at the time it is called.
189
425k
            //
190
425k
            // Streams over the max concurrent count will still call `send_data` so we should be
191
425k
            // careful not to put it into `pending_capacity` as it will starve the connection
192
425k
            // capacity for other streams
193
425k
            if !stream.is_pending_open {
194
0
                self.try_assign_capacity(stream);
195
425k
            }
196
0
        }
197
198
425k
        if frame.is_end_stream() {
199
425k
            stream.state.send_close();
200
425k
            self.reserve_capacity(0, stream, counts);
201
425k
        }
202
203
425k
        tracing::trace!(
204
0
            available = %stream.send_flow.available(),
205
0
            buffered = stream.buffered_send_data,
206
        );
207
208
        // The `stream.buffered_send_data == 0` check is here so that, if a zero
209
        // length data frame is queued to the front (there is no previously
210
        // queued data), it gets sent out immediately even if there is no
211
        // available send window.
212
        //
213
        // Sending out zero length data frames can be done to signal
214
        // end-of-stream.
215
        //
216
425k
        if stream.send_flow.available() > 0 || stream.buffered_send_data == 0 {
217
0
            // The stream currently has capacity to send the data frame, so
218
0
            // queue it up and notify the connection task.
219
0
            self.queue_frame(frame.into(), buffer, stream, task);
220
425k
        } else {
221
425k
            // The stream has no capacity to send the frame now, save it but
222
425k
            // don't notify the connection task. Once additional capacity
223
425k
            // becomes available, the frame will be flushed.
224
425k
            stream.pending_send.push_back(buffer, frame.into());
225
425k
        }
226
227
425k
        Ok(())
228
425k
    }
Unexecuted instantiation: <h2::proto::streams::prioritize::Prioritize>::send_data::<_>
<h2::proto::streams::prioritize::Prioritize>::send_data::<bytes::bytes::Bytes>
Line
Count
Source
145
425k
    pub fn send_data<B>(
146
425k
        &mut self,
147
425k
        frame: frame::Data<B>,
148
425k
        buffer: &mut Buffer<Frame<B>>,
149
425k
        stream: &mut store::Ptr,
150
425k
        counts: &mut Counts,
151
425k
        task: &mut Option<Waker>,
152
425k
    ) -> Result<(), UserError>
153
425k
    where
154
425k
        B: Buf,
155
425k
    {
156
425k
        let sz = frame.payload().remaining();
157
425k
158
425k
        if sz > MAX_WINDOW_SIZE as usize {
159
0
            return Err(UserError::PayloadTooBig);
160
425k
        }
161
425k
162
425k
        let sz = sz as WindowSize;
163
425k
164
425k
        if !stream.state.is_send_streaming() {
165
0
            if stream.state.is_closed() {
166
0
                return Err(InactiveStreamId);
167
            } else {
168
0
                return Err(UnexpectedFrameType);
169
            }
170
425k
        }
171
425k
172
425k
        // Update the buffered data counter
173
425k
        stream.buffered_send_data += sz as usize;
174
175
425k
        let span =
176
425k
            tracing::trace_span!("send_data", sz, requested = stream.requested_send_capacity);
177
425k
        let _e = span.enter();
178
425k
        tracing::trace!(buffered = stream.buffered_send_data);
179
180
        // Implicitly request more send capacity if not enough has been
181
        // requested yet.
182
425k
        if (stream.requested_send_capacity as usize) < stream.buffered_send_data {
183
            // Update the target requested capacity
184
425k
            stream.requested_send_capacity =
185
425k
                cmp::min(stream.buffered_send_data, WindowSize::MAX as usize) as WindowSize;
186
425k
187
425k
            // `try_assign_capacity` will queue the stream to `pending_capacity` if the capcaity
188
425k
            // cannot be assigned at the time it is called.
189
425k
            //
190
425k
            // Streams over the max concurrent count will still call `send_data` so we should be
191
425k
            // careful not to put it into `pending_capacity` as it will starve the connection
192
425k
            // capacity for other streams
193
425k
            if !stream.is_pending_open {
194
0
                self.try_assign_capacity(stream);
195
425k
            }
196
0
        }
197
198
425k
        if frame.is_end_stream() {
199
425k
            stream.state.send_close();
200
425k
            self.reserve_capacity(0, stream, counts);
201
425k
        }
202
203
425k
        tracing::trace!(
204
0
            available = %stream.send_flow.available(),
205
0
            buffered = stream.buffered_send_data,
206
        );
207
208
        // The `stream.buffered_send_data == 0` check is here so that, if a zero
209
        // length data frame is queued to the front (there is no previously
210
        // queued data), it gets sent out immediately even if there is no
211
        // available send window.
212
        //
213
        // Sending out zero length data frames can be done to signal
214
        // end-of-stream.
215
        //
216
425k
        if stream.send_flow.available() > 0 || stream.buffered_send_data == 0 {
217
0
            // The stream currently has capacity to send the data frame, so
218
0
            // queue it up and notify the connection task.
219
0
            self.queue_frame(frame.into(), buffer, stream, task);
220
425k
        } else {
221
425k
            // The stream has no capacity to send the frame now, save it but
222
425k
            // don't notify the connection task. Once additional capacity
223
425k
            // becomes available, the frame will be flushed.
224
425k
            stream.pending_send.push_back(buffer, frame.into());
225
425k
        }
226
227
425k
        Ok(())
228
425k
    }
229
230
    /// Request capacity to send data
231
425k
    pub fn reserve_capacity(
232
425k
        &mut self,
233
425k
        capacity: WindowSize,
234
425k
        stream: &mut store::Ptr,
235
425k
        counts: &mut Counts,
236
425k
    ) {
237
425k
        let span = tracing::trace_span!(
238
            "reserve_capacity",
239
            ?stream.id,
240
            requested = capacity,
241
0
            effective = (capacity as usize) + stream.buffered_send_data,
242
0
            curr = stream.requested_send_capacity
243
        );
244
425k
        let _e = span.enter();
245
425k
246
425k
        // Actual capacity is `capacity` + the current amount of buffered data.
247
425k
        // If it were less, then we could never send out the buffered data.
248
425k
        let capacity = (capacity as usize) + stream.buffered_send_data;
249
425k
250
425k
        match capacity.cmp(&(stream.requested_send_capacity as usize)) {
251
425k
            Ordering::Equal => {
252
425k
                // Nothing to do
253
425k
            }
254
            Ordering::Less => {
255
                // Update the target requested capacity
256
0
                stream.requested_send_capacity = capacity as WindowSize;
257
0
258
0
                // Currently available capacity assigned to the stream
259
0
                let available = stream.send_flow.available().as_size();
260
0
261
0
                // If the stream has more assigned capacity than requested, reclaim
262
0
                // some for the connection
263
0
                if available as usize > capacity {
264
0
                    let diff = available - capacity as WindowSize;
265
0
266
0
                    // TODO: proper error handling
267
0
                    let _res = stream.send_flow.claim_capacity(diff);
268
0
                    debug_assert!(_res.is_ok());
269
270
0
                    self.assign_connection_capacity(diff, stream, counts);
271
0
                }
272
            }
273
            Ordering::Greater => {
274
                // If trying to *add* capacity, but the stream send side is closed,
275
                // there's nothing to be done.
276
0
                if stream.state.is_send_closed() {
277
0
                    return;
278
0
                }
279
0
280
0
                // Update the target requested capacity
281
0
                stream.requested_send_capacity =
282
0
                    cmp::min(capacity, WindowSize::MAX as usize) as WindowSize;
283
0
284
0
                // Try to assign additional capacity to the stream. If none is
285
0
                // currently available, the stream will be queued to receive some
286
0
                // when more becomes available.
287
0
                self.try_assign_capacity(stream);
288
            }
289
        }
290
425k
    }
291
292
126k
    pub fn recv_stream_window_update(
293
126k
        &mut self,
294
126k
        inc: WindowSize,
295
126k
        stream: &mut store::Ptr,
296
126k
    ) -> Result<(), Reason> {
297
126k
        let span = tracing::trace_span!(
298
            "recv_stream_window_update",
299
            ?stream.id,
300
            ?stream.state,
301
            inc,
302
0
            flow = ?stream.send_flow
303
        );
304
126k
        let _e = span.enter();
305
126k
306
126k
        if stream.state.is_send_closed() && stream.buffered_send_data == 0 {
307
            // We can't send any data, so don't bother doing anything else.
308
4.38k
            return Ok(());
309
122k
        }
310
122k
311
122k
        // Update the stream level flow control.
312
122k
        stream.send_flow.inc_window(inc)?;
313
314
        // If the stream is waiting on additional capacity, then this will
315
        // assign it (if available on the connection) and notify the producer
316
122k
        self.try_assign_capacity(stream);
317
122k
318
122k
        Ok(())
319
126k
    }
320
321
738
    pub fn recv_connection_window_update(
322
738
        &mut self,
323
738
        inc: WindowSize,
324
738
        store: &mut Store,
325
738
        counts: &mut Counts,
326
738
    ) -> Result<(), Reason> {
327
738
        // Update the connection's window
328
738
        self.flow.inc_window(inc)?;
329
330
737
        self.assign_connection_capacity(inc, store, counts);
331
737
        Ok(())
332
738
    }
333
334
    /// Reclaim all capacity assigned to the stream and re-assign it to the
335
    /// connection
336
577k
    pub fn reclaim_all_capacity(&mut self, stream: &mut store::Ptr, counts: &mut Counts) {
337
577k
        let available = stream.send_flow.available().as_size();
338
577k
        if available > 0 {
339
            // TODO: proper error handling
340
49.8k
            let _res = stream.send_flow.claim_capacity(available);
341
49.8k
            debug_assert!(_res.is_ok());
342
            // Re-assign all capacity to the connection
343
49.8k
            self.assign_connection_capacity(available, stream, counts);
344
527k
        }
345
577k
    }
346
347
    /// Reclaim just reserved capacity, not buffered capacity, and re-assign
348
    /// it to the connection
349
82.7k
    pub fn reclaim_reserved_capacity(&mut self, stream: &mut store::Ptr, counts: &mut Counts) {
350
82.7k
        // only reclaim reserved capacity that isn't already buffered
351
82.7k
        if stream.send_flow.available().as_size() as usize > stream.buffered_send_data {
352
0
            let reserved =
353
0
                stream.send_flow.available().as_size() - stream.buffered_send_data as WindowSize;
354
0
355
0
            // Panic safety: due to how `reserved` is computed it can't be greater
356
0
            // than what's available.
357
0
            stream
358
0
                .send_flow
359
0
                .claim_capacity(reserved)
360
0
                .expect("window size should be greater than reserved");
361
0
362
0
            self.assign_connection_capacity(reserved, stream, counts);
363
82.7k
        }
364
82.7k
    }
365
366
17.4k
    pub fn clear_pending_capacity(&mut self, store: &mut Store, counts: &mut Counts) {
367
17.4k
        let span = tracing::trace_span!("clear_pending_capacity");
368
17.4k
        let _e = span.enter();
369
30.6k
        while let Some(stream) = self.pending_capacity.pop(store) {
370
13.1k
            counts.transition(stream, |_, stream| {
371
13.1k
                tracing::trace!(?stream.id, "clear_pending_capacity");
372
13.1k
            })
373
        }
374
17.4k
    }
375
376
52.4k
    pub fn assign_connection_capacity<R>(
377
52.4k
        &mut self,
378
52.4k
        inc: WindowSize,
379
52.4k
        store: &mut R,
380
52.4k
        counts: &mut Counts,
381
52.4k
    ) where
382
52.4k
        R: Resolve,
383
52.4k
    {
384
52.4k
        let span = tracing::trace_span!("assign_connection_capacity", inc);
385
52.4k
        let _e = span.enter();
386
52.4k
387
52.4k
        // TODO: proper error handling
388
52.4k
        let _res = self.flow.assign_capacity(inc);
389
52.4k
        debug_assert!(_res.is_ok());
390
391
        // Assign newly acquired capacity to streams pending capacity.
392
273k
        while self.flow.available() > 0 {
393
243k
            let stream = match self.pending_capacity.pop(store) {
394
221k
                Some(stream) => stream,
395
22.5k
                None => return,
396
            };
397
398
            // Streams pending capacity may have been reset before capacity
399
            // became available. In that case, the stream won't want any
400
            // capacity, and so we shouldn't "transition" on it, but just evict
401
            // it and continue the loop.
402
221k
            if !(stream.state.is_send_streaming() || stream.buffered_send_data > 0) {
403
133k
                continue;
404
87.3k
            }
405
87.3k
406
87.3k
            counts.transition(stream, |_, stream| {
407
87.3k
                // Try to assign capacity to the stream. This will also re-queue the
408
87.3k
                // stream if there isn't enough connection level capacity to fulfill
409
87.3k
                // the capacity request.
410
87.3k
                self.try_assign_capacity(stream);
411
87.3k
            })
<h2::proto::streams::prioritize::Prioritize>::assign_connection_capacity::<h2::proto::streams::store::Ptr>::{closure#0}
Line
Count
Source
406
30.6k
            counts.transition(stream, |_, stream| {
407
30.6k
                // Try to assign capacity to the stream. This will also re-queue the
408
30.6k
                // stream if there isn't enough connection level capacity to fulfill
409
30.6k
                // the capacity request.
410
30.6k
                self.try_assign_capacity(stream);
411
30.6k
            })
<h2::proto::streams::prioritize::Prioritize>::assign_connection_capacity::<h2::proto::streams::store::Store>::{closure#0}
Line
Count
Source
406
56.6k
            counts.transition(stream, |_, stream| {
407
56.6k
                // Try to assign capacity to the stream. This will also re-queue the
408
56.6k
                // stream if there isn't enough connection level capacity to fulfill
409
56.6k
                // the capacity request.
410
56.6k
                self.try_assign_capacity(stream);
411
56.6k
            })
412
        }
413
52.4k
    }
<h2::proto::streams::prioritize::Prioritize>::assign_connection_capacity::<h2::proto::streams::store::Ptr>
Line
Count
Source
376
49.8k
    pub fn assign_connection_capacity<R>(
377
49.8k
        &mut self,
378
49.8k
        inc: WindowSize,
379
49.8k
        store: &mut R,
380
49.8k
        counts: &mut Counts,
381
49.8k
    ) where
382
49.8k
        R: Resolve,
383
49.8k
    {
384
49.8k
        let span = tracing::trace_span!("assign_connection_capacity", inc);
385
49.8k
        let _e = span.enter();
386
49.8k
387
49.8k
        // TODO: proper error handling
388
49.8k
        let _res = self.flow.assign_capacity(inc);
389
49.8k
        debug_assert!(_res.is_ok());
390
391
        // Assign newly acquired capacity to streams pending capacity.
392
213k
        while self.flow.available() > 0 {
393
184k
            let stream = match self.pending_capacity.pop(store) {
394
163k
                Some(stream) => stream,
395
20.7k
                None => return,
396
            };
397
398
            // Streams pending capacity may have been reset before capacity
399
            // became available. In that case, the stream won't want any
400
            // capacity, and so we shouldn't "transition" on it, but just evict
401
            // it and continue the loop.
402
163k
            if !(stream.state.is_send_streaming() || stream.buffered_send_data > 0) {
403
133k
                continue;
404
30.6k
            }
405
30.6k
406
30.6k
            counts.transition(stream, |_, stream| {
407
                // Try to assign capacity to the stream. This will also re-queue the
408
                // stream if there isn't enough connection level capacity to fulfill
409
                // the capacity request.
410
                self.try_assign_capacity(stream);
411
30.6k
            })
412
        }
413
49.8k
    }
<h2::proto::streams::prioritize::Prioritize>::assign_connection_capacity::<h2::proto::streams::store::Store>
Line
Count
Source
376
2.60k
    pub fn assign_connection_capacity<R>(
377
2.60k
        &mut self,
378
2.60k
        inc: WindowSize,
379
2.60k
        store: &mut R,
380
2.60k
        counts: &mut Counts,
381
2.60k
    ) where
382
2.60k
        R: Resolve,
383
2.60k
    {
384
2.60k
        let span = tracing::trace_span!("assign_connection_capacity", inc);
385
2.60k
        let _e = span.enter();
386
2.60k
387
2.60k
        // TODO: proper error handling
388
2.60k
        let _res = self.flow.assign_capacity(inc);
389
2.60k
        debug_assert!(_res.is_ok());
390
391
        // Assign newly acquired capacity to streams pending capacity.
392
59.8k
        while self.flow.available() > 0 {
393
58.9k
            let stream = match self.pending_capacity.pop(store) {
394
57.2k
                Some(stream) => stream,
395
1.74k
                None => return,
396
            };
397
398
            // Streams pending capacity may have been reset before capacity
399
            // became available. In that case, the stream won't want any
400
            // capacity, and so we shouldn't "transition" on it, but just evict
401
            // it and continue the loop.
402
57.2k
            if !(stream.state.is_send_streaming() || stream.buffered_send_data > 0) {
403
552
                continue;
404
56.6k
            }
405
56.6k
406
56.6k
            counts.transition(stream, |_, stream| {
407
                // Try to assign capacity to the stream. This will also re-queue the
408
                // stream if there isn't enough connection level capacity to fulfill
409
                // the capacity request.
410
                self.try_assign_capacity(stream);
411
56.6k
            })
412
        }
413
2.60k
    }
414
415
    /// Request capacity to send data
416
394k
    fn try_assign_capacity(&mut self, stream: &mut store::Ptr) {
417
394k
        let total_requested = stream.requested_send_capacity;
418
394k
419
394k
        // Total requested should never go below actual assigned
420
394k
        // (Note: the window size can go lower than assigned)
421
394k
        debug_assert!(stream.send_flow.available() <= total_requested as usize);
422
423
        // The amount of additional capacity that the stream requests.
424
        // Don't assign more than the window has available!
425
394k
        let additional = cmp::min(
426
394k
            total_requested - stream.send_flow.available().as_size(),
427
394k
            // Can't assign more than what is available
428
394k
            stream.send_flow.window_size() - stream.send_flow.available().as_size(),
429
394k
        );
430
394k
        let span = tracing::trace_span!("try_assign_capacity", ?stream.id);
431
394k
        let _e = span.enter();
432
394k
        tracing::trace!(
433
            requested = total_requested,
434
            additional,
435
0
            buffered = stream.buffered_send_data,
436
0
            window = stream.send_flow.window_size(),
437
0
            conn = %self.flow.available()
438
        );
439
440
394k
        if additional == 0 {
441
            // Nothing more to do
442
39.4k
            return;
443
354k
        }
444
354k
445
354k
        // If the stream has requested capacity, then it must be in the
446
354k
        // streaming state (more data could be sent) or there is buffered data
447
354k
        // waiting to be sent.
448
354k
        debug_assert!(
449
0
            stream.state.is_send_streaming() || stream.buffered_send_data > 0,
450
0
            "state={:?}",
451
0
            stream.state
452
        );
453
454
        // The amount of currently available capacity on the connection
455
354k
        let conn_available = self.flow.available().as_size();
456
354k
457
354k
        // First check if capacity is immediately available
458
354k
        if conn_available > 0 {
459
            // The amount of capacity to assign to the stream
460
            // TODO: Should prioritization factor into this?
461
84.4k
            let assign = cmp::min(conn_available, additional);
462
84.4k
463
84.4k
            tracing::trace!(capacity = assign, "assigning");
464
465
            // Assign the capacity to the stream
466
84.4k
            stream.assign_capacity(assign, self.max_buffer_size);
467
84.4k
468
84.4k
            // Claim the capacity from the connection
469
84.4k
            // TODO: proper error handling
470
84.4k
            let _res = self.flow.claim_capacity(assign);
471
84.4k
            debug_assert!(_res.is_ok());
472
270k
        }
473
474
354k
        tracing::trace!(
475
0
            available = %stream.send_flow.available(),
476
0
            requested = stream.requested_send_capacity,
477
0
            buffered = stream.buffered_send_data,
478
0
            has_unavailable = %stream.send_flow.has_unavailable()
479
        );
480
481
354k
        if stream.send_flow.available() < stream.requested_send_capacity as usize
482
331k
            && stream.send_flow.has_unavailable()
483
292k
        {
484
292k
            // The stream requires additional capacity and the stream's
485
292k
            // window has available capacity, but the connection window
486
292k
            // does not.
487
292k
            //
488
292k
            // In this case, the stream needs to be queued up for when the
489
292k
            // connection has more capacity.
490
292k
            self.pending_capacity.push(stream);
491
292k
        }
492
493
        // If data is buffered and the stream is send ready, then
494
        // schedule the stream for execution
495
354k
        if stream.buffered_send_data > 0 && stream.is_send_ready() {
496
257k
            // TODO: This assertion isn't *exactly* correct. There can still be
497
257k
            // buffered send data while the stream's pending send queue is
498
257k
            // empty. This can happen when a large data frame is in the process
499
257k
            // of being **partially** sent. Once the window has been sent, the
500
257k
            // data frame will be returned to the prioritization layer to be
501
257k
            // re-scheduled.
502
257k
            //
503
257k
            // That said, it would be nice to figure out how to make this
504
257k
            // assertion correctly.
505
257k
            //
506
257k
            // debug_assert!(!stream.pending_send.is_empty());
507
257k
508
257k
            self.pending_send.push(stream);
509
257k
        }
510
394k
    }
511
512
381k
    pub fn poll_complete<T, B>(
513
381k
        &mut self,
514
381k
        cx: &mut Context,
515
381k
        buffer: &mut Buffer<Frame<B>>,
516
381k
        store: &mut Store,
517
381k
        counts: &mut Counts,
518
381k
        dst: &mut Codec<T, Prioritized<B>>,
519
381k
    ) -> Poll<io::Result<()>>
520
381k
    where
521
381k
        T: AsyncWrite + Unpin,
522
381k
        B: Buf,
523
381k
    {
524
        // Ensure codec is ready
525
381k
        ready!(dst.poll_ready(cx))?;
526
527
        // Reclaim any frame that has previously been written
528
381k
        self.reclaim_frame(buffer, store, dst);
529
381k
530
381k
        // The max frame length
531
381k
        let max_frame_len = dst.max_send_frame_size();
532
381k
533
381k
        tracing::trace!("poll_complete");
534
535
        loop {
536
584k
            if let Some(mut stream) = self.pop_pending_open(store, counts) {
537
184k
                self.pending_send.push_front(&mut stream);
538
184k
                self.try_assign_capacity(&mut stream);
539
400k
            }
540
541
584k
            match self.pop_frame(buffer, store, max_frame_len, counts) {
542
205k
                Some(frame) => {
543
205k
                    tracing::trace!(?frame, "writing");
544
545
205k
                    debug_assert_eq!(self.in_flight_data_frame, InFlightData::Nothing);
546
205k
                    if let Frame::Data(ref frame) = frame {
547
19.1k
                        self.in_flight_data_frame = InFlightData::DataFrame(frame.payload().stream);
548
185k
                    }
549
205k
                    dst.buffer(frame).expect("invalid frame");
550
551
                    // Ensure the codec is ready to try the loop again.
552
205k
                    ready!(dst.poll_ready(cx))?;
553
554
                    // Because, always try to reclaim...
555
202k
                    self.reclaim_frame(buffer, store, dst);
556
                }
557
                None => {
558
                    // Try to flush the codec.
559
379k
                    ready!(dst.flush(cx))?;
560
561
                    // This might release a data frame...
562
345k
                    if !self.reclaim_frame(buffer, store, dst) {
563
345k
                        return Poll::Ready(Ok(()));
564
0
                    }
565
566
                    // No need to poll ready as poll_complete() does this for
567
                    // us...
568
                }
569
            }
570
        }
571
381k
    }
Unexecuted instantiation: <h2::proto::streams::prioritize::Prioritize>::poll_complete::<_, _>
<h2::proto::streams::prioritize::Prioritize>::poll_complete::<fuzz_e2e::MockIo, bytes::bytes::Bytes>
Line
Count
Source
512
381k
    pub fn poll_complete<T, B>(
513
381k
        &mut self,
514
381k
        cx: &mut Context,
515
381k
        buffer: &mut Buffer<Frame<B>>,
516
381k
        store: &mut Store,
517
381k
        counts: &mut Counts,
518
381k
        dst: &mut Codec<T, Prioritized<B>>,
519
381k
    ) -> Poll<io::Result<()>>
520
381k
    where
521
381k
        T: AsyncWrite + Unpin,
522
381k
        B: Buf,
523
381k
    {
524
        // Ensure codec is ready
525
381k
        ready!(dst.poll_ready(cx))?;
526
527
        // Reclaim any frame that has previously been written
528
381k
        self.reclaim_frame(buffer, store, dst);
529
381k
530
381k
        // The max frame length
531
381k
        let max_frame_len = dst.max_send_frame_size();
532
381k
533
381k
        tracing::trace!("poll_complete");
534
535
        loop {
536
584k
            if let Some(mut stream) = self.pop_pending_open(store, counts) {
537
184k
                self.pending_send.push_front(&mut stream);
538
184k
                self.try_assign_capacity(&mut stream);
539
400k
            }
540
541
584k
            match self.pop_frame(buffer, store, max_frame_len, counts) {
542
205k
                Some(frame) => {
543
205k
                    tracing::trace!(?frame, "writing");
544
545
205k
                    debug_assert_eq!(self.in_flight_data_frame, InFlightData::Nothing);
546
205k
                    if let Frame::Data(ref frame) = frame {
547
19.1k
                        self.in_flight_data_frame = InFlightData::DataFrame(frame.payload().stream);
548
185k
                    }
549
205k
                    dst.buffer(frame).expect("invalid frame");
550
551
                    // Ensure the codec is ready to try the loop again.
552
205k
                    ready!(dst.poll_ready(cx))?;
553
554
                    // Because, always try to reclaim...
555
202k
                    self.reclaim_frame(buffer, store, dst);
556
                }
557
                None => {
558
                    // Try to flush the codec.
559
379k
                    ready!(dst.flush(cx))?;
560
561
                    // This might release a data frame...
562
345k
                    if !self.reclaim_frame(buffer, store, dst) {
563
345k
                        return Poll::Ready(Ok(()));
564
0
                    }
565
566
                    // No need to poll ready as poll_complete() does this for
567
                    // us...
568
                }
569
            }
570
        }
571
381k
    }
572
573
    /// Tries to reclaim a pending data frame from the codec.
574
    ///
575
    /// Returns true if a frame was reclaimed.
576
    ///
577
    /// When a data frame is written to the codec, it may not be written in its
578
    /// entirety (large chunks are split up into potentially many data frames).
579
    /// In this case, the stream needs to be reprioritized.
580
929k
    fn reclaim_frame<T, B>(
581
929k
        &mut self,
582
929k
        buffer: &mut Buffer<Frame<B>>,
583
929k
        store: &mut Store,
584
929k
        dst: &mut Codec<T, Prioritized<B>>,
585
929k
    ) -> bool
586
929k
    where
587
929k
        B: Buf,
588
929k
    {
589
929k
        let span = tracing::trace_span!("try_reclaim_frame");
590
929k
        let _e = span.enter();
591
592
        // First check if there are any data chunks to take back
593
929k
        if let Some(frame) = dst.take_last_data_frame() {
594
17.8k
            self.reclaim_frame_inner(buffer, store, frame)
595
        } else {
596
912k
            false
597
        }
598
929k
    }
Unexecuted instantiation: <h2::proto::streams::prioritize::Prioritize>::reclaim_frame::<_, _>
<h2::proto::streams::prioritize::Prioritize>::reclaim_frame::<fuzz_e2e::MockIo, bytes::bytes::Bytes>
Line
Count
Source
580
929k
    fn reclaim_frame<T, B>(
581
929k
        &mut self,
582
929k
        buffer: &mut Buffer<Frame<B>>,
583
929k
        store: &mut Store,
584
929k
        dst: &mut Codec<T, Prioritized<B>>,
585
929k
    ) -> bool
586
929k
    where
587
929k
        B: Buf,
588
929k
    {
589
929k
        let span = tracing::trace_span!("try_reclaim_frame");
590
929k
        let _e = span.enter();
591
592
        // First check if there are any data chunks to take back
593
929k
        if let Some(frame) = dst.take_last_data_frame() {
594
17.8k
            self.reclaim_frame_inner(buffer, store, frame)
595
        } else {
596
912k
            false
597
        }
598
929k
    }
599
600
17.8k
    fn reclaim_frame_inner<B>(
601
17.8k
        &mut self,
602
17.8k
        buffer: &mut Buffer<Frame<B>>,
603
17.8k
        store: &mut Store,
604
17.8k
        frame: frame::Data<Prioritized<B>>,
605
17.8k
    ) -> bool
606
17.8k
    where
607
17.8k
        B: Buf,
608
17.8k
    {
609
17.8k
        tracing::trace!(
610
            ?frame,
611
0
            sz = frame.payload().inner.get_ref().remaining(),
612
0
            "reclaimed"
613
        );
614
615
17.8k
        let mut eos = false;
616
17.8k
        let key = frame.payload().stream;
617
17.8k
618
17.8k
        match mem::replace(&mut self.in_flight_data_frame, InFlightData::Nothing) {
619
0
            InFlightData::Nothing => panic!("wasn't expecting a frame to reclaim"),
620
            InFlightData::Drop => {
621
39
                tracing::trace!("not reclaiming frame for cancelled stream");
622
39
                return false;
623
            }
624
17.7k
            InFlightData::DataFrame(k) => {
625
17.7k
                debug_assert_eq!(k, key);
626
            }
627
        }
628
629
17.7k
        let mut frame = frame.map(|prioritized| {
630
17.7k
            // TODO: Ensure fully written
631
17.7k
            eos = prioritized.end_of_stream;
632
17.7k
            prioritized.inner.into_inner()
633
17.7k
        });
Unexecuted instantiation: <h2::proto::streams::prioritize::Prioritize>::reclaim_frame_inner::<_>::{closure#0}
<h2::proto::streams::prioritize::Prioritize>::reclaim_frame_inner::<bytes::bytes::Bytes>::{closure#0}
Line
Count
Source
629
17.7k
        let mut frame = frame.map(|prioritized| {
630
17.7k
            // TODO: Ensure fully written
631
17.7k
            eos = prioritized.end_of_stream;
632
17.7k
            prioritized.inner.into_inner()
633
17.7k
        });
634
17.7k
635
17.7k
        if frame.payload().has_remaining() {
636
17.1k
            let mut stream = store.resolve(key);
637
17.1k
638
17.1k
            if eos {
639
17.1k
                frame.set_end_stream(true);
640
17.1k
            }
641
642
17.1k
            self.push_back_frame(frame.into(), buffer, &mut stream);
643
17.1k
644
17.1k
            return true;
645
603
        }
646
603
647
603
        false
648
17.8k
    }
Unexecuted instantiation: <h2::proto::streams::prioritize::Prioritize>::reclaim_frame_inner::<_>
<h2::proto::streams::prioritize::Prioritize>::reclaim_frame_inner::<bytes::bytes::Bytes>
Line
Count
Source
600
17.8k
    fn reclaim_frame_inner<B>(
601
17.8k
        &mut self,
602
17.8k
        buffer: &mut Buffer<Frame<B>>,
603
17.8k
        store: &mut Store,
604
17.8k
        frame: frame::Data<Prioritized<B>>,
605
17.8k
    ) -> bool
606
17.8k
    where
607
17.8k
        B: Buf,
608
17.8k
    {
609
17.8k
        tracing::trace!(
610
            ?frame,
611
0
            sz = frame.payload().inner.get_ref().remaining(),
612
0
            "reclaimed"
613
        );
614
615
17.8k
        let mut eos = false;
616
17.8k
        let key = frame.payload().stream;
617
17.8k
618
17.8k
        match mem::replace(&mut self.in_flight_data_frame, InFlightData::Nothing) {
619
0
            InFlightData::Nothing => panic!("wasn't expecting a frame to reclaim"),
620
            InFlightData::Drop => {
621
39
                tracing::trace!("not reclaiming frame for cancelled stream");
622
39
                return false;
623
            }
624
17.7k
            InFlightData::DataFrame(k) => {
625
17.7k
                debug_assert_eq!(k, key);
626
            }
627
        }
628
629
17.7k
        let mut frame = frame.map(|prioritized| {
630
            // TODO: Ensure fully written
631
            eos = prioritized.end_of_stream;
632
            prioritized.inner.into_inner()
633
17.7k
        });
634
17.7k
635
17.7k
        if frame.payload().has_remaining() {
636
17.1k
            let mut stream = store.resolve(key);
637
17.1k
638
17.1k
            if eos {
639
17.1k
                frame.set_end_stream(true);
640
17.1k
            }
641
642
17.1k
            self.push_back_frame(frame.into(), buffer, &mut stream);
643
17.1k
644
17.1k
            return true;
645
603
        }
646
603
647
603
        false
648
17.8k
    }
649
650
    /// Push the frame to the front of the stream's deque, scheduling the
651
    /// stream if needed.
652
17.1k
    fn push_back_frame<B>(
653
17.1k
        &mut self,
654
17.1k
        frame: Frame<B>,
655
17.1k
        buffer: &mut Buffer<Frame<B>>,
656
17.1k
        stream: &mut store::Ptr,
657
17.1k
    ) {
658
17.1k
        // Push the frame to the front of the stream's deque
659
17.1k
        stream.pending_send.push_front(buffer, frame);
660
17.1k
661
17.1k
        // If needed, schedule the sender
662
17.1k
        if stream.send_flow.available() > 0 {
663
2.43k
            debug_assert!(!stream.pending_send.is_empty());
664
2.43k
            self.pending_send.push(stream);
665
14.7k
        }
666
17.1k
    }
Unexecuted instantiation: <h2::proto::streams::prioritize::Prioritize>::push_back_frame::<_>
<h2::proto::streams::prioritize::Prioritize>::push_back_frame::<bytes::bytes::Bytes>
Line
Count
Source
652
17.1k
    fn push_back_frame<B>(
653
17.1k
        &mut self,
654
17.1k
        frame: Frame<B>,
655
17.1k
        buffer: &mut Buffer<Frame<B>>,
656
17.1k
        stream: &mut store::Ptr,
657
17.1k
    ) {
658
17.1k
        // Push the frame to the front of the stream's deque
659
17.1k
        stream.pending_send.push_front(buffer, frame);
660
17.1k
661
17.1k
        // If needed, schedule the sender
662
17.1k
        if stream.send_flow.available() > 0 {
663
2.43k
            debug_assert!(!stream.pending_send.is_empty());
664
2.43k
            self.pending_send.push(stream);
665
14.7k
        }
666
17.1k
    }
667
668
577k
    pub fn clear_queue<B>(&mut self, buffer: &mut Buffer<Frame<B>>, stream: &mut store::Ptr) {
669
577k
        let span = tracing::trace_span!("clear_queue", ?stream.id);
670
577k
        let _e = span.enter();
671
672
        // TODO: make this more efficient?
673
1.26M
        while let Some(frame) = stream.pending_send.pop_front(buffer) {
674
688k
            tracing::trace!(?frame, "dropping");
675
        }
676
677
577k
        stream.buffered_send_data = 0;
678
577k
        stream.requested_send_capacity = 0;
679
577k
        if let InFlightData::DataFrame(key) = self.in_flight_data_frame {
680
33.7k
            if stream.key() == key {
681
1.32k
                // This stream could get cleaned up now - don't allow the buffered frame to get reclaimed.
682
1.32k
                self.in_flight_data_frame = InFlightData::Drop;
683
32.4k
            }
684
543k
        }
685
577k
    }
Unexecuted instantiation: <h2::proto::streams::prioritize::Prioritize>::clear_queue::<_>
<h2::proto::streams::prioritize::Prioritize>::clear_queue::<bytes::bytes::Bytes>
Line
Count
Source
668
357
    pub fn clear_queue<B>(&mut self, buffer: &mut Buffer<Frame<B>>, stream: &mut store::Ptr) {
669
357
        let span = tracing::trace_span!("clear_queue", ?stream.id);
670
357
        let _e = span.enter();
671
672
        // TODO: make this more efficient?
673
714
        while let Some(frame) = stream.pending_send.pop_front(buffer) {
674
357
            tracing::trace!(?frame, "dropping");
675
        }
676
677
357
        stream.buffered_send_data = 0;
678
357
        stream.requested_send_capacity = 0;
679
357
        if let InFlightData::DataFrame(key) = self.in_flight_data_frame {
680
0
            if stream.key() == key {
681
0
                // This stream could get cleaned up now - don't allow the buffered frame to get reclaimed.
682
0
                self.in_flight_data_frame = InFlightData::Drop;
683
0
            }
684
357
        }
685
357
    }
<h2::proto::streams::prioritize::Prioritize>::clear_queue::<bytes::bytes::Bytes>
Line
Count
Source
668
577k
    pub fn clear_queue<B>(&mut self, buffer: &mut Buffer<Frame<B>>, stream: &mut store::Ptr) {
669
577k
        let span = tracing::trace_span!("clear_queue", ?stream.id);
670
577k
        let _e = span.enter();
671
672
        // TODO: make this more efficient?
673
1.26M
        while let Some(frame) = stream.pending_send.pop_front(buffer) {
674
688k
            tracing::trace!(?frame, "dropping");
675
        }
676
677
577k
        stream.buffered_send_data = 0;
678
577k
        stream.requested_send_capacity = 0;
679
577k
        if let InFlightData::DataFrame(key) = self.in_flight_data_frame {
680
33.7k
            if stream.key() == key {
681
1.32k
                // This stream could get cleaned up now - don't allow the buffered frame to get reclaimed.
682
1.32k
                self.in_flight_data_frame = InFlightData::Drop;
683
32.4k
            }
684
543k
        }
685
577k
    }
686
687
17.4k
    pub fn clear_pending_send(&mut self, store: &mut Store, counts: &mut Counts) {
688
193k
        while let Some(mut stream) = self.pending_send.pop(store) {
689
176k
            let is_pending_reset = stream.is_pending_reset_expiration();
690
176k
            if let Some(reason) = stream.state.get_scheduled_reset() {
691
73.8k
                stream.set_reset(reason, Initiator::Library);
692
102k
            }
693
176k
            counts.transition_after(stream, is_pending_reset);
694
        }
695
17.4k
    }
696
697
17.4k
    pub fn clear_pending_open(&mut self, store: &mut Store, counts: &mut Counts) {
698
258k
        while let Some(stream) = self.pending_open.pop(store) {
699
241k
            let is_pending_reset = stream.is_pending_reset_expiration();
700
241k
            counts.transition_after(stream, is_pending_reset);
701
241k
        }
702
17.4k
    }
703
704
584k
    fn pop_frame<B>(
705
584k
        &mut self,
706
584k
        buffer: &mut Buffer<Frame<B>>,
707
584k
        store: &mut Store,
708
584k
        max_len: usize,
709
584k
        counts: &mut Counts,
710
584k
    ) -> Option<Frame<Prioritized<B>>>
711
584k
    where
712
584k
        B: Buf,
713
584k
    {
714
584k
        let span = tracing::trace_span!("pop_frame");
715
584k
        let _e = span.enter();
716
717
        loop {
718
620k
            match self.pending_send.pop(store) {
719
240k
                Some(mut stream) => {
720
240k
                    let span = tracing::trace_span!("popped", ?stream.id, ?stream.state);
721
240k
                    let _e = span.enter();
722
240k
723
240k
                    // It's possible that this stream, besides having data to send,
724
240k
                    // is also queued to send a reset, and thus is already in the queue
725
240k
                    // to wait for "some time" after a reset.
726
240k
                    //
727
240k
                    // To be safe, we just always ask the stream.
728
240k
                    let is_pending_reset = stream.is_pending_reset_expiration();
729
240k
730
240k
                    tracing::trace!(is_pending_reset);
731
732
240k
                    let frame = match stream.pending_send.pop_front(buffer) {
733
46.5k
                        Some(Frame::Data(mut frame)) => {
734
46.5k
                            // Get the amount of capacity remaining for stream's
735
46.5k
                            // window.
736
46.5k
                            let stream_capacity = stream.send_flow.available();
737
46.5k
                            let sz = frame.payload().remaining();
738
46.5k
739
46.5k
                            tracing::trace!(
740
                                sz,
741
0
                                eos = frame.is_end_stream(),
742
0
                                window = %stream_capacity,
743
0
                                available = %stream.send_flow.available(),
744
0
                                requested = stream.requested_send_capacity,
745
0
                                buffered = stream.buffered_send_data,
746
0
                                "data frame"
747
                            );
748
749
                            // Zero length data frames always have capacity to
750
                            // be sent.
751
46.5k
                            if sz > 0 && stream_capacity == 0 {
752
27.4k
                                tracing::trace!("stream capacity is 0");
753
754
                                // Ensure that the stream is waiting for
755
                                // connection level capacity
756
                                //
757
                                // TODO: uncomment
758
                                // debug_assert!(stream.is_pending_send_capacity);
759
760
                                // The stream has no more capacity, this can
761
                                // happen if the remote reduced the stream
762
                                // window. In this case, we need to buffer the
763
                                // frame and wait for a window update...
764
27.4k
                                stream.pending_send.push_front(buffer, frame.into());
765
27.4k
766
27.4k
                                continue;
767
19.1k
                            }
768
19.1k
769
19.1k
                            // Only send up to the max frame length
770
19.1k
                            let len = cmp::min(sz, max_len);
771
19.1k
772
19.1k
                            // Only send up to the stream's window capacity
773
19.1k
                            let len =
774
19.1k
                                cmp::min(len, stream_capacity.as_size() as usize) as WindowSize;
775
19.1k
776
19.1k
                            // There *must* be be enough connection level
777
19.1k
                            // capacity at this point.
778
19.1k
                            debug_assert!(len <= self.flow.window_size());
779
780
                            // Check if the stream level window the peer knows is available. In some
781
                            // scenarios, maybe the window we know is available but the window which
782
                            // peer knows is not.
783
19.1k
                            if len > 0 && len > stream.send_flow.window_size() {
784
0
                                stream.pending_send.push_front(buffer, frame.into());
785
0
                                continue;
786
19.1k
                            }
787
19.1k
788
19.1k
                            tracing::trace!(len, "sending data frame");
789
790
                            // Update the flow control
791
19.1k
                            tracing::trace_span!("updating stream flow").in_scope(|| {
792
19.1k
                                stream.send_data(len, self.max_buffer_size);
793
19.1k
794
19.1k
                                // Assign the capacity back to the connection that
795
19.1k
                                // was just consumed from the stream in the previous
796
19.1k
                                // line.
797
19.1k
                                // TODO: proper error handling
798
19.1k
                                let _res = self.flow.assign_capacity(len);
799
19.1k
                                debug_assert!(_res.is_ok());
800
19.1k
                            });
Unexecuted instantiation: <h2::proto::streams::prioritize::Prioritize>::pop_frame::<_>::{closure#0}
<h2::proto::streams::prioritize::Prioritize>::pop_frame::<bytes::bytes::Bytes>::{closure#0}
Line
Count
Source
791
19.1k
                            tracing::trace_span!("updating stream flow").in_scope(|| {
792
19.1k
                                stream.send_data(len, self.max_buffer_size);
793
19.1k
794
19.1k
                                // Assign the capacity back to the connection that
795
19.1k
                                // was just consumed from the stream in the previous
796
19.1k
                                // line.
797
19.1k
                                // TODO: proper error handling
798
19.1k
                                let _res = self.flow.assign_capacity(len);
799
19.1k
                                debug_assert!(_res.is_ok());
800
19.1k
                            });
801
802
19.1k
                            let (eos, len) = tracing::trace_span!("updating connection flow")
803
19.1k
                                .in_scope(|| {
804
19.1k
                                    // TODO: proper error handling
805
19.1k
                                    let _res = self.flow.send_data(len);
806
19.1k
                                    debug_assert!(_res.is_ok());
807
808
                                    // Wrap the frame's data payload to ensure that the
809
                                    // correct amount of data gets written.
810
811
19.1k
                                    let eos = frame.is_end_stream();
812
19.1k
                                    let len = len as usize;
813
19.1k
814
19.1k
                                    if frame.payload().remaining() > len {
815
18.5k
                                        frame.set_end_stream(false);
816
18.5k
                                    }
817
19.1k
                                    (eos, len)
818
19.1k
                                });
Unexecuted instantiation: <h2::proto::streams::prioritize::Prioritize>::pop_frame::<_>::{closure#1}
<h2::proto::streams::prioritize::Prioritize>::pop_frame::<bytes::bytes::Bytes>::{closure#1}
Line
Count
Source
803
19.1k
                                .in_scope(|| {
804
19.1k
                                    // TODO: proper error handling
805
19.1k
                                    let _res = self.flow.send_data(len);
806
19.1k
                                    debug_assert!(_res.is_ok());
807
808
                                    // Wrap the frame's data payload to ensure that the
809
                                    // correct amount of data gets written.
810
811
19.1k
                                    let eos = frame.is_end_stream();
812
19.1k
                                    let len = len as usize;
813
19.1k
814
19.1k
                                    if frame.payload().remaining() > len {
815
18.5k
                                        frame.set_end_stream(false);
816
18.5k
                                    }
817
19.1k
                                    (eos, len)
818
19.1k
                                });
819
19.1k
820
19.1k
                            Frame::Data(frame.map(|buf| Prioritized {
821
19.1k
                                inner: buf.take(len),
822
19.1k
                                end_of_stream: eos,
823
19.1k
                                stream: stream.key(),
824
19.1k
                            }))
Unexecuted instantiation: <h2::proto::streams::prioritize::Prioritize>::pop_frame::<_>::{closure#2}
<h2::proto::streams::prioritize::Prioritize>::pop_frame::<bytes::bytes::Bytes>::{closure#2}
Line
Count
Source
820
19.1k
                            Frame::Data(frame.map(|buf| Prioritized {
821
19.1k
                                inner: buf.take(len),
822
19.1k
                                end_of_stream: eos,
823
19.1k
                                stream: stream.key(),
824
19.1k
                            }))
825
                        }
826
0
                        Some(Frame::PushPromise(pp)) => {
827
0
                            let mut pushed =
828
0
                                stream.store_mut().find_mut(&pp.promised_id()).unwrap();
829
0
                            pushed.is_pending_push = false;
830
0
                            // Transition stream from pending_push to pending_open
831
0
                            // if possible
832
0
                            if !pushed.pending_send.is_empty() {
833
0
                                if counts.can_inc_num_send_streams() {
834
0
                                    counts.inc_num_send_streams(&mut pushed);
835
0
                                    self.pending_send.push(&mut pushed);
836
0
                                } else {
837
0
                                    self.queue_open(&mut pushed);
838
0
                                }
839
0
                            }
840
0
                            Frame::PushPromise(pp)
841
                        }
842
185k
                        Some(frame) => frame.map(|_| {
843
0
                            unreachable!(
844
0
                                "Frame::map closure will only be called \
845
0
                                 on DATA frames."
846
0
                            )
Unexecuted instantiation: <h2::proto::streams::prioritize::Prioritize>::pop_frame::<_>::{closure#3}
Unexecuted instantiation: <h2::proto::streams::prioritize::Prioritize>::pop_frame::<bytes::bytes::Bytes>::{closure#3}
847
185k
                        }),
848
                        None => {
849
8.40k
                            if let Some(reason) = stream.state.get_scheduled_reset() {
850
100
                                stream.set_reset(reason, Initiator::Library);
851
100
852
100
                                let frame = frame::Reset::new(stream.id, reason);
853
100
                                Frame::Reset(frame)
854
                            } else {
855
                                // If the stream receives a RESET from the peer, it may have
856
                                // had data buffered to be sent, but all the frames are cleared
857
                                // in clear_queue(). Instead of doing O(N) traversal through queue
858
                                // to remove, lets just ignore the stream here.
859
8.30k
                                tracing::trace!("removing dangling stream from pending_send");
860
                                // Since this should only happen as a consequence of `clear_queue`,
861
                                // we must be in a closed state of some kind.
862
8.30k
                                debug_assert!(stream.state.is_closed());
863
8.30k
                                counts.transition_after(stream, is_pending_reset);
864
8.30k
                                continue;
865
                            }
866
                        }
867
                    };
868
869
205k
                    tracing::trace!("pop_frame; frame={:?}", frame);
870
871
205k
                    if cfg!(debug_assertions) && stream.state.is_idle() {
872
0
                        debug_assert!(stream.id > self.last_opened_id);
873
0
                        self.last_opened_id = stream.id;
874
205k
                    }
875
876
205k
                    if !stream.pending_send.is_empty() || stream.state.is_scheduled_reset() {
877
184k
                        // TODO: Only requeue the sender IF it is ready to send
878
184k
                        // the next frame. i.e. don't requeue it if the next
879
184k
                        // frame is a data frame and the stream does not have
880
184k
                        // any more capacity.
881
184k
                        self.pending_send.push(&mut stream);
882
184k
                    }
883
884
205k
                    counts.transition_after(stream, is_pending_reset);
885
205k
886
205k
                    return Some(frame);
887
                }
888
379k
                None => return None,
889
            }
890
        }
891
584k
    }
Unexecuted instantiation: <h2::proto::streams::prioritize::Prioritize>::pop_frame::<_>
<h2::proto::streams::prioritize::Prioritize>::pop_frame::<bytes::bytes::Bytes>
Line
Count
Source
704
584k
    fn pop_frame<B>(
705
584k
        &mut self,
706
584k
        buffer: &mut Buffer<Frame<B>>,
707
584k
        store: &mut Store,
708
584k
        max_len: usize,
709
584k
        counts: &mut Counts,
710
584k
    ) -> Option<Frame<Prioritized<B>>>
711
584k
    where
712
584k
        B: Buf,
713
584k
    {
714
584k
        let span = tracing::trace_span!("pop_frame");
715
584k
        let _e = span.enter();
716
717
        loop {
718
620k
            match self.pending_send.pop(store) {
719
240k
                Some(mut stream) => {
720
240k
                    let span = tracing::trace_span!("popped", ?stream.id, ?stream.state);
721
240k
                    let _e = span.enter();
722
240k
723
240k
                    // It's possible that this stream, besides having data to send,
724
240k
                    // is also queued to send a reset, and thus is already in the queue
725
240k
                    // to wait for "some time" after a reset.
726
240k
                    //
727
240k
                    // To be safe, we just always ask the stream.
728
240k
                    let is_pending_reset = stream.is_pending_reset_expiration();
729
240k
730
240k
                    tracing::trace!(is_pending_reset);
731
732
240k
                    let frame = match stream.pending_send.pop_front(buffer) {
733
46.5k
                        Some(Frame::Data(mut frame)) => {
734
46.5k
                            // Get the amount of capacity remaining for stream's
735
46.5k
                            // window.
736
46.5k
                            let stream_capacity = stream.send_flow.available();
737
46.5k
                            let sz = frame.payload().remaining();
738
46.5k
739
46.5k
                            tracing::trace!(
740
                                sz,
741
0
                                eos = frame.is_end_stream(),
742
0
                                window = %stream_capacity,
743
0
                                available = %stream.send_flow.available(),
744
0
                                requested = stream.requested_send_capacity,
745
0
                                buffered = stream.buffered_send_data,
746
0
                                "data frame"
747
                            );
748
749
                            // Zero length data frames always have capacity to
750
                            // be sent.
751
46.5k
                            if sz > 0 && stream_capacity == 0 {
752
27.4k
                                tracing::trace!("stream capacity is 0");
753
754
                                // Ensure that the stream is waiting for
755
                                // connection level capacity
756
                                //
757
                                // TODO: uncomment
758
                                // debug_assert!(stream.is_pending_send_capacity);
759
760
                                // The stream has no more capacity, this can
761
                                // happen if the remote reduced the stream
762
                                // window. In this case, we need to buffer the
763
                                // frame and wait for a window update...
764
27.4k
                                stream.pending_send.push_front(buffer, frame.into());
765
27.4k
766
27.4k
                                continue;
767
19.1k
                            }
768
19.1k
769
19.1k
                            // Only send up to the max frame length
770
19.1k
                            let len = cmp::min(sz, max_len);
771
19.1k
772
19.1k
                            // Only send up to the stream's window capacity
773
19.1k
                            let len =
774
19.1k
                                cmp::min(len, stream_capacity.as_size() as usize) as WindowSize;
775
19.1k
776
19.1k
                            // There *must* be be enough connection level
777
19.1k
                            // capacity at this point.
778
19.1k
                            debug_assert!(len <= self.flow.window_size());
779
780
                            // Check if the stream level window the peer knows is available. In some
781
                            // scenarios, maybe the window we know is available but the window which
782
                            // peer knows is not.
783
19.1k
                            if len > 0 && len > stream.send_flow.window_size() {
784
0
                                stream.pending_send.push_front(buffer, frame.into());
785
0
                                continue;
786
19.1k
                            }
787
19.1k
788
19.1k
                            tracing::trace!(len, "sending data frame");
789
790
                            // Update the flow control
791
19.1k
                            tracing::trace_span!("updating stream flow").in_scope(|| {
792
                                stream.send_data(len, self.max_buffer_size);
793
794
                                // Assign the capacity back to the connection that
795
                                // was just consumed from the stream in the previous
796
                                // line.
797
                                // TODO: proper error handling
798
                                let _res = self.flow.assign_capacity(len);
799
                                debug_assert!(_res.is_ok());
800
19.1k
                            });
801
802
19.1k
                            let (eos, len) = tracing::trace_span!("updating connection flow")
803
19.1k
                                .in_scope(|| {
804
                                    // TODO: proper error handling
805
                                    let _res = self.flow.send_data(len);
806
                                    debug_assert!(_res.is_ok());
807
808
                                    // Wrap the frame's data payload to ensure that the
809
                                    // correct amount of data gets written.
810
811
                                    let eos = frame.is_end_stream();
812
                                    let len = len as usize;
813
814
                                    if frame.payload().remaining() > len {
815
                                        frame.set_end_stream(false);
816
                                    }
817
                                    (eos, len)
818
19.1k
                                });
819
19.1k
820
19.1k
                            Frame::Data(frame.map(|buf| Prioritized {
821
                                inner: buf.take(len),
822
                                end_of_stream: eos,
823
                                stream: stream.key(),
824
19.1k
                            }))
825
                        }
826
0
                        Some(Frame::PushPromise(pp)) => {
827
0
                            let mut pushed =
828
0
                                stream.store_mut().find_mut(&pp.promised_id()).unwrap();
829
0
                            pushed.is_pending_push = false;
830
0
                            // Transition stream from pending_push to pending_open
831
0
                            // if possible
832
0
                            if !pushed.pending_send.is_empty() {
833
0
                                if counts.can_inc_num_send_streams() {
834
0
                                    counts.inc_num_send_streams(&mut pushed);
835
0
                                    self.pending_send.push(&mut pushed);
836
0
                                } else {
837
0
                                    self.queue_open(&mut pushed);
838
0
                                }
839
0
                            }
840
0
                            Frame::PushPromise(pp)
841
                        }
842
185k
                        Some(frame) => frame.map(|_| {
843
                            unreachable!(
844
                                "Frame::map closure will only be called \
845
                                 on DATA frames."
846
                            )
847
185k
                        }),
848
                        None => {
849
8.40k
                            if let Some(reason) = stream.state.get_scheduled_reset() {
850
100
                                stream.set_reset(reason, Initiator::Library);
851
100
852
100
                                let frame = frame::Reset::new(stream.id, reason);
853
100
                                Frame::Reset(frame)
854
                            } else {
855
                                // If the stream receives a RESET from the peer, it may have
856
                                // had data buffered to be sent, but all the frames are cleared
857
                                // in clear_queue(). Instead of doing O(N) traversal through queue
858
                                // to remove, lets just ignore the stream here.
859
8.30k
                                tracing::trace!("removing dangling stream from pending_send");
860
                                // Since this should only happen as a consequence of `clear_queue`,
861
                                // we must be in a closed state of some kind.
862
8.30k
                                debug_assert!(stream.state.is_closed());
863
8.30k
                                counts.transition_after(stream, is_pending_reset);
864
8.30k
                                continue;
865
                            }
866
                        }
867
                    };
868
869
205k
                    tracing::trace!("pop_frame; frame={:?}", frame);
870
871
205k
                    if cfg!(debug_assertions) && stream.state.is_idle() {
872
0
                        debug_assert!(stream.id > self.last_opened_id);
873
0
                        self.last_opened_id = stream.id;
874
205k
                    }
875
876
205k
                    if !stream.pending_send.is_empty() || stream.state.is_scheduled_reset() {
877
184k
                        // TODO: Only requeue the sender IF it is ready to send
878
184k
                        // the next frame. i.e. don't requeue it if the next
879
184k
                        // frame is a data frame and the stream does not have
880
184k
                        // any more capacity.
881
184k
                        self.pending_send.push(&mut stream);
882
184k
                    }
883
884
205k
                    counts.transition_after(stream, is_pending_reset);
885
205k
886
205k
                    return Some(frame);
887
                }
888
379k
                None => return None,
889
            }
890
        }
891
584k
    }
892
893
584k
    fn pop_pending_open<'s>(
894
584k
        &mut self,
895
584k
        store: &'s mut Store,
896
584k
        counts: &mut Counts,
897
584k
    ) -> Option<store::Ptr<'s>> {
898
584k
        tracing::trace!("schedule_pending_open");
899
        // check for any pending open streams
900
584k
        if counts.can_inc_num_send_streams() {
901
568k
            if let Some(mut stream) = self.pending_open.pop(store) {
902
184k
                tracing::trace!("schedule_pending_open; stream={:?}", stream.id);
903
904
184k
                counts.inc_num_send_streams(&mut stream);
905
184k
                stream.notify_send();
906
184k
                return Some(stream);
907
384k
            }
908
16.0k
        }
909
910
400k
        None
911
584k
    }
912
}
913
914
// ===== impl Prioritized =====
915
916
impl<B> Buf for Prioritized<B>
917
where
918
    B: Buf,
919
{
920
2.72M
    fn remaining(&self) -> usize {
921
2.72M
        self.inner.remaining()
922
2.72M
    }
Unexecuted instantiation: <h2::proto::streams::prioritize::Prioritized<_> as bytes::buf::buf_impl::Buf>::remaining
Unexecuted instantiation: <h2::proto::streams::prioritize::Prioritized<bytes::bytes::Bytes> as bytes::buf::buf_impl::Buf>::remaining
<h2::proto::streams::prioritize::Prioritized<bytes::bytes::Bytes> as bytes::buf::buf_impl::Buf>::remaining
Line
Count
Source
920
2.72M
    fn remaining(&self) -> usize {
921
2.72M
        self.inner.remaining()
922
2.72M
    }
923
924
513k
    fn chunk(&self) -> &[u8] {
925
513k
        self.inner.chunk()
926
513k
    }
Unexecuted instantiation: <h2::proto::streams::prioritize::Prioritized<_> as bytes::buf::buf_impl::Buf>::chunk
Unexecuted instantiation: <h2::proto::streams::prioritize::Prioritized<bytes::bytes::Bytes> as bytes::buf::buf_impl::Buf>::chunk
<h2::proto::streams::prioritize::Prioritized<bytes::bytes::Bytes> as bytes::buf::buf_impl::Buf>::chunk
Line
Count
Source
924
513k
    fn chunk(&self) -> &[u8] {
925
513k
        self.inner.chunk()
926
513k
    }
927
928
0
    fn chunks_vectored<'a>(&'a self, dst: &mut [std::io::IoSlice<'a>]) -> usize {
929
0
        self.inner.chunks_vectored(dst)
930
0
    }
Unexecuted instantiation: <h2::proto::streams::prioritize::Prioritized<_> as bytes::buf::buf_impl::Buf>::chunks_vectored
Unexecuted instantiation: <h2::proto::streams::prioritize::Prioritized<bytes::bytes::Bytes> as bytes::buf::buf_impl::Buf>::chunks_vectored
931
932
21.9k
    fn advance(&mut self, cnt: usize) {
933
21.9k
        self.inner.advance(cnt)
934
21.9k
    }
Unexecuted instantiation: <h2::proto::streams::prioritize::Prioritized<_> as bytes::buf::buf_impl::Buf>::advance
Unexecuted instantiation: <h2::proto::streams::prioritize::Prioritized<bytes::bytes::Bytes> as bytes::buf::buf_impl::Buf>::advance
<h2::proto::streams::prioritize::Prioritized<bytes::bytes::Bytes> as bytes::buf::buf_impl::Buf>::advance
Line
Count
Source
932
21.9k
    fn advance(&mut self, cnt: usize) {
933
21.9k
        self.inner.advance(cnt)
934
21.9k
    }
935
}
936
937
impl<B: Buf> fmt::Debug for Prioritized<B> {
938
0
    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
939
0
        fmt.debug_struct("Prioritized")
940
0
            .field("remaining", &self.inner.get_ref().remaining())
941
0
            .field("end_of_stream", &self.end_of_stream)
942
0
            .field("stream", &self.stream)
943
0
            .finish()
944
0
    }
945
}