Coverage Report

Created: 2025-10-13 06:32

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/h2/src/proto/streams/send.rs
Line
Count
Source
1
use super::{
2
    store, Buffer, Codec, Config, Counts, Frame, Prioritize, Prioritized, Store, Stream, StreamId,
3
    StreamIdOverflow, WindowSize,
4
};
5
use crate::codec::UserError;
6
use crate::frame::{self, Reason};
7
use crate::proto::{self, Error, Initiator};
8
9
use bytes::Buf;
10
use tokio::io::AsyncWrite;
11
12
use std::cmp::Ordering;
13
use std::io;
14
use std::task::{Context, Poll, Waker};
15
16
/// Manages state transitions related to outbound frames.
17
#[derive(Debug)]
18
pub(super) struct Send {
19
    /// Stream identifier to use for next initialized stream.
20
    next_stream_id: Result<StreamId, StreamIdOverflow>,
21
22
    /// Any streams with a higher ID are ignored.
23
    ///
24
    /// This starts as MAX, but is lowered when a GOAWAY is received.
25
    ///
26
    /// > After sending a GOAWAY frame, the sender can discard frames for
27
    /// > streams initiated by the receiver with identifiers higher than
28
    /// > the identified last stream.
29
    max_stream_id: StreamId,
30
31
    /// Initial window size of locally initiated streams
32
    init_window_sz: WindowSize,
33
34
    /// Prioritization layer
35
    prioritize: Prioritize,
36
37
    is_push_enabled: bool,
38
39
    /// If extended connect protocol is enabled.
40
    is_extended_connect_protocol_enabled: bool,
41
}
42
43
/// A value to detect which public API has called `poll_reset`.
44
#[derive(Debug)]
45
pub(crate) enum PollReset {
46
    AwaitingHeaders,
47
    Streaming,
48
}
49
50
impl Send {
51
    /// Create a new `Send`
52
13.2k
    pub fn new(config: &Config) -> Self {
53
13.2k
        Send {
54
13.2k
            init_window_sz: config.remote_init_window_sz,
55
13.2k
            max_stream_id: StreamId::MAX,
56
13.2k
            next_stream_id: Ok(config.local_next_stream_id),
57
13.2k
            prioritize: Prioritize::new(config),
58
13.2k
            is_push_enabled: true,
59
13.2k
            is_extended_connect_protocol_enabled: false,
60
13.2k
        }
61
13.2k
    }
62
63
    /// Returns the initial send window size
64
428k
    pub fn init_window_sz(&self) -> WindowSize {
65
428k
        self.init_window_sz
66
428k
    }
67
68
427k
    pub fn open(&mut self) -> Result<StreamId, UserError> {
69
427k
        let stream_id = self.ensure_next_stream_id()?;
70
427k
        self.next_stream_id = stream_id.next_id();
71
427k
        Ok(stream_id)
72
427k
    }
73
74
0
    pub fn reserve_local(&mut self) -> Result<StreamId, UserError> {
75
0
        let stream_id = self.ensure_next_stream_id()?;
76
0
        self.next_stream_id = stream_id.next_id();
77
0
        Ok(stream_id)
78
0
    }
79
80
427k
    fn check_headers(fields: &http::HeaderMap) -> Result<(), UserError> {
81
        // 8.1.2.2. Connection-Specific Header Fields
82
427k
        if fields.contains_key(http::header::CONNECTION)
83
427k
            || fields.contains_key(http::header::TRANSFER_ENCODING)
84
427k
            || fields.contains_key(http::header::UPGRADE)
85
427k
            || fields.contains_key("keep-alive")
86
427k
            || fields.contains_key("proxy-connection")
87
        {
88
3
            tracing::debug!("illegal connection-specific headers found");
89
3
            return Err(UserError::MalformedHeaders);
90
427k
        } else if let Some(te) = fields.get(http::header::TE) {
91
99
            if te != "trailers" {
92
98
                tracing::debug!("illegal connection-specific headers found");
93
98
                return Err(UserError::MalformedHeaders);
94
1
            }
95
427k
        }
96
427k
        Ok(())
97
427k
    }
98
99
    pub fn send_push_promise<B>(
100
        &mut self,
101
        frame: frame::PushPromise,
102
        buffer: &mut Buffer<Frame<B>>,
103
        stream: &mut store::Ptr,
104
        task: &mut Option<Waker>,
105
    ) -> Result<(), UserError> {
106
        if !self.is_push_enabled {
107
            return Err(UserError::PeerDisabledServerPush);
108
        }
109
110
        tracing::trace!(
111
            "send_push_promise; frame={:?}; init_window={:?}",
112
            frame,
113
            self.init_window_sz
114
        );
115
116
        Self::check_headers(frame.fields())?;
117
118
        // Queue the frame for sending
119
        self.prioritize
120
            .queue_frame(frame.into(), buffer, stream, task);
121
122
        Ok(())
123
    }
124
125
427k
    pub fn send_headers<B>(
126
427k
        &mut self,
127
427k
        frame: frame::Headers,
128
427k
        buffer: &mut Buffer<Frame<B>>,
129
427k
        stream: &mut store::Ptr,
130
427k
        counts: &mut Counts,
131
427k
        task: &mut Option<Waker>,
132
427k
    ) -> Result<(), UserError> {
133
427k
        tracing::trace!(
134
0
            "send_headers; frame={:?}; init_window={:?}",
135
            frame,
136
            self.init_window_sz
137
        );
138
139
427k
        Self::check_headers(frame.fields())?;
140
141
427k
        let end_stream = frame.is_end_stream();
142
143
        // Update the state
144
427k
        stream.state.send_open(end_stream)?;
145
146
427k
        let mut pending_open = false;
147
427k
        if counts.peer().is_local_init(frame.stream_id()) && !stream.is_pending_push {
148
427k
            self.prioritize.queue_open(stream);
149
427k
            pending_open = true;
150
427k
        }
151
152
        // Queue the frame for sending
153
        //
154
        // This call expects that, since new streams are in the open queue, new
155
        // streams won't be pushed on pending_send.
156
427k
        self.prioritize
157
427k
            .queue_frame(frame.into(), buffer, stream, task);
158
159
        // Need to notify the connection when pushing onto pending_open since
160
        // queue_frame only notifies for pending_send.
161
427k
        if pending_open {
162
427k
            if let Some(task) = task.take() {
163
2.98k
                task.wake();
164
424k
            }
165
0
        }
166
167
427k
        Ok(())
168
427k
    }
<h2::proto::streams::send::Send>::send_headers::<bytes::bytes::Bytes>
Line
Count
Source
125
669
    pub fn send_headers<B>(
126
669
        &mut self,
127
669
        frame: frame::Headers,
128
669
        buffer: &mut Buffer<Frame<B>>,
129
669
        stream: &mut store::Ptr,
130
669
        counts: &mut Counts,
131
669
        task: &mut Option<Waker>,
132
669
    ) -> Result<(), UserError> {
133
669
        tracing::trace!(
134
0
            "send_headers; frame={:?}; init_window={:?}",
135
            frame,
136
            self.init_window_sz
137
        );
138
139
669
        Self::check_headers(frame.fields())?;
140
141
568
        let end_stream = frame.is_end_stream();
142
143
        // Update the state
144
568
        stream.state.send_open(end_stream)?;
145
146
568
        let mut pending_open = false;
147
568
        if counts.peer().is_local_init(frame.stream_id()) && !stream.is_pending_push {
148
568
            self.prioritize.queue_open(stream);
149
568
            pending_open = true;
150
568
        }
151
152
        // Queue the frame for sending
153
        //
154
        // This call expects that, since new streams are in the open queue, new
155
        // streams won't be pushed on pending_send.
156
568
        self.prioritize
157
568
            .queue_frame(frame.into(), buffer, stream, task);
158
159
        // Need to notify the connection when pushing onto pending_open since
160
        // queue_frame only notifies for pending_send.
161
568
        if pending_open {
162
568
            if let Some(task) = task.take() {
163
0
                task.wake();
164
568
            }
165
0
        }
166
167
568
        Ok(())
168
669
    }
<h2::proto::streams::send::Send>::send_headers::<bytes::bytes::Bytes>
Line
Count
Source
125
426k
    pub fn send_headers<B>(
126
426k
        &mut self,
127
426k
        frame: frame::Headers,
128
426k
        buffer: &mut Buffer<Frame<B>>,
129
426k
        stream: &mut store::Ptr,
130
426k
        counts: &mut Counts,
131
426k
        task: &mut Option<Waker>,
132
426k
    ) -> Result<(), UserError> {
133
426k
        tracing::trace!(
134
0
            "send_headers; frame={:?}; init_window={:?}",
135
            frame,
136
            self.init_window_sz
137
        );
138
139
426k
        Self::check_headers(frame.fields())?;
140
141
426k
        let end_stream = frame.is_end_stream();
142
143
        // Update the state
144
426k
        stream.state.send_open(end_stream)?;
145
146
426k
        let mut pending_open = false;
147
426k
        if counts.peer().is_local_init(frame.stream_id()) && !stream.is_pending_push {
148
426k
            self.prioritize.queue_open(stream);
149
426k
            pending_open = true;
150
426k
        }
151
152
        // Queue the frame for sending
153
        //
154
        // This call expects that, since new streams are in the open queue, new
155
        // streams won't be pushed on pending_send.
156
426k
        self.prioritize
157
426k
            .queue_frame(frame.into(), buffer, stream, task);
158
159
        // Need to notify the connection when pushing onto pending_open since
160
        // queue_frame only notifies for pending_send.
161
426k
        if pending_open {
162
426k
            if let Some(task) = task.take() {
163
2.98k
                task.wake();
164
423k
            }
165
0
        }
166
167
426k
        Ok(())
168
426k
    }
169
170
    /// Send an explicit RST_STREAM frame
171
58.5k
    pub fn send_reset<B>(
172
58.5k
        &mut self,
173
58.5k
        reason: Reason,
174
58.5k
        initiator: Initiator,
175
58.5k
        buffer: &mut Buffer<Frame<B>>,
176
58.5k
        stream: &mut store::Ptr,
177
58.5k
        counts: &mut Counts,
178
58.5k
        task: &mut Option<Waker>,
179
58.5k
    ) {
180
58.5k
        let is_reset = stream.state.is_reset();
181
58.5k
        let is_closed = stream.state.is_closed();
182
58.5k
        let is_empty = stream.pending_send.is_empty();
183
58.5k
        let stream_id = stream.id;
184
185
58.5k
        tracing::trace!(
186
0
            "send_reset(..., reason={:?}, initiator={:?}, stream={:?}, ..., \
187
0
             is_reset={:?}; is_closed={:?}; pending_send.is_empty={:?}; \
188
0
             state={:?} \
189
0
             ",
190
            reason,
191
            initiator,
192
            stream_id,
193
            is_reset,
194
            is_closed,
195
            is_empty,
196
0
            stream.state
197
        );
198
199
58.5k
        if is_reset {
200
            // Don't double reset
201
22.4k
            tracing::trace!(
202
0
                " -> not sending RST_STREAM ({:?} is already reset)",
203
                stream_id
204
            );
205
22.4k
            return;
206
36.1k
        }
207
208
        // Transition the state to reset no matter what.
209
36.1k
        stream.set_reset(reason, initiator);
210
211
        // If closed AND the send queue is flushed, then the stream cannot be
212
        // reset explicitly, either. Implicit resets can still be queued.
213
36.1k
        if is_closed && is_empty {
214
2
            tracing::trace!(
215
0
                " -> not sending explicit RST_STREAM ({:?} was closed \
216
0
                 and send queue was flushed)",
217
                stream_id
218
            );
219
2
            return;
220
36.1k
        }
221
222
        // Clear all pending outbound frames.
223
        // Note that we don't call `self.recv_err` because we want to enqueue
224
        // the reset frame before transitioning the stream inside
225
        // `reclaim_all_capacity`.
226
36.1k
        self.prioritize.clear_queue(buffer, stream);
227
228
36.1k
        let frame = frame::Reset::new(stream.id, reason);
229
230
36.1k
        tracing::trace!("send_reset -- queueing; frame={:?}", frame);
231
36.1k
        self.prioritize
232
36.1k
            .queue_frame(frame.into(), buffer, stream, task);
233
36.1k
        self.prioritize.reclaim_all_capacity(stream, counts);
234
58.5k
    }
