Coverage Report

Created: 2025-12-31 06:49

/src/h2/src/proto/streams/send.rs
  Line|  Count|Source
     1|       |use super::{
     2|       |    store, Buffer, Codec, Config, Counts, Frame, Prioritize, Prioritized, Store, Stream, StreamId,
     3|       |    StreamIdOverflow, WindowSize,
     4|       |};
     5|       |use crate::codec::UserError;
     6|       |use crate::frame::{self, Reason};
     7|       |use crate::proto::{self, Error, Initiator};
     8|       |
     9|       |use bytes::Buf;
    10|       |use tokio::io::AsyncWrite;
    11|       |
    12|       |use std::cmp::Ordering;
    13|       |use std::io;
    14|       |use std::task::{Context, Poll, Waker};
    15|       |
    16|       |/// Manages state transitions related to outbound frames.
    17|       |#[derive(Debug)]
    18|       |pub(super) struct Send {
    19|       |    /// Stream identifier to use for next initialized stream.
    20|       |    next_stream_id: Result<StreamId, StreamIdOverflow>,
    21|       |
    22|       |    /// Any streams with a higher ID are ignored.
    23|       |    ///
    24|       |    /// This starts as MAX, but is lowered when a GOAWAY is received.
    25|       |    ///
    26|       |    /// > After sending a GOAWAY frame, the sender can discard frames for
    27|       |    /// > streams initiated by the receiver with identifiers higher than
    28|       |    /// > the identified last stream.
    29|       |    max_stream_id: StreamId,
    30|       |
    31|       |    /// Initial window size of locally initiated streams
    32|       |    init_window_sz: WindowSize,
    33|       |
    34|       |    /// Prioritization layer
    35|       |    prioritize: Prioritize,
    36|       |
    37|       |    is_push_enabled: bool,
    38|       |
    39|       |    /// If extended connect protocol is enabled.
    40|       |    is_extended_connect_protocol_enabled: bool,
    41|       |}
    42|       |
    43|       |/// A value to detect which public API has called `poll_reset`.
    44|       |#[derive(Debug)]
    45|       |pub(crate) enum PollReset {
    46|       |    AwaitingHeaders,
    47|       |    Streaming,
    48|       |}
    49|       |
    50|       |impl Send {
    51|       |    /// Create a new `Send`
    52|  13.7k|    pub fn new(config: &Config) -> Self {
    53|  13.7k|        Send {
    54|  13.7k|            init_window_sz: config.remote_init_window_sz,
    55|  13.7k|            max_stream_id: StreamId::MAX,
    56|  13.7k|            next_stream_id: Ok(config.local_next_stream_id),
    57|  13.7k|            prioritize: Prioritize::new(config),
    58|  13.7k|            is_push_enabled: true,
    59|  13.7k|            is_extended_connect_protocol_enabled: false,
    60|  13.7k|        }
    61|  13.7k|    }
    62|       |
    63|       |    /// Returns the initial send window size
    64|   464k|    pub fn init_window_sz(&self) -> WindowSize {
    65|   464k|        self.init_window_sz
    66|   464k|    }
    67|       |
    68|   463k|    pub fn open(&mut self) -> Result<StreamId, UserError> {
    69|   463k|        let stream_id = self.ensure_next_stream_id()?;
    70|   463k|        self.next_stream_id = stream_id.next_id();
    71|   463k|        Ok(stream_id)
    72|   463k|    }
    73|       |
    74|      0|    pub fn reserve_local(&mut self) -> Result<StreamId, UserError> {
    75|      0|        let stream_id = self.ensure_next_stream_id()?;
    76|      0|        self.next_stream_id = stream_id.next_id();
    77|      0|        Ok(stream_id)
    78|      0|    }
    79|       |
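Both `open` and `reserve_local` hand out the next locally initiated stream id and advance the allocator. A minimal sketch of that allocation pattern, using a plain u32 in place of the h2 `StreamId` type (hypothetical model, not the h2 API): ids advance by 2 so the initiator keeps its parity, and overflow permanently poisons the allocator, surfacing later as `UserError::OverflowedStreamId`.

    // Hypothetical u32 model of StreamId allocation; not the h2 API.
    fn next_id(id: u32) -> Result<u32, &'static str> {
        // Advancing by 2 keeps the parity (clients odd, servers even).
        id.checked_add(2).ok_or("StreamIdOverflow")
    }

    fn main() {
        let mut next: Result<u32, &'static str> = Ok(1); // client starts at 1
        for _ in 0..3 {
            let id = next.expect("stream ids exhausted");
            next = next_id(id); // 1 -> 3 -> 5 -> 7
        }
        assert_eq!(next, Ok(7));
    }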
    80|   463k|    fn check_headers(fields: &http::HeaderMap) -> Result<(), UserError> {
    81|       |        // 8.1.2.2. Connection-Specific Header Fields
    82|   463k|        if fields.contains_key(http::header::CONNECTION)
    83|   463k|            || fields.contains_key(http::header::TRANSFER_ENCODING)
    84|   463k|            || fields.contains_key(http::header::UPGRADE)
    85|   463k|            || fields.contains_key("keep-alive")
    86|   463k|            || fields.contains_key("proxy-connection")
    87|       |        {
    88|      3|            tracing::debug!("illegal connection-specific headers found");
    89|      3|            return Err(UserError::MalformedHeaders);
    90|   463k|        } else if let Some(te) = fields.get(http::header::TE) {
    91|     88|            if te != "trailers" {
    92|     87|                tracing::debug!("illegal connection-specific headers found");
    93|     87|                return Err(UserError::MalformedHeaders);
    94|      1|            }
    95|   463k|        }
    96|   463k|        Ok(())
    97|   463k|    }
    98|       |
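`check_headers` implements the connection-specific header prohibition of RFC 7540 §8.1.2.2: hop-by-hop headers are meaningless in HTTP/2, and `te` is the single exception, permitted only with the value "trailers". A standalone sketch of the same rule (illustrative only; `check_headers` itself is private to this module):

    use http::header::{HeaderMap, HeaderValue, CONNECTION, TE, TRANSFER_ENCODING, UPGRADE};

    // Returns true when a header map would be rejected by the rule above.
    fn is_connection_specific(fields: &HeaderMap) -> bool {
        fields.contains_key(CONNECTION)
            || fields.contains_key(TRANSFER_ENCODING)
            || fields.contains_key(UPGRADE)
            || fields.contains_key("keep-alive")
            || fields.contains_key("proxy-connection")
            || fields.get(TE).map_or(false, |te| te != "trailers")
    }

    fn main() {
        let mut rejected = HeaderMap::new();
        rejected.insert(CONNECTION, HeaderValue::from_static("keep-alive"));
        assert!(is_connection_specific(&rejected));

        let mut accepted = HeaderMap::new();
        accepted.insert(TE, HeaderValue::from_static("trailers"));
        assert!(!is_connection_specific(&accepted));
    }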
    99|       |    pub fn send_push_promise<B>(
   100|       |        &mut self,
   101|       |        frame: frame::PushPromise,
   102|       |        buffer: &mut Buffer<Frame<B>>,
   103|       |        stream: &mut store::Ptr,
   104|       |        task: &mut Option<Waker>,
   105|       |    ) -> Result<(), UserError> {
   106|       |        if !self.is_push_enabled {
   107|       |            return Err(UserError::PeerDisabledServerPush);
   108|       |        }
   109|       |
   110|       |        tracing::trace!(
   111|       |            "send_push_promise; frame={:?}; init_window={:?}",
   112|       |            frame,
   113|       |            self.init_window_sz
   114|       |        );
   115|       |
   116|       |        Self::check_headers(frame.fields())?;
   117|       |
   118|       |        // Queue the frame for sending
   119|       |        self.prioritize
   120|       |            .queue_frame(frame.into(), buffer, stream, task);
   121|       |
   122|       |        Ok(())
   123|       |    }
   124|       |
   125|   463k|    pub fn send_headers<B>(
   126|   463k|        &mut self,
   127|   463k|        frame: frame::Headers,
   128|   463k|        buffer: &mut Buffer<Frame<B>>,
   129|   463k|        stream: &mut store::Ptr,
   130|   463k|        counts: &mut Counts,
   131|   463k|        task: &mut Option<Waker>,
   132|   463k|    ) -> Result<(), UserError> {
   133|   463k|        tracing::trace!(
   134|      0|            "send_headers; frame={:?}; init_window={:?}",
   135|       |            frame,
   136|       |            self.init_window_sz
   137|       |        );
   138|       |
   139|   463k|        Self::check_headers(frame.fields())?;
   140|       |
   141|   463k|        let end_stream = frame.is_end_stream();
   142|       |
   143|       |        // Update the state
   144|   463k|        stream.state.send_open(end_stream)?;
   145|       |
   146|   463k|        let mut pending_open = false;
   147|   463k|        if counts.peer().is_local_init(frame.stream_id()) && !stream.is_pending_push {
   148|   463k|            self.prioritize.queue_open(stream);
   149|   463k|            pending_open = true;
   150|   463k|        }
   151|       |
   152|       |        // Queue the frame for sending
   153|       |        //
   154|       |        // This call expects that, since new streams are in the open queue, new
   155|       |        // streams won't be pushed on pending_send.
   156|   463k|        self.prioritize
   157|   463k|            .queue_frame(frame.into(), buffer, stream, task);
   158|       |
   159|       |        // Need to notify the connection when pushing onto pending_open since
   160|       |        // queue_frame only notifies for pending_send.
   161|   463k|        if pending_open {
   162|   463k|            if let Some(task) = task.take() {
   163|  3.25k|                task.wake();
   164|   460k|            }
   165|      0|        }
   166|       |
   167|   463k|        Ok(())
   168|   463k|    }
<h2::proto::streams::send::Send>::send_headers::<bytes::bytes::Bytes>
  Line|  Count|Source
   125|    733|    pub fn send_headers<B>(
   126|    733|        &mut self,
   127|    733|        frame: frame::Headers,
   128|    733|        buffer: &mut Buffer<Frame<B>>,
   129|    733|        stream: &mut store::Ptr,
   130|    733|        counts: &mut Counts,
   131|    733|        task: &mut Option<Waker>,
   132|    733|    ) -> Result<(), UserError> {
   133|    733|        tracing::trace!(
   134|      0|            "send_headers; frame={:?}; init_window={:?}",
   135|       |            frame,
   136|       |            self.init_window_sz
   137|       |        );
   138|       |
   139|    733|        Self::check_headers(frame.fields())?;
   140|       |
   141|    643|        let end_stream = frame.is_end_stream();
   142|       |
   143|       |        // Update the state
   144|    643|        stream.state.send_open(end_stream)?;
   145|       |
   146|    643|        let mut pending_open = false;
   147|    643|        if counts.peer().is_local_init(frame.stream_id()) && !stream.is_pending_push {
   148|    643|            self.prioritize.queue_open(stream);
   149|    643|            pending_open = true;
   150|    643|        }
   151|       |
   152|       |        // Queue the frame for sending
   153|       |        //
   154|       |        // This call expects that, since new streams are in the open queue, new
   155|       |        // streams won't be pushed on pending_send.
   156|    643|        self.prioritize
   157|    643|            .queue_frame(frame.into(), buffer, stream, task);
   158|       |
   159|       |        // Need to notify the connection when pushing onto pending_open since
   160|       |        // queue_frame only notifies for pending_send.
   161|    643|        if pending_open {
   162|    643|            if let Some(task) = task.take() {
   163|      0|                task.wake();
   164|    643|            }
   165|      0|        }
   166|       |
   167|    643|        Ok(())
   168|    733|    }
<h2::proto::streams::send::Send>::send_headers::<bytes::bytes::Bytes>
  Line|  Count|Source
   125|   462k|    pub fn send_headers<B>(
   126|   462k|        &mut self,
   127|   462k|        frame: frame::Headers,
   128|   462k|        buffer: &mut Buffer<Frame<B>>,
   129|   462k|        stream: &mut store::Ptr,
   130|   462k|        counts: &mut Counts,
   131|   462k|        task: &mut Option<Waker>,
   132|   462k|    ) -> Result<(), UserError> {
   133|   462k|        tracing::trace!(
   134|      0|            "send_headers; frame={:?}; init_window={:?}",
   135|       |            frame,
   136|       |            self.init_window_sz
   137|       |        );
   138|       |
   139|   462k|        Self::check_headers(frame.fields())?;
   140|       |
   141|   462k|        let end_stream = frame.is_end_stream();
   142|       |
   143|       |        // Update the state
   144|   462k|        stream.state.send_open(end_stream)?;
   145|       |
   146|   462k|        let mut pending_open = false;
   147|   462k|        if counts.peer().is_local_init(frame.stream_id()) && !stream.is_pending_push {
   148|   462k|            self.prioritize.queue_open(stream);
   149|   462k|            pending_open = true;
   150|   462k|        }
   151|       |
   152|       |        // Queue the frame for sending
   153|       |        //
   154|       |        // This call expects that, since new streams are in the open queue, new
   155|       |        // streams won't be pushed on pending_send.
   156|   462k|        self.prioritize
   157|   462k|            .queue_frame(frame.into(), buffer, stream, task);
   158|       |
   159|       |        // Need to notify the connection when pushing onto pending_open since
   160|       |        // queue_frame only notifies for pending_send.
   161|   462k|        if pending_open {
   162|   462k|            if let Some(task) = task.take() {
   163|  3.25k|                task.wake();
   164|   459k|            }
   165|      0|        }
   166|       |
   167|   462k|        Ok(())
   168|   462k|    }
   169|       |
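A note on the wake at lines 161-165: `queue_frame` notifies the connection task only when a frame lands on pending_send, so a newly opened stream parked on the pending_open queue must wake the task by hand or it would never be polled again. A minimal sketch of that pattern, covering only the `Waker` handling (the queues and `queue_frame` are the h2 internals above):

    use std::task::Waker;

    // Sketch: manual wake for streams parked on pending_open.
    fn notify_if_pending_open(pending_open: bool, task: &mut Option<Waker>) {
        if pending_open {
            // Taking the waker mirrors lines 162-164: wake at most once,
            // leaving `None` so later frames re-register interest.
            if let Some(task) = task.take() {
                task.wake();
            }
        }
    }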
   170|       |    /// Send interim informational headers (1xx responses) without changing stream state.
   171|       |    /// This allows multiple interim informational responses to be sent before the final response.
   172|       |    pub fn send_interim_informational_headers<B>(
   173|       |        &mut self,
   174|       |        frame: frame::Headers,
   175|       |        buffer: &mut Buffer<Frame<B>>,
   176|       |        stream: &mut store::Ptr,
   177|       |        _counts: &mut Counts,
   178|       |        task: &mut Option<Waker>,
   179|       |    ) -> Result<(), UserError> {
   180|       |        tracing::trace!(
   181|       |            "send_interim_informational_headers; frame={:?}; stream_id={:?}",
   182|       |            frame,
   183|       |            frame.stream_id()
   184|       |        );
   185|       |
   186|       |        // Validate headers
   187|       |        Self::check_headers(frame.fields())?;
   188|       |
   189|       |        debug_assert!(frame.is_informational(),
   190|       |            "Frame must be informational (1xx status code) at this point. Validation should happen at the public API boundary.");
   191|       |        debug_assert!(!frame.is_end_stream(),
   192|       |            "Informational frames must not have end_stream flag set. Validation should happen at the internal send informational header streams.");
   193|       |
   194|       |        // Queue the frame for sending WITHOUT changing stream state
   195|       |        // This is the key difference from send_headers - we don't call stream.state.send_open()
   196|       |        self.prioritize
   197|       |            .queue_frame(frame.into(), buffer, stream, task);
   198|       |
   199|       |        Ok(())
   200|       |    }
   201|       |
   202|       |    /// Send an explicit RST_STREAM frame
   203|  85.8k|    pub fn send_reset<B>(
   204|  85.8k|        &mut self,
   205|  85.8k|        reason: Reason,
   206|  85.8k|        initiator: Initiator,
   207|  85.8k|        buffer: &mut Buffer<Frame<B>>,
   208|  85.8k|        stream: &mut store::Ptr,
   209|  85.8k|        counts: &mut Counts,
   210|  85.8k|        task: &mut Option<Waker>,
   211|  85.8k|    ) {
   212|  85.8k|        let is_reset = stream.state.is_reset();
   213|  85.8k|        let is_closed = stream.state.is_closed();
   214|  85.8k|        let is_empty = stream.pending_send.is_empty();
   215|  85.8k|        let stream_id = stream.id;
   216|       |
   217|  85.8k|        tracing::trace!(
   218|      0|            "send_reset(..., reason={:?}, initiator={:?}, stream={:?}, ..., \
   219|      0|             is_reset={:?}; is_closed={:?}; pending_send.is_empty={:?}; \
   220|      0|             state={:?} \
   221|      0|             ",
   222|       |            reason,
   223|       |            initiator,
   224|       |            stream_id,
   225|       |            is_reset,
   226|       |            is_closed,
   227|       |            is_empty,
   228|      0|            stream.state
   229|       |        );
   230|       |
   231|  85.8k|        if is_reset {
   232|       |            // Don't double reset
   233|  23.8k|            tracing::trace!(
   234|      0|                " -> not sending RST_STREAM ({:?} is already reset)",
   235|       |                stream_id
   236|       |            );
   237|  23.8k|            return;
   238|  61.9k|        }
   239|       |
   240|       |        // Transition the state to reset no matter what.
   241|  61.9k|        stream.set_reset(reason, initiator);
   242|       |
   243|       |        // If closed AND the send queue is flushed, then the stream cannot be
   244|       |        // reset explicitly, either. Implicit resets can still be queued.
   245|  61.9k|        if is_closed && is_empty {
   246|      2|            tracing::trace!(
   247|      0|                " -> not sending explicit RST_STREAM ({:?} was closed \
   248|      0|                 and send queue was flushed)",
   249|       |                stream_id
   250|       |            );
   251|      2|            return;
   252|  61.9k|        }
   253|       |
   254|       |        // Clear all pending outbound frames.
   255|       |        // Note that we don't call `self.recv_err` because we want to enqueue
   256|       |        // the reset frame before transitioning the stream inside
   257|       |        // `reclaim_all_capacity`.
   258|  61.9k|        self.prioritize.clear_queue(buffer, stream);
   259|       |
   260|  61.9k|        let frame = frame::Reset::new(stream.id, reason);
   261|       |
   262|  61.9k|        tracing::trace!("send_reset -- queueing; frame={:?}", frame);
   263|  61.9k|        self.prioritize
   264|  61.9k|            .queue_frame(frame.into(), buffer, stream, task);
   265|  61.9k|        self.prioritize.reclaim_all_capacity(stream, counts);
   266|  85.8k|    }
   267|       |
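The early returns above encode three outcomes: an already-reset stream is left alone (no double RST_STREAM); a stream that is closed with a flushed send queue is marked reset but needs no explicit frame; everything else gets its queue cleared, a RST_STREAM enqueued, and its capacity reclaimed. A hypothetical condensation of that decision (names are illustrative, not the h2 API):

    // Hypothetical condensation of the send_reset branches above.
    fn reset_action(is_reset: bool, is_closed: bool, queue_empty: bool) -> &'static str {
        if is_reset {
            "skip: already reset"
        } else if is_closed && queue_empty {
            "skip explicit RST_STREAM: stream closed and send queue flushed"
        } else {
            "clear queue, enqueue RST_STREAM, reclaim capacity"
        }
    }

    fn main() {
        assert_eq!(reset_action(true, false, false), "skip: already reset");
        assert_eq!(
            reset_action(false, true, true),
            "skip explicit RST_STREAM: stream closed and send queue flushed"
        );
    }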
   268|  77.7k|    pub fn schedule_implicit_reset(
   269|  77.7k|        &mut self,
   270|  77.7k|        stream: &mut store::Ptr,
   271|  77.7k|        reason: Reason,
   272|  77.7k|        counts: &mut Counts,
   273|  77.7k|        task: &mut Option<Waker>,
   274|  77.7k|    ) {
   275|  77.7k|        if stream.state.is_closed() {
   276|       |            // Stream is already closed, nothing more to do
   277|      0|            return;
   278|  77.7k|        }
   279|       |
   280|  77.7k|        stream.state.set_scheduled_reset(reason);
   281|       |
   282|  77.7k|        self.prioritize.reclaim_reserved_capacity(stream, counts);
   283|  77.7k|        self.prioritize.schedule_send(stream, task);
   284|  77.7k|    }
   285|       |
   286|   462k|    pub fn send_data<B>(
   287|   462k|        &mut self,
   288|   462k|        frame: frame::Data<B>,
   289|   462k|        buffer: &mut Buffer<Frame<B>>,
   290|   462k|        stream: &mut store::Ptr,
   291|   462k|        counts: &mut Counts,
   292|   462k|        task: &mut Option<Waker>,
   293|   462k|    ) -> Result<(), UserError>
   294|   462k|    where
   295|   462k|        B: Buf,
   296|       |    {
   297|   462k|        self.prioritize
   298|   462k|            .send_data(frame, buffer, stream, counts, task)
   299|   462k|    }
   300|       |
   301|       |    pub fn send_trailers<B>(
   302|       |        &mut self,
   303|       |        frame: frame::Headers,
   304|       |        buffer: &mut Buffer<Frame<B>>,
   305|       |        stream: &mut store::Ptr,
   306|       |        counts: &mut Counts,
   307|       |        task: &mut Option<Waker>,
   308|       |    ) -> Result<(), UserError> {
   309|       |        // TODO: Should this logic be moved into state.rs?
   310|       |        if !stream.state.is_send_streaming() {
   311|       |            return Err(UserError::UnexpectedFrameType);
   312|       |        }
   313|       |
   314|       |        stream.state.send_close();
   315|       |
   316|       |        tracing::trace!("send_trailers -- queuing; frame={:?}", frame);
   317|       |        self.prioritize
   318|       |            .queue_frame(frame.into(), buffer, stream, task);
   319|       |
   320|       |        // Release any excess capacity
   321|       |        self.prioritize.reserve_capacity(0, stream, counts);
   322|       |
   323|       |        Ok(())
   324|       |    }
   325|       |
   326|  1.51M|    pub fn poll_complete<T, B>(
   327|  1.51M|        &mut self,
   328|  1.51M|        cx: &mut Context,
   329|  1.51M|        buffer: &mut Buffer<Frame<B>>,
   330|  1.51M|        store: &mut Store,
   331|  1.51M|        counts: &mut Counts,
   332|  1.51M|        dst: &mut Codec<T, Prioritized<B>>,
   333|  1.51M|    ) -> Poll<io::Result<()>>
   334|  1.51M|    where
   335|  1.51M|        T: AsyncWrite + Unpin,
   336|  1.51M|        B: Buf,
   337|       |    {
   338|  1.51M|        self.prioritize
   339|  1.51M|            .poll_complete(cx, buffer, store, counts, dst)
   340|  1.51M|    }
   341|       |
   342|       |    /// Request capacity to send data
   343|      0|    pub fn reserve_capacity(
   344|      0|        &mut self,
   345|      0|        capacity: WindowSize,
   346|      0|        stream: &mut store::Ptr,
   347|      0|        counts: &mut Counts,
   348|      0|    ) {
   349|      0|        self.prioritize.reserve_capacity(capacity, stream, counts)
   350|      0|    }
   351|       |
   352|      0|    pub fn poll_capacity(
   353|      0|        &mut self,
   354|      0|        cx: &Context,
   355|      0|        stream: &mut store::Ptr,
   356|      0|    ) -> Poll<Option<Result<WindowSize, UserError>>> {
   357|      0|        if !stream.state.is_send_streaming() {
   358|      0|            return Poll::Ready(None);
   359|      0|        }
   360|       |
   361|      0|        if !stream.send_capacity_inc {
   362|      0|            stream.wait_send(cx);
   363|      0|            return Poll::Pending;
   364|      0|        }
   365|       |
   366|      0|        stream.send_capacity_inc = false;
   367|       |
   368|      0|        Poll::Ready(Some(Ok(self.capacity(stream))))
   369|      0|    }
   370|       |
   371|       |    /// Current available stream send capacity
   372|      0|    pub fn capacity(&self, stream: &mut store::Ptr) -> WindowSize {
   373|      0|        stream.capacity(self.prioritize.max_buffer_size())
   374|      0|    }
   375|       |
   376|      0|    pub fn poll_reset(
   377|      0|        &self,
   378|      0|        cx: &Context,
   379|      0|        stream: &mut Stream,
   380|      0|        mode: PollReset,
   381|      0|    ) -> Poll<Result<Reason, crate::Error>> {
   382|      0|        match stream.state.ensure_reason(mode)? {
   383|      0|            Some(reason) => Poll::Ready(Ok(reason)),
   384|       |            None => {
   385|      0|                stream.wait_send(cx);
   386|      0|                Poll::Pending
   387|       |            }
   388|       |        }
   389|      0|    }
   390|       |
   391|    872|    pub fn recv_connection_window_update(
   392|    872|        &mut self,
   393|    872|        frame: frame::WindowUpdate,
   394|    872|        store: &mut Store,
   395|    872|        counts: &mut Counts,
   396|    872|    ) -> Result<(), Reason> {
   397|    872|        self.prioritize
   398|    872|            .recv_connection_window_update(frame.size_increment(), store, counts)
   399|    872|    }
   400|       |
   401|   115k|    pub fn recv_stream_window_update<B>(
   402|   115k|        &mut self,
   403|   115k|        sz: WindowSize,
   404|   115k|        buffer: &mut Buffer<Frame<B>>,
   405|   115k|        stream: &mut store::Ptr,
   406|   115k|        counts: &mut Counts,
   407|   115k|        task: &mut Option<Waker>,
   408|   115k|    ) -> Result<(), Reason> {
   409|   115k|        if let Err(e) = self.prioritize.recv_stream_window_update(sz, stream) {
   410|     34|            tracing::debug!("recv_stream_window_update !!; err={:?}", e);
   411|       |
   412|     34|            self.send_reset(
   413|       |                Reason::FLOW_CONTROL_ERROR,
   414|     34|                Initiator::Library,
   415|     34|                buffer,
   416|     34|                stream,
   417|     34|                counts,
   418|     34|                task,
   419|       |            );
   420|       |
   421|     34|            return Err(e);
   422|   115k|        }
   423|       |
   424|   115k|        Ok(())
   425|   115k|    }
   426|       |
   427|  6.01k|    pub(super) fn recv_go_away(&mut self, last_stream_id: StreamId) -> Result<(), Error> {
   428|  6.01k|        if last_stream_id > self.max_stream_id {
   429|       |            // The remote endpoint sent a `GOAWAY` frame indicating a stream
   430|       |            // that we never sent, or that we have already terminated on account
   431|       |            // of previous `GOAWAY` frame. In either case, that is illegal.
   432|       |            // (When sending multiple `GOAWAY`s, "Endpoints MUST NOT increase
   433|       |            // the value they send in the last stream identifier, since the
   434|       |            // peers might already have retried unprocessed requests on another
   435|       |            // connection.")
   436|     33|            proto_err!(conn:
   437|       |                "recv_go_away: last_stream_id ({:?}) > max_stream_id ({:?})",
   438|       |                last_stream_id, self.max_stream_id,
   439|       |            );
   440|     33|            return Err(Error::library_go_away(Reason::PROTOCOL_ERROR));
   441|  5.98k|        }
   442|       |
   443|  5.98k|        self.max_stream_id = last_stream_id;
   444|  5.98k|        Ok(())
   445|  6.01k|    }
   446|       |
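`recv_go_away` enforces the RFC 7540 §6.8 rule quoted in the comment: across multiple GOAWAY frames the last-stream-id may only stay the same or decrease, because the peer may already have retried the discarded streams on another connection. A standalone sketch of the same bound (plain u32 ids and an illustrative error type, not the h2 API):

    // Sketch: the GOAWAY last-stream-id may never increase (u32 model).
    struct GoAwayBound {
        max_stream_id: u32, // starts at u32::MAX, lowered by each GOAWAY
    }

    impl GoAwayBound {
        fn recv_go_away(&mut self, last_stream_id: u32) -> Result<(), &'static str> {
            if last_stream_id > self.max_stream_id {
                return Err("PROTOCOL_ERROR: GOAWAY raised last_stream_id");
            }
            self.max_stream_id = last_stream_id;
            Ok(())
        }
    }

    fn main() {
        let mut bound = GoAwayBound { max_stream_id: u32::MAX };
        assert!(bound.recv_go_away(101).is_ok()); // first GOAWAY
        assert!(bound.recv_go_away(57).is_ok());  // lowering is allowed
        assert!(bound.recv_go_away(99).is_err()); // raising is illegal
    }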
   447|   756k|    pub fn handle_error<B>(
   448|   756k|        &mut self,
   449|   756k|        buffer: &mut Buffer<Frame<B>>,
   450|   756k|        stream: &mut store::Ptr,
   451|   756k|        counts: &mut Counts,
   452|   756k|    ) {
   453|       |        // Clear all pending outbound frames
   454|   756k|        self.prioritize.clear_queue(buffer, stream);
   455|   756k|        self.prioritize.reclaim_all_capacity(stream, counts);
   456|   756k|    }
<h2::proto::streams::send::Send>::handle_error::<bytes::bytes::Bytes>
  Line|  Count|Source
   447|    643|    pub fn handle_error<B>(
   448|    643|        &mut self,
   449|    643|        buffer: &mut Buffer<Frame<B>>,
   450|    643|        stream: &mut store::Ptr,
   451|    643|        counts: &mut Counts,
   452|    643|    ) {
   453|       |        // Clear all pending outbound frames
   454|    643|        self.prioritize.clear_queue(buffer, stream);
   455|    643|        self.prioritize.reclaim_all_capacity(stream, counts);
   456|    643|    }
<h2::proto::streams::send::Send>::handle_error::<bytes::bytes::Bytes>
  Line|  Count|Source
   447|   755k|    pub fn handle_error<B>(
   448|   755k|        &mut self,
   449|   755k|        buffer: &mut Buffer<Frame<B>>,
   450|   755k|        stream: &mut store::Ptr,
   451|   755k|        counts: &mut Counts,
   452|   755k|    ) {
   453|       |        // Clear all pending outbound frames
   454|   755k|        self.prioritize.clear_queue(buffer, stream);
   455|   755k|        self.prioritize.reclaim_all_capacity(stream, counts);
   456|   755k|    }
   457|       |
   458|  7.03k|    pub fn apply_remote_settings<B>(
   459|  7.03k|        &mut self,
   460|  7.03k|        settings: &frame::Settings,
   461|  7.03k|        buffer: &mut Buffer<Frame<B>>,
   462|  7.03k|        store: &mut Store,
   463|  7.03k|        counts: &mut Counts,
   464|  7.03k|        task: &mut Option<Waker>,
   465|  7.03k|    ) -> Result<(), Error> {
   466|  7.03k|        if let Some(val) = settings.is_extended_connect_protocol_enabled() {
   467|    206|            self.is_extended_connect_protocol_enabled = val;
   468|  6.82k|        }
   469|       |
   470|       |        // Applies an update to the remote endpoint's initial window size.
   471|       |        //
   472|       |        // Per RFC 7540 §6.9.2:
   473|       |        //
   474|       |        // In addition to changing the flow-control window for streams that are
   475|       |        // not yet active, a SETTINGS frame can alter the initial flow-control
   476|       |        // window size for streams with active flow-control windows (that is,
   477|       |        // streams in the "open" or "half-closed (remote)" state). When the
   478|       |        // value of SETTINGS_INITIAL_WINDOW_SIZE changes, a receiver MUST adjust
   479|       |        // the size of all stream flow-control windows that it maintains by the
   480|       |        // difference between the new value and the old value.
   481|       |        //
   482|       |        // A change to `SETTINGS_INITIAL_WINDOW_SIZE` can cause the available
   483|       |        // space in a flow-control window to become negative. A sender MUST
   484|       |        // track the negative flow-control window and MUST NOT send new
   485|       |        // flow-controlled frames until it receives WINDOW_UPDATE frames that
   486|       |        // cause the flow-control window to become positive.
   487|  7.03k|        if let Some(val) = settings.initial_window_size() {
   488|  3.45k|            let old_val = self.init_window_sz;
   489|  3.45k|            self.init_window_sz = val;
   490|       |
   491|  3.45k|            match val.cmp(&old_val) {
   492|       |                Ordering::Less => {
   493|       |                    // We must decrease the (remote) window on every open stream.
   494|  1.62k|                    let dec = old_val - val;
   495|  1.62k|                    tracing::trace!("decrementing all windows; dec={}", dec);
   496|       |
   497|  1.62k|                    let mut total_reclaimed = 0;
   498|   141k|                    store.try_for_each(|mut stream| {
   499|   141k|                        let stream = &mut *stream;
   500|       |
   501|   141k|                        if stream.state.is_send_closed() && stream.buffered_send_data == 0 {
   502|  5.28k|                            tracing::trace!(
   503|      0|                                "skipping send-closed stream; id={:?}; flow={:?}",
   504|       |                                stream.id,
   505|       |                                stream.send_flow
   506|       |                            );
   507|       |
   508|  5.28k|                            return Ok(());
   509|   135k|                        }
   510|       |
   511|   135k|                        tracing::trace!(
   512|      0|                            "decrementing stream window; id={:?}; decr={}; flow={:?}",
   513|       |                            stream.id,
   514|       |                            dec,
   515|       |                            stream.send_flow
   516|       |                        );
   517|       |
   518|       |                        // TODO: this decrement can underflow based on received frames!
   519|   135k|                        stream
   520|   135k|                            .send_flow
   521|   135k|                            .dec_send_window(dec)
   522|   135k|                            .map_err(proto::Error::library_go_away)?;
   523|       |
   524|       |                        // It's possible that decreasing the window causes
   525|       |                        // `window_size` (the stream-specific window) to fall below
   526|       |                        // `available` (the portion of the connection-level window
   527|       |                        // that we have allocated to the stream).
   528|       |                        // In this case, we should take that excess allocation away
   529|       |                        // and reassign it to other streams.
   530|   135k|                        let window_size = stream.send_flow.window_size();
   531|   135k|                        let available = stream.send_flow.available().as_size();
   532|   135k|                        let reclaimed = if available > window_size {
   533|       |                            // Drop down to `window_size`.
   534|  17.4k|                            let reclaim = available - window_size;
   535|  17.4k|                            stream
   536|  17.4k|                                .send_flow
   537|  17.4k|                                .claim_capacity(reclaim)
   538|  17.4k|                                .map_err(proto::Error::library_go_away)?;
   539|  17.4k|                            total_reclaimed += reclaim;
   540|  17.4k|                            reclaim
   541|       |                        } else {
   542|   118k|                            0
   543|       |                        };
   544|       |
   545|   135k|                        tracing::trace!(
   546|      0|                            "decremented stream window; id={:?}; decr={}; reclaimed={}; flow={:?}",
   547|       |                            stream.id,
   548|       |                            dec,
   549|       |                            reclaimed,
   550|       |                            stream.send_flow
   551|       |                        );
   552|       |
   553|       |                        // TODO: Should this notify the producer when the capacity
   554|       |                        // of a stream is reduced? Maybe it should if the capacity
   555|       |                        // is reduced to zero, allowing the producer to stop work.
   556|       |
   557|   135k|                        Ok::<_, proto::Error>(())
   558|   141k|                    })?;
   559|       |
   560|  1.62k|                    self.prioritize
   561|  1.62k|                        .assign_connection_capacity(total_reclaimed, store, counts);
   562|       |                }
   563|       |                Ordering::Greater => {
   564|  1.22k|                    let inc = val - old_val;
   565|       |
   566|   115k|                    store.try_for_each(|mut stream| {
   567|   115k|                        self.recv_stream_window_update(inc, buffer, &mut stream, counts, task)
   568|   115k|                            .map_err(Error::library_go_away)
   569|   115k|                    })?;
   570|       |                }
   571|    602|                Ordering::Equal => (),
   572|       |            }
   573|  3.58k|        }
   574|       |
   575|  7.02k|        if let Some(val) = settings.is_push_enabled() {
   576|    315|            self.is_push_enabled = val
   577|  6.70k|        }
   578|       |
   579|  7.02k|        Ok(())
   580|  7.03k|    }
   581|       |
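The RFC 7540 §6.9.2 adjustment quoted above is a plain signed delta applied to every active stream window, which is why a shrinking SETTINGS_INITIAL_WINDOW_SIZE can leave a stream with a negative window. A worked example with illustrative numbers:

    fn main() {
        // SETTINGS_INITIAL_WINDOW_SIZE drops from the default to 16 KiB.
        let old_val: i64 = 65_535;
        let new_val: i64 = 16_384;
        let dec = old_val - new_val; // 49_151, applied to every open stream

        // A stream that still had 40_000 bytes of send window goes negative;
        // no flow-controlled frames may be sent until WINDOW_UPDATEs bring
        // the window back above zero.
        let stream_window = 40_000 - dec;
        assert_eq!(stream_window, -9_151);
    }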
   582|  18.5k|    pub fn clear_queues(&mut self, store: &mut Store, counts: &mut Counts) {
   583|  18.5k|        self.prioritize.clear_pending_capacity(store, counts);
   584|  18.5k|        self.prioritize.clear_pending_send(store, counts);
   585|  18.5k|        self.prioritize.clear_pending_open(store, counts);
   586|  18.5k|    }
   587|       |
   588|    806|    pub fn ensure_not_idle(&self, id: StreamId) -> Result<(), Reason> {
   589|    806|        if let Ok(next) = self.next_stream_id {
   590|    479|            if id >= next {
   591|     37|                return Err(Reason::PROTOCOL_ERROR);
   592|    442|            }
   593|    327|        }
   594|       |        // if next_stream_id is overflowed, that's ok.
   595|       |
   596|    769|        Ok(())
   597|    806|    }
   598|       |
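`ensure_not_idle` rejects a peer frame that names a locally initiated stream id at or above `next_stream_id`: such a stream is still idle, because this endpoint never created it. A sketch with plain u32 ids (illustrative model with client parity assumed, not the h2 API):

    // Sketch: ids >= next_stream_id are still idle (u32 model, odd client ids).
    fn ensure_not_idle(next_stream_id: u32, id: u32) -> Result<(), &'static str> {
        if id >= next_stream_id {
            return Err("PROTOCOL_ERROR: peer referenced an idle stream");
        }
        Ok(())
    }

    fn main() {
        let next = 7; // streams 1, 3, 5 were opened; 7 not yet
        assert!(ensure_not_idle(next, 5).is_ok());
        assert!(ensure_not_idle(next, 9).is_err());
    }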
   599|  1.39M|    pub fn ensure_next_stream_id(&self) -> Result<StreamId, UserError> {
   600|  1.39M|        self.next_stream_id
   601|  1.39M|            .map_err(|_| UserError::OverflowedStreamId)
   602|  1.39M|    }
   603|       |
   604|  6.63k|    pub fn may_have_created_stream(&self, id: StreamId) -> bool {
   605|  6.63k|        if let Ok(next_id) = self.next_stream_id {
   606|       |            // Peer::is_local_init should have been called beforehand
   607|  6.17k|            debug_assert_eq!(id.is_server_initiated(), next_id.is_server_initiated(),);
   608|  6.17k|            id < next_id
   609|       |        } else {
   610|    456|            true
   611|       |        }
   612|  6.63k|    }
   613|       |
   614|  11.4k|    pub(super) fn maybe_reset_next_stream_id(&mut self, id: StreamId) {
   615|  11.4k|        if let Ok(next_id) = self.next_stream_id {
   616|       |            // Peer::is_local_init should have been called beforehand
   617|  10.9k|            debug_assert_eq!(id.is_server_initiated(), next_id.is_server_initiated());
   618|  10.9k|            if id >= next_id {
   619|  3.14k|                self.next_stream_id = id.next_id();
   620|  7.78k|            }
   621|    542|        }
   622|  11.4k|    }
   623|       |
   624|      0|    pub(crate) fn is_extended_connect_protocol_enabled(&self) -> bool {
   625|      0|        self.is_extended_connect_protocol_enabled
   626|      0|    }
   627|       |}