235
236
80.2k
    pub fn schedule_implicit_reset(
237
80.2k
        &mut self,
238
80.2k
        stream: &mut store::Ptr,
239
80.2k
        reason: Reason,
240
80.2k
        counts: &mut Counts,
241
80.2k
        task: &mut Option<Waker>,
242
80.2k
    ) {
243
80.2k
        if stream.state.is_closed() {
244
            // Stream is already closed, nothing more to do
245
0
            return;
246
80.2k
        }
247
248
80.2k
        stream.state.set_scheduled_reset(reason);
249
250
80.2k
        self.prioritize.reclaim_reserved_capacity(stream, counts);
251
80.2k
        self.prioritize.schedule_send(stream, task);
252
80.2k
    }
253
254
426k
    pub fn send_data<B>(
255
426k
        &mut self,
256
426k
        frame: frame::Data<B>,
257
426k
        buffer: &mut Buffer<Frame<B>>,
258
426k
        stream: &mut store::Ptr,
259
426k
        counts: &mut Counts,
260
426k
        task: &mut Option<Waker>,
261
426k
    ) -> Result<(), UserError>
262
426k
    where
263
426k
        B: Buf,
264
    {
265
426k
        self.prioritize
266
426k
            .send_data(frame, buffer, stream, counts, task)
267
426k
    }
268
269
    pub fn send_trailers<B>(
270
        &mut self,
271
        frame: frame::Headers,
272
        buffer: &mut Buffer<Frame<B>>,
273
        stream: &mut store::Ptr,
274
        counts: &mut Counts,
275
        task: &mut Option<Waker>,
276
    ) -> Result<(), UserError> {
277
        // TODO: Should this logic be moved into state.rs?
278
        if !stream.state.is_send_streaming() {
279
            return Err(UserError::UnexpectedFrameType);
280
        }
281
282
        stream.state.send_close();
283
284
        tracing::trace!("send_trailers -- queuing; frame={:?}", frame);
285
        self.prioritize
286
            .queue_frame(frame.into(), buffer, stream, task);
287
288
        // Release any excess capacity
289
        self.prioritize.reserve_capacity(0, stream, counts);
290
291
        Ok(())
292
    }
293
294
14.3k
    pub fn poll_complete<T, B>(
295
14.3k
        &mut self,
296
14.3k
        cx: &mut Context,
297
14.3k
        buffer: &mut Buffer<Frame<B>>,
298
14.3k
        store: &mut Store,
299
14.3k
        counts: &mut Counts,
300
14.3k
        dst: &mut Codec<T, Prioritized<B>>,
301
14.3k
    ) -> Poll<io::Result<()>>
302
14.3k
    where
303
14.3k
        T: AsyncWrite + Unpin,
304
14.3k
        B: Buf,
305
    {
306
14.3k
        self.prioritize
307
14.3k
            .poll_complete(cx, buffer, store, counts, dst)
308
14.3k
    }
309
310
    /// Request capacity to send data
311
0
    pub fn reserve_capacity(
312
0
        &mut self,
313
0
        capacity: WindowSize,
314
0
        stream: &mut store::Ptr,
315
0
        counts: &mut Counts,
316
0
    ) {
317
0
        self.prioritize.reserve_capacity(capacity, stream, counts)
318
0
    }
319
320
0
    pub fn poll_capacity(
321
0
        &mut self,
322
0
        cx: &Context,
323
0
        stream: &mut store::Ptr,
324
0
    ) -> Poll<Option<Result<WindowSize, UserError>>> {
325
0
        if !stream.state.is_send_streaming() {
326
0
            return Poll::Ready(None);
327
0
        }
328
329
0
        if !stream.send_capacity_inc {
330
0
            stream.wait_send(cx);
331
0
            return Poll::Pending;
332
0
        }
333
334
0
        stream.send_capacity_inc = false;
335
336
0
        Poll::Ready(Some(Ok(self.capacity(stream))))
337
0
    }
338
339
    /// Current available stream send capacity
340
0
    pub fn capacity(&self, stream: &mut store::Ptr) -> WindowSize {
341
0
        stream.capacity(self.prioritize.max_buffer_size())
342
0
    }
343
344
0
    pub fn poll_reset(
345
0
        &self,
346
0
        cx: &Context,
347
0
        stream: &mut Stream,
348
0
        mode: PollReset,
349
0
    ) -> Poll<Result<Reason, crate::Error>> {
350
0
        match stream.state.ensure_reason(mode)? {
351
0
            Some(reason) => Poll::Ready(Ok(reason)),
352
            None => {
353
0
                stream.wait_send(cx);
354
0
                Poll::Pending
355
            }
356
        }
357
0
    }
358
359
648
    pub fn recv_connection_window_update(
360
648
        &mut self,
361
648
        frame: frame::WindowUpdate,
362
648
        store: &mut Store,
363
648
        counts: &mut Counts,
364
648
    ) -> Result<(), Reason> {
365
648
        self.prioritize
366
648
            .recv_connection_window_update(frame.size_increment(), store, counts)
367
648
    }
368
369
144k
    pub fn recv_stream_window_update<B>(
370
144k
        &mut self,
371
144k
        sz: WindowSize,
372
144k
        buffer: &mut Buffer<Frame<B>>,
373
144k
        stream: &mut store::Ptr,
374
144k
        counts: &mut Counts,
375
144k
        task: &mut Option<Waker>,
376
144k
    ) -> Result<(), Reason> {
377
144k
        if let Err(e) = self.prioritize.recv_stream_window_update(sz, stream) {
378
34
            tracing::debug!("recv_stream_window_update !!; err={:?}", e);
379
380
34
            self.send_reset(
381
                Reason::FLOW_CONTROL_ERROR,
382
34
                Initiator::Library,
383
34
                buffer,
384
34
                stream,
385
34
                counts,
386
34
                task,
387
            );
388
389
34
            return Err(e);
390
144k
        }
391
392
144k
        Ok(())
393
144k
    }
394
395
3.70k
    pub(super) fn recv_go_away(&mut self, last_stream_id: StreamId) -> Result<(), Error> {
396
3.70k
        if last_stream_id > self.max_stream_id {
397
            // The remote endpoint sent a `GOAWAY` frame indicating a stream
398
            // that we never sent, or that we have already terminated on account
399
            // of previous `GOAWAY` frame. In either case, that is illegal.
400
            // (When sending multiple `GOAWAY`s, "Endpoints MUST NOT increase
401
            // the value they send in the last stream identifier, since the
402
            // peers might already have retried unprocessed requests on another
403
            // connection.")
404
25
            proto_err!(conn:
405
                "recv_go_away: last_stream_id ({:?}) > max_stream_id ({:?})",
406
                last_stream_id, self.max_stream_id,
407
            );
408
25
            return Err(Error::library_go_away(Reason::PROTOCOL_ERROR));
409
3.68k
        }
410
411
3.68k
        self.max_stream_id = last_stream_id;
412
3.68k
        Ok(())
413
3.70k
    }
414
415
589k
    pub fn handle_error<B>(
416
589k
        &mut self,
417
589k
        buffer: &mut Buffer<Frame<B>>,
418
589k
        stream: &mut store::Ptr,
419
589k
        counts: &mut Counts,
420
589k
    ) {
421
        // Clear all pending outbound frames
422
589k
        self.prioritize.clear_queue(buffer, stream);
423
589k
        self.prioritize.reclaim_all_capacity(stream, counts);
424
589k
    }
<h2::proto::streams::send::Send>::handle_error::<bytes::bytes::Bytes>
Line
Count
Source
415
568
    pub fn handle_error<B>(
416
568
        &mut self,
417
568
        buffer: &mut Buffer<Frame<B>>,
418
568
        stream: &mut store::Ptr,
419
568
        counts: &mut Counts,
420
568
    ) {
421
        // Clear all pending outbound frames
422
568
        self.prioritize.clear_queue(buffer, stream);
423
568
        self.prioritize.reclaim_all_capacity(stream, counts);
424
568
    }
<h2::proto::streams::send::Send>::handle_error::<bytes::bytes::Bytes>
Line
Count
Source
415
589k
    pub fn handle_error<B>(
416
589k
        &mut self,
417
589k
        buffer: &mut Buffer<Frame<B>>,
418
589k
        stream: &mut store::Ptr,
419
589k
        counts: &mut Counts,
420
589k
    ) {
421
        // Clear all pending outbound frames
422
589k
        self.prioritize.clear_queue(buffer, stream);
423
589k
        self.prioritize.reclaim_all_capacity(stream, counts);
424
589k
    }
425
426
7.29k
    pub fn apply_remote_settings<B>(
427
7.29k
        &mut self,
428
7.29k
        settings: &frame::Settings,
429
7.29k
        buffer: &mut Buffer<Frame<B>>,
430
7.29k
        store: &mut Store,
431
7.29k
        counts: &mut Counts,
432
7.29k
        task: &mut Option<Waker>,
433
7.29k
    ) -> Result<(), Error> {
434
7.29k
        if let Some(val) = settings.is_extended_connect_protocol_enabled() {
435
201
            self.is_extended_connect_protocol_enabled = val;
436
7.09k
        }
437
438
        // Applies an update to the remote endpoint's initial window size.
439
        //
440
        // Per RFC 7540 ยง6.9.2:
441
        //
442
        // In addition to changing the flow-control window for streams that are
443
        // not yet active, a SETTINGS frame can alter the initial flow-control
444
        // window size for streams with active flow-control windows (that is,
445
        // streams in the "open" or "half-closed (remote)" state). When the
446
        // value of SETTINGS_INITIAL_WINDOW_SIZE changes, a receiver MUST adjust
447
        // the size of all stream flow-control windows that it maintains by the
448
        // difference between the new value and the old value.
449
        //
450
        // A change to `SETTINGS_INITIAL_WINDOW_SIZE` can cause the available
451
        // space in a flow-control window to become negative. A sender MUST
452
        // track the negative flow-control window and MUST NOT send new
453
        // flow-controlled frames until it receives WINDOW_UPDATE frames that
454
        // cause the flow-control window to become positive.
455
7.29k
        if let Some(val) = settings.initial_window_size() {
456
3.86k
            let old_val = self.init_window_sz;
457
3.86k
            self.init_window_sz = val;
458
459
3.86k
            match val.cmp(&old_val) {
460
                Ordering::Less => {
461
                    // We must decrease the (remote) window on every open stream.
462
1.97k
                    let dec = old_val - val;
463
1.97k
                    tracing::trace!("decrementing all windows; dec={}", dec);
464
465
1.97k
                    let mut total_reclaimed = 0;
466
186k
                    store.try_for_each(|mut stream| {
467
186k
                        let stream = &mut *stream;
468
469
186k
                        if stream.state.is_send_closed() && stream.buffered_send_data == 0 {
470
6.42k
                            tracing::trace!(
471
0
                                "skipping send-closed stream; id={:?}; flow={:?}",
472
                                stream.id,
473
                                stream.send_flow
474
                            );
475
476
6.42k
                            return Ok(());
477
179k
                        }
478
479
179k
                        tracing::trace!(
480
0
                            "decrementing stream window; id={:?}; decr={}; flow={:?}",
481
                            stream.id,
482
                            dec,
483
                            stream.send_flow
484
                        );
485
486
                        // TODO: this decrement can underflow based on received frames!
487
179k
                        stream
488
179k
                            .send_flow
489
179k
                            .dec_send_window(dec)
490
179k
                            .map_err(proto::Error::library_go_away)?;
491
492
                        // It's possible that decreasing the window causes
493
                        // `window_size` (the stream-specific window) to fall below
494
                        // `available` (the portion of the connection-level window
495
                        // that we have allocated to the stream).
496
                        // In this case, we should take that excess allocation away
497
                        // and reassign it to other streams.
498
179k
                        let window_size = stream.send_flow.window_size();
499
179k
                        let available = stream.send_flow.available().as_size();
500
179k
                        let reclaimed = if available > window_size {
501
                            // Drop down to `window_size`.
502
18.0k
                            let reclaim = available - window_size;
503
18.0k
                            stream
504
18.0k
                                .send_flow
505
18.0k
                                .claim_capacity(reclaim)
506
18.0k
                                .map_err(proto::Error::library_go_away)?;
507
18.0k
                            total_reclaimed += reclaim;
508
18.0k
                            reclaim
509
                        } else {
510
161k
                            0
511
                        };
512
513
179k
                        tracing::trace!(
514
0
                            "decremented stream window; id={:?}; decr={}; reclaimed={}; flow={:?}",
515
                            stream.id,
516
                            dec,
517
                            reclaimed,
518
                            stream.send_flow
519
                        );
520
521
                        // TODO: Should this notify the producer when the capacity
522
                        // of a stream is reduced? Maybe it should if the capacity
523
                        // is reduced to zero, allowing the producer to stop work.
524
525
179k
                        Ok::<_, proto::Error>(())
526
186k
                    })?;
527
528
1.97k
                    self.prioritize
529
1.97k
                        .assign_connection_capacity(total_reclaimed, store, counts);
530
                }
531
                Ordering::Greater => {
532
1.43k
                    let inc = val - old_val;
533
534
144k
                    store.try_for_each(|mut stream| {
535
144k
                        self.recv_stream_window_update(inc, buffer, &mut stream, counts, task)
536
144k
                            .map_err(Error::library_go_away)
537
144k
                    })?;
538
                }
539
453
                Ordering::Equal => (),
540
            }
541
3.42k
        }
542
543
7.28k
        if let Some(val) = settings.is_push_enabled() {
544
304
            self.is_push_enabled = val
545
6.98k
        }
546
547
7.28k
        Ok(())
548
7.29k
    }
549
550
17.9k
    pub fn clear_queues(&mut self, store: &mut Store, counts: &mut Counts) {
551
17.9k
        self.prioritize.clear_pending_capacity(store, counts);
552
17.9k
        self.prioritize.clear_pending_send(store, counts);
553
17.9k
        self.prioritize.clear_pending_open(store, counts);
554
17.9k
    }
555
556
647
    pub fn ensure_not_idle(&self, id: StreamId) -> Result<(), Reason> {
557
647
        if let Ok(next) = self.next_stream_id {
558
452
            if id >= next {
559
55
                return Err(Reason::PROTOCOL_ERROR);
560
397
            }
561
195
        }
562
        // if next_stream_id is overflowed, that's ok.
563
564
592
        Ok(())
565
647
    }
566
567
1.28M
    pub fn ensure_next_stream_id(&self) -> Result<StreamId, UserError> {
568
1.28M
        self.next_stream_id
569
1.28M
            .map_err(|_| UserError::OverflowedStreamId)
570
1.28M
    }
571
572
5.68k
    pub fn may_have_created_stream(&self, id: StreamId) -> bool {
573
5.68k
        if let Ok(next_id) = self.next_stream_id {
574
            // Peer::is_local_init should have been called beforehand
575
4.95k
            debug_assert_eq!(id.is_server_initiated(), next_id.is_server_initiated(),);
576
4.95k
            id < next_id
577
        } else {
578
735
            true
579
        }
580
5.68k
    }
581
582
10.1k
    pub(super) fn maybe_reset_next_stream_id(&mut self, id: StreamId) {
583
10.1k
        if let Ok(next_id) = self.next_stream_id {
584
            // Peer::is_local_init should have been called beforehand
585
9.25k
            debug_assert_eq!(id.is_server_initiated(), next_id.is_server_initiated());
586
9.25k
            if id >= next_id {
587
2.71k
                self.next_stream_id = id.next_id();
588
6.54k
            }
589
872
        }
590
10.1k
    }
591
592
0
    pub(crate) fn is_extended_connect_protocol_enabled(&self) -> bool {
593
0
        self.is_extended_connect_protocol_enabled
594
0
    }
595
}