Coverage Report

Created: 2025-11-02 06:39

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/neqo/neqo-transport/src/fc.rs
Line
Count
Source
1
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
2
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
3
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
4
// option. This file may not be copied, modified, or distributed
5
// except according to those terms.
6
7
// Tracks possibly-redundant flow control signals from other code and converts
8
// into flow control frames needing to be sent to the remote.
9
10
use std::{
11
    cmp::min,
12
    fmt::{Debug, Display},
13
    num::NonZeroU64,
14
    ops::{Deref, DerefMut, Index, IndexMut},
15
    time::{Duration, Instant},
16
};
17
18
use neqo_common::{qdebug, qtrace, Buffer, Role, MAX_VARINT};
19
20
use crate::{
21
    frame::FrameType,
22
    packet,
23
    recovery::{self, StreamRecoveryToken},
24
    recv_stream::{MAX_CONN_RECV_WINDOW_SIZE, MAX_RECV_WINDOW_SIZE},
25
    stats::FrameStats,
26
    stream_id::{StreamId, StreamType},
27
    Error, Res,
28
};
29
30
/// Fraction of a flow control window after which a receiver sends a window
31
/// update.
32
///
33
/// In steady-state and max utilization, a value of 4 leads to 4 window updates
34
/// per RTT.
35
///
36
/// Value aligns with [`crate::connection::params::ConnectionParameters::DEFAULT_ACK_RATIO`].
37
pub const WINDOW_UPDATE_FRACTION: u64 = 4;
38
39
/// Multiplier for auto-tuning the stream receive window.
40
///
41
/// See [`ReceiverFlowControl::auto_tune`].
42
///
43
/// Note that the flow control window should grow at least as fast as the
44
/// congestion control window, in order to not unnecessarily limit throughput.
45
const WINDOW_INCREASE_MULTIPLIER: u64 = 4;
46
47
/// Subject for flow control auto-tuning, used to avoid heap allocations
48
/// when logging.
49
#[derive(Debug, Clone, Copy)]
50
enum AutoTuneSubject {
51
    Connection,
52
    Stream(StreamId),
53
}
54
55
impl Display for AutoTuneSubject {
56
0
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
57
0
        match self {
58
0
            Self::Connection => write!(f, "connection"),
59
0
            Self::Stream(id) => write!(f, "stream {id}"),
60
        }
61
0
    }
62
}
63
64
#[derive(Debug)]
65
pub struct SenderFlowControl<T>
66
where
67
    T: Debug + Sized,
68
{
69
    /// The thing that we're counting for.
70
    subject: T,
71
    /// The limit.
72
    limit: u64,
73
    /// How much of that limit we've used.
74
    used: u64,
75
    /// The point at which blocking occurred.  This is updated each time
76
    /// the sender decides that it is blocked.  It only ever changes
77
    /// when blocking occurs.  This ensures that blocking at any given limit
78
    /// is only reported once.
79
    /// Note: All values are one greater than the corresponding `limit` to
80
    /// allow distinguishing between blocking at a limit of 0 and no blocking.
81
    blocked_at: u64,
82
    /// Whether a blocked frame should be sent.
83
    blocked_frame: bool,
84
}
85
86
impl<T> SenderFlowControl<T>
87
where
88
    T: Debug + Sized,
89
{
90
    /// Make a new instance with the initial value and subject.
91
7.51k
    pub const fn new(subject: T, initial: u64) -> Self {
92
7.51k
        Self {
93
7.51k
            subject,
94
7.51k
            limit: initial,
95
7.51k
            used: 0,
96
7.51k
            blocked_at: 0,
97
7.51k
            blocked_frame: false,
98
7.51k
        }
99
7.51k
    }
<neqo_transport::fc::SenderFlowControl<neqo_transport::stream_id::StreamType>>::new
Line
Count
Source
91
5.00k
    pub const fn new(subject: T, initial: u64) -> Self {
92
5.00k
        Self {
93
5.00k
            subject,
94
5.00k
            limit: initial,
95
5.00k
            used: 0,
96
5.00k
            blocked_at: 0,
97
5.00k
            blocked_frame: false,
98
5.00k
        }
99
5.00k
    }
Unexecuted instantiation: <neqo_transport::fc::SenderFlowControl<neqo_transport::stream_id::StreamId>>::new
<neqo_transport::fc::SenderFlowControl<()>>::new
Line
Count
Source
91
2.50k
    pub const fn new(subject: T, initial: u64) -> Self {
92
2.50k
        Self {
93
2.50k
            subject,
94
2.50k
            limit: initial,
95
2.50k
            used: 0,
96
2.50k
            blocked_at: 0,
97
2.50k
            blocked_frame: false,
98
2.50k
        }
99
2.50k
    }
100
101
    /// Update the maximum. Returns `Some` with the updated available flow
102
    /// control if the change was an increase and `None` otherwise.
103
1.32k
    pub fn update(&mut self, limit: u64) -> Option<usize> {
104
1.32k
        debug_assert!(limit < u64::MAX);
105
1.32k
        (limit > self.limit).then(|| {
106
1.32k
            self.limit = limit;
107
1.32k
            self.blocked_frame = false;
108
1.32k
            self.available()
109
1.32k
        })
<neqo_transport::fc::SenderFlowControl<neqo_transport::stream_id::StreamType>>::update::{closure#0}
Line
Count
Source
105
884
        (limit > self.limit).then(|| {
106
884
            self.limit = limit;
107
884
            self.blocked_frame = false;
108
884
            self.available()
109
884
        })
Unexecuted instantiation: <neqo_transport::fc::SenderFlowControl<neqo_transport::stream_id::StreamId>>::update::{closure#0}
<neqo_transport::fc::SenderFlowControl<()>>::update::{closure#0}
Line
Count
Source
105
442
        (limit > self.limit).then(|| {
106
442
            self.limit = limit;
107
442
            self.blocked_frame = false;
108
442
            self.available()
109
442
        })
110
1.32k
    }
<neqo_transport::fc::SenderFlowControl<neqo_transport::stream_id::StreamType>>::update
Line
Count
Source
103
884
    pub fn update(&mut self, limit: u64) -> Option<usize> {
104
884
        debug_assert!(limit < u64::MAX);
105
884
        (limit > self.limit).then(|| {
106
            self.limit = limit;
107
            self.blocked_frame = false;
108
            self.available()
109
        })
110
884
    }
Unexecuted instantiation: <neqo_transport::fc::SenderFlowControl<neqo_transport::stream_id::StreamId>>::update
<neqo_transport::fc::SenderFlowControl<()>>::update
Line
Count
Source
103
442
    pub fn update(&mut self, limit: u64) -> Option<usize> {
104
442
        debug_assert!(limit < u64::MAX);
105
442
        (limit > self.limit).then(|| {
106
            self.limit = limit;
107
            self.blocked_frame = false;
108
            self.available()
109
        })
110
442
    }
111
112
    /// Consume flow control.
113
0
    pub fn consume(&mut self, count: usize) {
114
0
        let amt = u64::try_from(count).expect("usize fits into u64");
115
0
        debug_assert!(self.used + amt <= self.limit);
116
0
        self.used += amt;
117
0
    }
Unexecuted instantiation: <neqo_transport::fc::SenderFlowControl<neqo_transport::stream_id::StreamType>>::consume
Unexecuted instantiation: <neqo_transport::fc::SenderFlowControl<neqo_transport::stream_id::StreamId>>::consume
Unexecuted instantiation: <neqo_transport::fc::SenderFlowControl<()>>::consume
118
119
    /// Get available flow control.
120
2.21k
    pub fn available(&self) -> usize {
121
2.21k
        usize::try_from(self.limit - self.used).unwrap_or(usize::MAX)
122
2.21k
    }
<neqo_transport::fc::SenderFlowControl<neqo_transport::stream_id::StreamType>>::available
Line
Count
Source
120
1.76k
    pub fn available(&self) -> usize {
121
1.76k
        usize::try_from(self.limit - self.used).unwrap_or(usize::MAX)
122
1.76k
    }
Unexecuted instantiation: <neqo_transport::fc::SenderFlowControl<neqo_transport::stream_id::StreamId>>::available
<neqo_transport::fc::SenderFlowControl<()>>::available
Line
Count
Source
120
442
    pub fn available(&self) -> usize {
121
442
        usize::try_from(self.limit - self.used).unwrap_or(usize::MAX)
122
442
    }
123
124
    /// How much data has been written.
125
0
    pub const fn used(&self) -> u64 {
126
0
        self.used
127
0
    }
Unexecuted instantiation: <neqo_transport::fc::SenderFlowControl<neqo_transport::stream_id::StreamType>>::used
Unexecuted instantiation: <neqo_transport::fc::SenderFlowControl<neqo_transport::stream_id::StreamId>>::used
128
129
    /// Mark flow control as blocked.
130
    /// This only does something if the current limit exceeds the last reported blocking limit.
131
0
    pub fn blocked(&mut self) {
132
0
        if self.limit >= self.blocked_at {
133
0
            self.blocked_at = self.limit + 1;
134
0
            self.blocked_frame = true;
135
0
        }
136
0
    }
Unexecuted instantiation: <neqo_transport::fc::SenderFlowControl<neqo_transport::stream_id::StreamType>>::blocked
Unexecuted instantiation: <neqo_transport::fc::SenderFlowControl<neqo_transport::stream_id::StreamId>>::blocked
Unexecuted instantiation: <neqo_transport::fc::SenderFlowControl<()>>::blocked
137
138
    /// Return whether a blocking frame needs to be sent.
139
    /// This is `Some` with the active limit if `blocked` has been called,
140
    /// if a blocking frame has not been sent (or it has been lost), and
141
    /// if the blocking condition remains.
142
1.32k
    fn blocked_needed(&self) -> Option<u64> {
143
1.32k
        (self.blocked_frame && self.limit < self.blocked_at).then(|| self.blocked_at - 1)
Unexecuted instantiation: <neqo_transport::fc::SenderFlowControl<neqo_transport::stream_id::StreamType>>::blocked_needed::{closure#0}
Unexecuted instantiation: <neqo_transport::fc::SenderFlowControl<neqo_transport::stream_id::StreamId>>::blocked_needed::{closure#0}
Unexecuted instantiation: <neqo_transport::fc::SenderFlowControl<()>>::blocked_needed::{closure#0}
144
1.32k
    }
<neqo_transport::fc::SenderFlowControl<neqo_transport::stream_id::StreamType>>::blocked_needed
Line
Count
Source
142
884
    fn blocked_needed(&self) -> Option<u64> {
143
884
        (self.blocked_frame && self.limit < self.blocked_at).then(|| self.blocked_at - 1)
144
884
    }
Unexecuted instantiation: <neqo_transport::fc::SenderFlowControl<neqo_transport::stream_id::StreamId>>::blocked_needed
<neqo_transport::fc::SenderFlowControl<()>>::blocked_needed
Line
Count
Source
142
442
    fn blocked_needed(&self) -> Option<u64> {
143
442
        (self.blocked_frame && self.limit < self.blocked_at).then(|| self.blocked_at - 1)
144
442
    }
145
146
    /// Clear the need to send a blocked frame.
147
0
    fn blocked_sent(&mut self) {
148
0
        self.blocked_frame = false;
149
0
    }
Unexecuted instantiation: <neqo_transport::fc::SenderFlowControl<neqo_transport::stream_id::StreamType>>::blocked_sent
Unexecuted instantiation: <neqo_transport::fc::SenderFlowControl<neqo_transport::stream_id::StreamId>>::blocked_sent
Unexecuted instantiation: <neqo_transport::fc::SenderFlowControl<()>>::blocked_sent
150
151
    /// Mark a blocked frame as having been lost.
152
    /// Only send again if value of `self.blocked_at` hasn't increased since sending.
153
    /// That would imply that the limit has since increased.
154
0
    pub fn frame_lost(&mut self, limit: u64) {
155
0
        if self.blocked_at == limit + 1 {
156
0
            self.blocked_frame = true;
157
0
        }
158
0
    }
Unexecuted instantiation: <neqo_transport::fc::SenderFlowControl<neqo_transport::stream_id::StreamType>>::frame_lost
Unexecuted instantiation: <neqo_transport::fc::SenderFlowControl<neqo_transport::stream_id::StreamId>>::frame_lost
Unexecuted instantiation: <neqo_transport::fc::SenderFlowControl<()>>::frame_lost
159
}
160
161
impl SenderFlowControl<()> {
162
442
    pub fn write_frames<B: Buffer>(
163
442
        &mut self,
164
442
        builder: &mut packet::Builder<B>,
165
442
        tokens: &mut recovery::Tokens,
166
442
        stats: &mut FrameStats,
167
442
    ) {
168
442
        if let Some(limit) = self.blocked_needed() {
169
0
            if builder.write_varint_frame(&[FrameType::DataBlocked.into(), limit]) {
170
0
                stats.data_blocked += 1;
171
0
                tokens.push(recovery::Token::Stream(StreamRecoveryToken::DataBlocked(
172
0
                    limit,
173
0
                )));
174
0
                self.blocked_sent();
175
0
            }
176
442
        }
177
442
    }
178
}
179
180
impl SenderFlowControl<StreamId> {
181
0
    pub fn write_frames<B: Buffer>(
182
0
        &mut self,
183
0
        builder: &mut packet::Builder<B>,
184
0
        tokens: &mut recovery::Tokens,
185
0
        stats: &mut FrameStats,
186
0
    ) {
187
0
        if let Some(limit) = self.blocked_needed() {
188
0
            if builder.write_varint_frame(&[
189
0
                FrameType::StreamDataBlocked.into(),
190
0
                self.subject.as_u64(),
191
0
                limit,
192
0
            ]) {
193
0
                stats.stream_data_blocked += 1;
194
0
                tokens.push(recovery::Token::Stream(
195
0
                    StreamRecoveryToken::StreamDataBlocked {
196
0
                        stream_id: self.subject,
197
0
                        limit,
198
0
                    },
199
0
                ));
200
0
                self.blocked_sent();
201
0
            }
202
0
        }
203
0
    }
204
}
205
206
impl SenderFlowControl<StreamType> {
207
884
    pub fn write_frames<B: Buffer>(
208
884
        &mut self,
209
884
        builder: &mut packet::Builder<B>,
210
884
        tokens: &mut recovery::Tokens,
211
884
        stats: &mut FrameStats,
212
884
    ) {
213
884
        if let Some(limit) = self.blocked_needed() {
214
0
            let frame = match self.subject {
215
0
                StreamType::BiDi => FrameType::StreamsBlockedBiDi,
216
0
                StreamType::UniDi => FrameType::StreamsBlockedUniDi,
217
            };
218
0
            if builder.write_varint_frame(&[frame.into(), limit]) {
219
0
                stats.streams_blocked += 1;
220
0
                tokens.push(recovery::Token::Stream(
221
0
                    StreamRecoveryToken::StreamsBlocked {
222
0
                        stream_type: self.subject,
223
0
                        limit,
224
0
                    },
225
0
                ));
226
0
                self.blocked_sent();
227
0
            }
228
884
        }
229
884
    }
230
}
231
232
#[derive(Debug, Default)]
233
pub struct ReceiverFlowControl<T>
234
where
235
    T: Debug + Sized,
236
{
237
    /// The thing that we're counting for.
238
    subject: T,
239
    /// The maximum amount of items that can be active (e.g., the size of the receive buffer).
240
    max_active: u64,
241
    /// Last max allowed sent.
242
    max_allowed: u64,
243
    /// Last time a flow control update was sent.
244
    ///
245
    /// Used by auto-tuning logic to estimate sending rate between updates.
246
    /// This is active for both stream-level
247
    /// ([`ReceiverFlowControl<StreamId>`]) and connection-level
248
    /// ([`ReceiverFlowControl<()>`]) flow control.
249
    last_update: Option<Instant>,
250
    /// Item received, but not retired yet.
251
    /// This will be used for byte flow control: each stream will remember its largest byte
252
    /// offset received and session flow control will remember the sum of all bytes consumed
253
    /// by all streams.
254
    consumed: u64,
255
    /// Retired items.
256
    retired: u64,
257
    frame_pending: bool,
258
}
259
260
impl<T> ReceiverFlowControl<T>
261
where
262
    T: Debug + Sized,
263
{
264
    /// Make a new instance with the initial value and subject.
265
7.51k
    pub const fn new(subject: T, max: u64) -> Self {
266
7.51k
        Self {
267
7.51k
            subject,
268
7.51k
            max_active: max,
269
7.51k
            max_allowed: max,
270
7.51k
            last_update: None,
271
7.51k
            consumed: 0,
272
7.51k
            retired: 0,
273
7.51k
            frame_pending: false,
274
7.51k
        }
275
7.51k
    }
<neqo_transport::fc::ReceiverFlowControl<neqo_transport::stream_id::StreamType>>::new
Line
Count
Source
265
5.00k
    pub const fn new(subject: T, max: u64) -> Self {
266
5.00k
        Self {
267
5.00k
            subject,
268
5.00k
            max_active: max,
269
5.00k
            max_allowed: max,
270
5.00k
            last_update: None,
271
5.00k
            consumed: 0,
272
5.00k
            retired: 0,
273
5.00k
            frame_pending: false,
274
5.00k
        }
275
5.00k
    }
Unexecuted instantiation: <neqo_transport::fc::ReceiverFlowControl<neqo_transport::stream_id::StreamId>>::new
<neqo_transport::fc::ReceiverFlowControl<()>>::new
Line
Count
Source
265
2.50k
    pub const fn new(subject: T, max: u64) -> Self {
266
2.50k
        Self {
267
2.50k
            subject,
268
2.50k
            max_active: max,
269
2.50k
            max_allowed: max,
270
2.50k
            last_update: None,
271
2.50k
            consumed: 0,
272
2.50k
            retired: 0,
273
2.50k
            frame_pending: false,
274
2.50k
        }
275
2.50k
    }
276
277
    /// Retire some items and maybe send flow control
278
    /// update.
279
0
    pub fn retire(&mut self, retired: u64) {
280
0
        if retired <= self.retired {
281
0
            return;
282
0
        }
283
284
0
        self.retired = retired;
285
0
        if self.should_send_update() {
286
0
            self.frame_pending = true;
287
0
        }
288
0
    }
289
290
    /// This function is called when `STREAM_DATA_BLOCKED` frame is received.
291
    /// The flow control will try to send an update if possible.
292
0
    pub fn send_flowc_update(&mut self) {
293
0
        if self.retired + self.max_active > self.max_allowed {
294
0
            self.frame_pending = true;
295
0
        }
296
0
    }
Unexecuted instantiation: <neqo_transport::fc::ReceiverFlowControl<neqo_transport::stream_id::StreamType>>::send_flowc_update
Unexecuted instantiation: <neqo_transport::fc::ReceiverFlowControl<neqo_transport::stream_id::StreamId>>::send_flowc_update
Unexecuted instantiation: <neqo_transport::fc::ReceiverFlowControl<()>>::send_flowc_update
297
298
0
    const fn should_send_update(&self) -> bool {
299
0
        let window_bytes_unused = self.max_allowed - self.retired;
300
0
        window_bytes_unused < self.max_active - self.max_active / WINDOW_UPDATE_FRACTION
301
0
    }
Unexecuted instantiation: <neqo_transport::fc::ReceiverFlowControl<neqo_transport::stream_id::StreamId>>::should_send_update
Unexecuted instantiation: <neqo_transport::fc::ReceiverFlowControl<()>>::should_send_update
302
303
1.32k
    pub const fn frame_needed(&self) -> bool {
304
1.32k
        self.frame_pending
305
1.32k
    }
<neqo_transport::fc::ReceiverFlowControl<neqo_transport::stream_id::StreamType>>::frame_needed
Line
Count
Source
303
884
    pub const fn frame_needed(&self) -> bool {
304
884
        self.frame_pending
305
884
    }
Unexecuted instantiation: <neqo_transport::fc::ReceiverFlowControl<neqo_transport::stream_id::StreamId>>::frame_needed
<neqo_transport::fc::ReceiverFlowControl<()>>::frame_needed
Line
Count
Source
303
442
    pub const fn frame_needed(&self) -> bool {
304
442
        self.frame_pending
305
442
    }
306
307
0
    pub fn next_limit(&self) -> u64 {
308
0
        min(
309
0
            self.retired + self.max_active,
310
            // Flow control limits are encoded as QUIC varints and are thus
311
            // limited to the maximum QUIC varint value.
312
            MAX_VARINT,
313
        )
314
0
    }
Unexecuted instantiation: <neqo_transport::fc::ReceiverFlowControl<neqo_transport::stream_id::StreamType>>::next_limit
Unexecuted instantiation: <neqo_transport::fc::ReceiverFlowControl<neqo_transport::stream_id::StreamId>>::next_limit
Unexecuted instantiation: <neqo_transport::fc::ReceiverFlowControl<()>>::next_limit
315
316
0
    pub const fn max_active(&self) -> u64 {
317
0
        self.max_active
318
0
    }
319
320
0
    pub fn frame_lost(&mut self, maximum_data: u64) {
321
0
        if maximum_data == self.max_allowed {
322
0
            self.frame_pending = true;
323
0
        }
324
0
    }
Unexecuted instantiation: <neqo_transport::fc::ReceiverFlowControl<neqo_transport::stream_id::StreamType>>::frame_lost
Unexecuted instantiation: <neqo_transport::fc::ReceiverFlowControl<neqo_transport::stream_id::StreamId>>::frame_lost
Unexecuted instantiation: <neqo_transport::fc::ReceiverFlowControl<()>>::frame_lost
325
326
0
    fn frame_sent(&mut self, new_max: u64) {
327
0
        self.max_allowed = new_max;
328
0
        self.frame_pending = false;
329
0
    }
Unexecuted instantiation: <neqo_transport::fc::ReceiverFlowControl<neqo_transport::stream_id::StreamType>>::frame_sent
Unexecuted instantiation: <neqo_transport::fc::ReceiverFlowControl<neqo_transport::stream_id::StreamId>>::frame_sent
Unexecuted instantiation: <neqo_transport::fc::ReceiverFlowControl<()>>::frame_sent
330
331
0
    pub fn set_max_active(&mut self, max: u64) {
332
        // If max_active has been increased, send an update immediately.
333
0
        self.frame_pending |= self.max_active < max;
334
0
        self.max_active = max;
335
0
    }
336
337
0
    pub const fn retired(&self) -> u64 {
338
0
        self.retired
339
0
    }
340
341
0
    pub const fn consumed(&self) -> u64 {
342
0
        self.consumed
343
0
    }
344
345
    /// Core auto-tuning logic for adjusting the maximum flow control window.
346
    ///
347
    /// This method is called by both connection-level and stream-level
348
    /// implementations. It increases `max_active` when the sending rate exceeds
349
    /// what the current window and RTT would allow, capping at `max_window`.
350
0
    fn auto_tune_inner(
351
0
        &mut self,
352
0
        now: Instant,
353
0
        rtt: Duration,
354
0
        max_window: u64,
355
0
        subject: AutoTuneSubject,
356
0
    ) {
357
0
        let Some(max_allowed_sent_at) = self.last_update else {
358
0
            return;
359
        };
360
361
0
        let Ok(elapsed): Result<u64, _> = now
362
0
            .duration_since(max_allowed_sent_at)
363
0
            .as_micros()
364
0
            .try_into()
365
        else {
366
0
            return;
367
        };
368
369
0
        let Ok(rtt): Result<NonZeroU64, _> = rtt
370
0
            .as_micros()
371
0
            .try_into()
372
0
            .and_then(|rtt: u64| NonZeroU64::try_from(rtt))
Unexecuted instantiation: <neqo_transport::fc::ReceiverFlowControl<neqo_transport::stream_id::StreamId>>::auto_tune_inner::{closure#0}
Unexecuted instantiation: <neqo_transport::fc::ReceiverFlowControl<()>>::auto_tune_inner::{closure#0}
373
        else {
374
            // RTT is zero, no need for tuning.
375
0
            return;
376
        };
377
378
        // Compute the amount of bytes we have received in excess
379
        // of what `max_active` might allow.
380
0
        let window_bytes_expected = self.max_active * elapsed / rtt;
381
0
        let window_bytes_used = self.max_active - (self.max_allowed - self.retired);
382
0
        let Some(excess) = window_bytes_used.checked_sub(window_bytes_expected) else {
383
            // Used below expected. No auto-tuning needed.
384
0
            return;
385
        };
386
387
0
        let prev_max_active = self.max_active;
388
0
        self.max_active = min(
389
0
            self.max_active + excess * WINDOW_INCREASE_MULTIPLIER,
390
0
            max_window,
391
0
        );
392
393
0
        let increase = self.max_active - prev_max_active;
394
0
        if increase > 0 {
395
0
            qdebug!(
396
0
                "Increasing max {subject} receive window by {} B, \
397
0
                previous max_active: {} MiB, \
398
0
                new max_active: {} MiB, \
399
0
                last update: {:?}, \
400
0
                rtt: {rtt:?}",
401
                increase,
402
0
                prev_max_active / 1024 / 1024,
403
0
                self.max_active / 1024 / 1024,
404
0
                now - max_allowed_sent_at,
405
            );
406
0
        }
407
0
    }
Unexecuted instantiation: <neqo_transport::fc::ReceiverFlowControl<neqo_transport::stream_id::StreamId>>::auto_tune_inner
Unexecuted instantiation: <neqo_transport::fc::ReceiverFlowControl<()>>::auto_tune_inner
408
}
409
410
impl ReceiverFlowControl<()> {
411
442
    pub fn write_frames<B: Buffer>(
412
442
        &mut self,
413
442
        builder: &mut packet::Builder<B>,
414
442
        tokens: &mut recovery::Tokens,
415
442
        stats: &mut FrameStats,
416
442
        now: Instant,
417
442
        rtt: Duration,
418
442
    ) {
419
442
        if !self.frame_needed() {
420
442
            return;
421
0
        }
422
423
0
        self.auto_tune(now, rtt);
424
425
0
        let max_allowed = self.next_limit();
426
0
        if builder.write_varint_frame(&[FrameType::MaxData.into(), max_allowed]) {
427
0
            stats.max_data += 1;
428
0
            tokens.push(recovery::Token::Stream(StreamRecoveryToken::MaxData(
429
0
                max_allowed,
430
0
            )));
431
0
            self.frame_sent(max_allowed);
432
0
            self.last_update = Some(now);
433
0
        }
434
442
    }
435
436
    /// Auto-tune [`ReceiverFlowControl::max_active`], i.e. the connection flow
437
    /// control window.
438
    ///
439
    /// If the sending rate (`window_bytes_used`) exceeds the rate allowed by
440
    /// the maximum flow control window and the current rtt
441
    /// (`window_bytes_expected`), try to increase the maximum flow control
442
    /// window ([`ReceiverFlowControl::max_active`]).
443
0
    fn auto_tune(&mut self, now: Instant, rtt: Duration) {
444
0
        self.auto_tune_inner(
445
0
            now,
446
0
            rtt,
447
            MAX_CONN_RECV_WINDOW_SIZE,
448
0
            AutoTuneSubject::Connection,
449
        );
450
0
    }
451
452
0
    pub fn add_retired(&mut self, count: u64) {
453
0
        debug_assert!(self.retired + count <= self.consumed);
454
0
        self.retired += count;
455
0
        if self.should_send_update() {
456
0
            self.frame_pending = true;
457
0
        }
458
0
    }
459
460
0
    pub fn consume(&mut self, count: u64) -> Res<()> {
461
0
        if self.consumed + count > self.max_allowed {
462
0
            qtrace!(
463
0
                "Session RX window exceeded: consumed:{} new:{count} limit:{}",
464
                self.consumed,
465
                self.max_allowed
466
            );
467
0
            return Err(Error::FlowControl);
468
0
        }
469
0
        self.consumed += count;
470
0
        Ok(())
471
0
    }
472
}
473
474
impl ReceiverFlowControl<StreamId> {
475
0
    pub fn write_frames<B: Buffer>(
476
0
        &mut self,
477
0
        builder: &mut packet::Builder<B>,
478
0
        tokens: &mut recovery::Tokens,
479
0
        stats: &mut FrameStats,
480
0
        now: Instant,
481
0
        rtt: Duration,
482
0
    ) {
483
0
        if !self.frame_needed() {
484
0
            return;
485
0
        }
486
487
0
        self.auto_tune(now, rtt);
488
489
0
        let max_allowed = self.next_limit();
490
0
        if builder.write_varint_frame(&[
491
0
            FrameType::MaxStreamData.into(),
492
0
            self.subject.as_u64(),
493
0
            max_allowed,
494
0
        ]) {
495
0
            stats.max_stream_data += 1;
496
0
            tokens.push(recovery::Token::Stream(
497
0
                StreamRecoveryToken::MaxStreamData {
498
0
                    stream_id: self.subject,
499
0
                    max_data: max_allowed,
500
0
                },
501
0
            ));
502
0
            self.frame_sent(max_allowed);
503
0
            self.last_update = Some(now);
504
0
        }
505
0
    }
506
507
    /// Auto-tune [`ReceiverFlowControl::max_active`], i.e. the stream flow
508
    /// control window.
509
    ///
510
    /// If the sending rate (`window_bytes_used`) exceeds the rate allowed by
511
    /// the maximum flow control window and the current rtt
512
    /// (`window_bytes_expected`), try to increase the maximum flow control
513
    /// window ([`ReceiverFlowControl::max_active`]).
514
0
    fn auto_tune(&mut self, now: Instant, rtt: Duration) {
515
0
        self.auto_tune_inner(
516
0
            now,
517
0
            rtt,
518
            MAX_RECV_WINDOW_SIZE,
519
0
            AutoTuneSubject::Stream(self.subject),
520
        );
521
0
    }
522
523
0
    pub fn add_retired(&mut self, count: u64) {
524
0
        debug_assert!(self.retired + count <= self.consumed);
525
0
        self.retired += count;
526
0
        if self.should_send_update() {
527
0
            self.frame_pending = true;
528
0
        }
529
0
    }
530
531
0
    pub fn set_consumed(&mut self, consumed: u64) -> Res<u64> {
532
0
        if consumed <= self.consumed {
533
0
            return Ok(0);
534
0
        }
535
536
0
        if consumed > self.max_allowed {
537
0
            qtrace!("Stream RX window exceeded: {consumed}");
538
0
            return Err(Error::FlowControl);
539
0
        }
540
0
        let new_consumed = consumed - self.consumed;
541
0
        self.consumed = consumed;
542
0
        Ok(new_consumed)
543
0
    }
544
}
545
546
impl ReceiverFlowControl<StreamType> {
547
884
    pub fn write_frames<B: Buffer>(
548
884
        &mut self,
549
884
        builder: &mut packet::Builder<B>,
550
884
        tokens: &mut recovery::Tokens,
551
884
        stats: &mut FrameStats,
552
884
    ) {
553
884
        if !self.frame_needed() {
554
884
            return;
555
0
        }
556
0
        let max_streams = self.next_limit();
557
0
        let frame = match self.subject {
558
0
            StreamType::BiDi => FrameType::MaxStreamsBiDi,
559
0
            StreamType::UniDi => FrameType::MaxStreamsUniDi,
560
        };
561
0
        if builder.write_varint_frame(&[frame.into(), max_streams]) {
562
0
            stats.max_streams += 1;
563
0
            tokens.push(recovery::Token::Stream(StreamRecoveryToken::MaxStreams {
564
0
                stream_type: self.subject,
565
0
                max_streams,
566
0
            }));
567
0
            self.frame_sent(max_streams);
568
0
        }
569
884
    }
570
571
    /// Check if received item exceeds the allowed flow control limit.
572
0
    pub const fn check_allowed(&self, new_end: u64) -> bool {
573
0
        new_end < self.max_allowed
574
0
    }
575
576
    /// Retire given amount of additional data.
577
    /// This function will send flow updates immediately.
578
884
    pub fn add_retired(&mut self, count: u64) {
579
884
        self.retired += count;
580
884
        if count > 0 {
581
0
            self.send_flowc_update();
582
884
        }
583
884
    }
584
}
585
586
pub struct RemoteStreamLimit {
587
    streams_fc: ReceiverFlowControl<StreamType>,
588
    next_stream: StreamId,
589
}
590
591
impl RemoteStreamLimit {
592
5.00k
    pub const fn new(stream_type: StreamType, max_streams: u64, role: Role) -> Self {
593
5.00k
        Self {
594
5.00k
            streams_fc: ReceiverFlowControl::new(stream_type, max_streams),
595
5.00k
            // // This is for a stream created by a peer, therefore we use role.remote().
596
5.00k
            next_stream: StreamId::init(stream_type, role.remote()),
597
5.00k
        }
598
5.00k
    }
599
600
0
    pub const fn is_allowed(&self, stream_id: StreamId) -> bool {
601
0
        let stream_idx = stream_id.as_u64() >> 2;
602
0
        self.streams_fc.check_allowed(stream_idx)
603
0
    }
604
605
0
    pub fn is_new_stream(&self, stream_id: StreamId) -> Res<bool> {
606
0
        if !self.is_allowed(stream_id) {
607
0
            return Err(Error::StreamLimit);
608
0
        }
609
0
        Ok(stream_id >= self.next_stream)
610
0
    }
611
612
0
    pub fn take_stream_id(&mut self) -> StreamId {
613
0
        let new_stream = self.next_stream;
614
0
        self.next_stream.next();
615
0
        assert!(self.is_allowed(new_stream));
616
0
        new_stream
617
0
    }
618
}
619
620
impl Deref for RemoteStreamLimit {
621
    type Target = ReceiverFlowControl<StreamType>;
622
0
    fn deref(&self) -> &Self::Target {
623
0
        &self.streams_fc
624
0
    }
625
}
626
627
impl DerefMut for RemoteStreamLimit {
628
1.76k
    fn deref_mut(&mut self) -> &mut Self::Target {
629
1.76k
        &mut self.streams_fc
630
1.76k
    }
631
}
632
633
pub struct RemoteStreamLimits {
634
    bidirectional: RemoteStreamLimit,
635
    unidirectional: RemoteStreamLimit,
636
}
637
638
impl RemoteStreamLimits {
639
2.50k
    pub const fn new(local_max_stream_bidi: u64, local_max_stream_uni: u64, role: Role) -> Self {
640
2.50k
        Self {
641
2.50k
            bidirectional: RemoteStreamLimit::new(StreamType::BiDi, local_max_stream_bidi, role),
642
2.50k
            unidirectional: RemoteStreamLimit::new(StreamType::UniDi, local_max_stream_uni, role),
643
2.50k
        }
644
2.50k
    }
645
}
646
647
impl Index<StreamType> for RemoteStreamLimits {
648
    type Output = RemoteStreamLimit;
649
650
0
    fn index(&self, index: StreamType) -> &Self::Output {
651
0
        match index {
652
0
            StreamType::BiDi => &self.bidirectional,
653
0
            StreamType::UniDi => &self.unidirectional,
654
        }
655
0
    }
656
}
657
658
impl IndexMut<StreamType> for RemoteStreamLimits {
659
1.76k
    fn index_mut(&mut self, index: StreamType) -> &mut Self::Output {
660
1.76k
        match index {
661
884
            StreamType::BiDi => &mut self.bidirectional,
662
884
            StreamType::UniDi => &mut self.unidirectional,
663
        }
664
1.76k
    }
665
}
666
667
pub struct LocalStreamLimits {
668
    bidirectional: SenderFlowControl<StreamType>,
669
    unidirectional: SenderFlowControl<StreamType>,
670
    role_bit: u64,
671
}
672
673
impl LocalStreamLimits {
674
2.50k
    pub const fn new(role: Role) -> Self {
675
2.50k
        Self {
676
2.50k
            bidirectional: SenderFlowControl::new(StreamType::BiDi, 0),
677
2.50k
            unidirectional: SenderFlowControl::new(StreamType::UniDi, 0),
678
2.50k
            role_bit: StreamId::role_bit(role),
679
2.50k
        }
680
2.50k
    }
681
682
0
    pub fn take_stream_id(&mut self, stream_type: StreamType) -> Option<StreamId> {
683
0
        let fc = match stream_type {
684
0
            StreamType::BiDi => &mut self.bidirectional,
685
0
            StreamType::UniDi => &mut self.unidirectional,
686
        };
687
0
        if fc.available() > 0 {
688
0
            let new_stream = fc.used();
689
0
            fc.consume(1);
690
0
            let type_bit = match stream_type {
691
0
                StreamType::BiDi => 0,
692
0
                StreamType::UniDi => 2,
693
            };
694
0
            Some(StreamId::from((new_stream << 2) + type_bit + self.role_bit))
695
        } else {
696
0
            fc.blocked();
697
0
            None
698
        }
699
0
    }
700
}
701
702
impl Index<StreamType> for LocalStreamLimits {
703
    type Output = SenderFlowControl<StreamType>;
704
705
884
    fn index(&self, index: StreamType) -> &Self::Output {
706
884
        match index {
707
442
            StreamType::BiDi => &self.bidirectional,
708
442
            StreamType::UniDi => &self.unidirectional,
709
        }
710
884
    }
711
}
712
713
impl IndexMut<StreamType> for LocalStreamLimits {
714
1.76k
    fn index_mut(&mut self, index: StreamType) -> &mut Self::Output {
715
1.76k
        match index {
716
884
            StreamType::BiDi => &mut self.bidirectional,
717
884
            StreamType::UniDi => &mut self.unidirectional,
718
        }
719
1.76k
    }
720
}
721
722
#[cfg(test)]
723
#[cfg_attr(coverage_nightly, coverage(off))]
724
mod test {
725
    #![allow(
726
        clippy::allow_attributes,
727
        clippy::unwrap_in_result,
728
        reason = "OK in tests."
729
    )]
730
731
    use std::{
732
        cmp::min,
733
        collections::VecDeque,
734
        time::{Duration, Instant},
735
    };
736
737
    use neqo_common::{qdebug, Encoder, Role};
738
    use neqo_crypto::random;
739
740
    use super::{LocalStreamLimits, ReceiverFlowControl, RemoteStreamLimits, SenderFlowControl};
741
    use crate::{
742
        fc::WINDOW_UPDATE_FRACTION,
743
        packet::{self, PACKET_LIMIT},
744
        recovery,
745
        recv_stream::{MAX_CONN_RECV_WINDOW_SIZE, MAX_RECV_WINDOW_SIZE},
746
        stats::FrameStats,
747
        stream_id::{StreamId, StreamType},
748
        ConnectionParameters, Error, Res, INITIAL_STREAM_RECV_WINDOW_SIZE,
749
    };
750
751
    #[test]
752
    fn blocked_at_zero() {
753
        let mut fc = SenderFlowControl::new((), 0);
754
        fc.blocked();
755
        assert_eq!(fc.blocked_needed(), Some(0));
756
    }
757
758
    #[test]
759
    fn blocked() {
760
        let mut fc = SenderFlowControl::new((), 10);
761
        fc.blocked();
762
        assert_eq!(fc.blocked_needed(), Some(10));
763
    }
764
765
    #[test]
766
    fn update_consume() {
767
        let mut fc = SenderFlowControl::new((), 10);
768
        fc.consume(10);
769
        assert_eq!(fc.available(), 0);
770
        fc.update(5); // An update lower than the current limit does nothing.
771
        assert_eq!(fc.available(), 0);
772
        fc.update(15);
773
        assert_eq!(fc.available(), 5);
774
        fc.consume(3);
775
        assert_eq!(fc.available(), 2);
776
    }
777
778
    #[test]
779
    fn update_clears_blocked() {
780
        let mut fc = SenderFlowControl::new((), 10);
781
        fc.blocked();
782
        assert_eq!(fc.blocked_needed(), Some(10));
783
        fc.update(5); // An update lower than the current limit does nothing.
784
        assert_eq!(fc.blocked_needed(), Some(10));
785
        fc.update(11);
786
        assert_eq!(fc.blocked_needed(), None);
787
    }
788
789
    #[test]
790
    fn lost_blocked_resent() {
791
        let mut fc = SenderFlowControl::new((), 10);
792
        fc.blocked();
793
        fc.blocked_sent();
794
        assert_eq!(fc.blocked_needed(), None);
795
        fc.frame_lost(10);
796
        assert_eq!(fc.blocked_needed(), Some(10));
797
    }
798
799
    #[test]
800
    fn lost_after_increase() {
801
        let mut fc = SenderFlowControl::new((), 10);
802
        fc.blocked();
803
        fc.blocked_sent();
804
        assert_eq!(fc.blocked_needed(), None);
805
        fc.update(11);
806
        fc.frame_lost(10);
807
        assert_eq!(fc.blocked_needed(), None);
808
    }
809
810
    #[test]
811
    fn lost_after_higher_blocked() {
812
        let mut fc = SenderFlowControl::new((), 10);
813
        fc.blocked();
814
        fc.blocked_sent();
815
        fc.update(11);
816
        fc.blocked();
817
        assert_eq!(fc.blocked_needed(), Some(11));
818
        fc.blocked_sent();
819
        fc.frame_lost(10);
820
        assert_eq!(fc.blocked_needed(), None);
821
    }
822
823
    #[test]
824
    fn do_no_need_max_allowed_frame_at_start() {
825
        let fc = ReceiverFlowControl::new((), 0);
826
        assert!(!fc.frame_needed());
827
    }
828
829
    #[test]
830
    fn max_allowed_after_items_retired() {
831
        let window = 100;
832
        let trigger = window / WINDOW_UPDATE_FRACTION;
833
        let mut fc = ReceiverFlowControl::new((), window);
834
        fc.retire(trigger);
835
        assert!(!fc.frame_needed());
836
        fc.retire(trigger + 1);
837
        assert!(fc.frame_needed());
838
        assert_eq!(fc.next_limit(), window + trigger + 1);
839
    }
840
841
    #[test]
842
    fn need_max_allowed_frame_after_loss() {
843
        let mut fc = ReceiverFlowControl::new((), 100);
844
        fc.retire(100);
845
        assert!(fc.frame_needed());
846
        assert_eq!(fc.next_limit(), 200);
847
        fc.frame_sent(200);
848
        assert!(!fc.frame_needed());
849
        fc.frame_lost(200);
850
        assert!(fc.frame_needed());
851
        assert_eq!(fc.next_limit(), 200);
852
    }
853
854
    #[test]
855
    fn no_max_allowed_frame_after_old_loss() {
856
        let mut fc = ReceiverFlowControl::new((), 100);
857
        fc.retire(51);
858
        assert!(fc.frame_needed());
859
        assert_eq!(fc.next_limit(), 151);
860
        fc.frame_sent(151);
861
        assert!(!fc.frame_needed());
862
        fc.retire(102);
863
        assert!(fc.frame_needed());
864
        assert_eq!(fc.next_limit(), 202);
865
        fc.frame_sent(202);
866
        assert!(!fc.frame_needed());
867
        fc.frame_lost(151);
868
        assert!(!fc.frame_needed());
869
    }
870
871
    #[test]
872
    fn force_send_max_allowed() {
873
        let mut fc = ReceiverFlowControl::new((), 100);
874
        fc.retire(10);
875
        assert!(!fc.frame_needed());
876
    }
877
878
    #[test]
879
    fn multiple_retries_after_frame_pending_is_set() {
880
        let mut fc = ReceiverFlowControl::new((), 100);
881
        fc.retire(51);
882
        assert!(fc.frame_needed());
883
        assert_eq!(fc.next_limit(), 151);
884
        fc.retire(61);
885
        assert!(fc.frame_needed());
886
        assert_eq!(fc.next_limit(), 161);
887
        fc.retire(88);
888
        assert!(fc.frame_needed());
889
        assert_eq!(fc.next_limit(), 188);
890
        fc.retire(90);
891
        assert!(fc.frame_needed());
892
        assert_eq!(fc.next_limit(), 190);
893
        fc.frame_sent(190);
894
        assert!(!fc.frame_needed());
895
        fc.retire(141);
896
        assert!(fc.frame_needed());
897
        assert_eq!(fc.next_limit(), 241);
898
        fc.frame_sent(241);
899
        assert!(!fc.frame_needed());
900
    }
901
902
    #[test]
903
    fn new_retired_before_loss() {
904
        let mut fc = ReceiverFlowControl::new((), 100);
905
        fc.retire(51);
906
        assert!(fc.frame_needed());
907
        assert_eq!(fc.next_limit(), 151);
908
        fc.frame_sent(151);
909
        assert!(!fc.frame_needed());
910
        fc.retire(62);
911
        assert!(!fc.frame_needed());
912
        fc.frame_lost(151);
913
        assert!(fc.frame_needed());
914
        assert_eq!(fc.next_limit(), 162);
915
    }
916
917
    #[test]
918
    fn changing_max_active() {
919
        let mut fc = ReceiverFlowControl::new((), 100);
920
        fc.set_max_active(50);
921
922
        // There is no MAX_STREAM_DATA frame needed.
923
        assert!(!fc.frame_needed());
924
925
        // We can still retire more than 50.
926
        fc.consume(60).unwrap();
927
        fc.retire(60);
928
929
        // There is no MAX_STREAM_DATA frame needed yet.
930
        assert!(!fc.frame_needed());
931
        fc.consume(16).unwrap();
932
        fc.retire(76);
933
        assert!(fc.frame_needed());
934
        assert_eq!(fc.next_limit(), 126);
935
936
        // Increase max_active.
937
        fc.set_max_active(60);
938
        assert!(fc.frame_needed());
939
        let new_max = fc.next_limit();
940
        assert_eq!(new_max, 136);
941
942
        // Sent update, accounting for the new `max_active`.
943
        fc.frame_sent(new_max);
944
945
        // We can retire more than 60.
946
        fc.consume(60).unwrap();
947
        fc.retire(136);
948
        assert!(fc.frame_needed());
949
        assert_eq!(fc.next_limit(), 196);
950
    }
951
952
    fn remote_stream_limits(role: Role, bidi: u64, unidi: u64) {
953
        let mut fc = RemoteStreamLimits::new(2, 1, role);
954
        assert!(fc[StreamType::BiDi]
955
            .is_new_stream(StreamId::from(bidi))
956
            .unwrap());
957
        assert!(fc[StreamType::BiDi]
958
            .is_new_stream(StreamId::from(bidi + 4))
959
            .unwrap());
960
        assert!(fc[StreamType::UniDi]
961
            .is_new_stream(StreamId::from(unidi))
962
            .unwrap());
963
964
        // Exceed limits
965
        assert_eq!(
966
            fc[StreamType::BiDi].is_new_stream(StreamId::from(bidi + 8)),
967
            Err(Error::StreamLimit)
968
        );
969
        assert_eq!(
970
            fc[StreamType::UniDi].is_new_stream(StreamId::from(unidi + 4)),
971
            Err(Error::StreamLimit)
972
        );
973
974
        assert_eq!(fc[StreamType::BiDi].take_stream_id(), StreamId::from(bidi));
975
        assert_eq!(
976
            fc[StreamType::BiDi].take_stream_id(),
977
            StreamId::from(bidi + 4)
978
        );
979
        assert_eq!(
980
            fc[StreamType::UniDi].take_stream_id(),
981
            StreamId::from(unidi)
982
        );
983
984
        fc[StreamType::BiDi].add_retired(1);
985
        fc[StreamType::BiDi].send_flowc_update();
986
        // consume the frame
987
        let mut builder =
988
            packet::Builder::short(Encoder::new(), false, None::<&[u8]>, PACKET_LIMIT);
989
        let mut tokens = recovery::Tokens::new();
990
        fc[StreamType::BiDi].write_frames(&mut builder, &mut tokens, &mut FrameStats::default());
991
        assert_eq!(tokens.len(), 1);
992
993
        // Now 9 can be a new StreamId.
994
        assert!(fc[StreamType::BiDi]
995
            .is_new_stream(StreamId::from(bidi + 8))
996
            .unwrap());
997
        assert_eq!(
998
            fc[StreamType::BiDi].take_stream_id(),
999
            StreamId::from(bidi + 8)
1000
        );
1001
        // 13 still exceeds limits
1002
        assert_eq!(
1003
            fc[StreamType::BiDi].is_new_stream(StreamId::from(bidi + 12)),
1004
            Err(Error::StreamLimit)
1005
        );
1006
1007
        fc[StreamType::UniDi].add_retired(1);
1008
        fc[StreamType::UniDi].send_flowc_update();
1009
        // consume the frame
1010
        fc[StreamType::UniDi].write_frames(&mut builder, &mut tokens, &mut FrameStats::default());
1011
        assert_eq!(tokens.len(), 2);
1012
1013
        // Now 7 can be a new StreamId.
1014
        assert!(fc[StreamType::UniDi]
1015
            .is_new_stream(StreamId::from(unidi + 4))
1016
            .unwrap());
1017
        assert_eq!(
1018
            fc[StreamType::UniDi].take_stream_id(),
1019
            StreamId::from(unidi + 4)
1020
        );
1021
        // 11 exceeds limits
1022
        assert_eq!(
1023
            fc[StreamType::UniDi].is_new_stream(StreamId::from(unidi + 8)),
1024
            Err(Error::StreamLimit)
1025
        );
1026
    }
1027
1028
    #[test]
1029
    fn remote_stream_limits_new_stream_client() {
1030
        remote_stream_limits(Role::Client, 1, 3);
1031
    }
1032
1033
    #[test]
1034
    fn remote_stream_limits_new_stream_server() {
1035
        remote_stream_limits(Role::Server, 0, 2);
1036
    }
1037
1038
    #[should_panic(expected = ".is_allowed")]
1039
    #[test]
1040
    fn remote_stream_limits_asserts_if_limit_exceeded() {
1041
        let mut fc = RemoteStreamLimits::new(2, 1, Role::Client);
1042
        assert_eq!(fc[StreamType::BiDi].take_stream_id(), StreamId::from(1));
1043
        assert_eq!(fc[StreamType::BiDi].take_stream_id(), StreamId::from(5));
1044
        _ = fc[StreamType::BiDi].take_stream_id();
1045
    }
1046
1047
    fn local_stream_limits(role: Role, bidi: u64, unidi: u64) {
1048
        let mut fc = LocalStreamLimits::new(role);
1049
1050
        fc[StreamType::BiDi].update(2);
1051
        fc[StreamType::UniDi].update(1);
1052
1053
        // Add streams
1054
        assert_eq!(
1055
            fc.take_stream_id(StreamType::BiDi).unwrap(),
1056
            StreamId::from(bidi)
1057
        );
1058
        assert_eq!(
1059
            fc.take_stream_id(StreamType::BiDi).unwrap(),
1060
            StreamId::from(bidi + 4)
1061
        );
1062
        assert_eq!(fc.take_stream_id(StreamType::BiDi), None);
1063
        assert_eq!(
1064
            fc.take_stream_id(StreamType::UniDi).unwrap(),
1065
            StreamId::from(unidi)
1066
        );
1067
        assert_eq!(fc.take_stream_id(StreamType::UniDi), None);
1068
1069
        // Increase limit
1070
        fc[StreamType::BiDi].update(3);
1071
        fc[StreamType::UniDi].update(2);
1072
        assert_eq!(
1073
            fc.take_stream_id(StreamType::BiDi).unwrap(),
1074
            StreamId::from(bidi + 8)
1075
        );
1076
        assert_eq!(fc.take_stream_id(StreamType::BiDi), None);
1077
        assert_eq!(
1078
            fc.take_stream_id(StreamType::UniDi).unwrap(),
1079
            StreamId::from(unidi + 4)
1080
        );
1081
        assert_eq!(fc.take_stream_id(StreamType::UniDi), None);
1082
    }
1083
1084
    #[test]
1085
    fn local_stream_limits_new_stream_client() {
1086
        local_stream_limits(Role::Client, 0, 2);
1087
    }
1088
1089
    #[test]
1090
    fn local_stream_limits_new_stream_server() {
1091
        local_stream_limits(Role::Server, 1, 3);
1092
    }
1093
1094
    fn write_frames(fc: &mut ReceiverFlowControl<StreamId>, rtt: Duration, now: Instant) -> usize {
1095
        let mut builder =
1096
            packet::Builder::short(Encoder::new(), false, None::<&[u8]>, PACKET_LIMIT);
1097
        let mut tokens = recovery::Tokens::new();
1098
        fc.write_frames(
1099
            &mut builder,
1100
            &mut tokens,
1101
            &mut FrameStats::default(),
1102
            now,
1103
            rtt,
1104
        );
1105
        tokens.len()
1106
    }
1107
1108
    #[test]
1109
    fn trigger_factor() -> Res<()> {
1110
        let rtt = Duration::from_millis(40);
1111
        let now = test_fixture::now();
1112
        let mut fc =
1113
            ReceiverFlowControl::new(StreamId::new(0), INITIAL_STREAM_RECV_WINDOW_SIZE as u64);
1114
1115
        let fraction = INITIAL_STREAM_RECV_WINDOW_SIZE as u64 / WINDOW_UPDATE_FRACTION;
1116
1117
        let consumed = fc.set_consumed(fraction)?;
1118
        fc.add_retired(consumed);
1119
        assert_eq!(write_frames(&mut fc, rtt, now), 0);
1120
1121
        let consumed = fc.set_consumed(fraction + 1)?;
1122
        assert_eq!(write_frames(&mut fc, rtt, now), 0);
1123
1124
        fc.add_retired(consumed);
1125
        assert_eq!(write_frames(&mut fc, rtt, now), 1);
1126
1127
        Ok(())
1128
    }
1129
1130
    #[test]
1131
    fn auto_tuning_increase_no_decrease() -> Res<()> {
1132
        let rtt = Duration::from_millis(40);
1133
        let mut now = test_fixture::now();
1134
        let mut fc =
1135
            ReceiverFlowControl::new(StreamId::new(0), INITIAL_STREAM_RECV_WINDOW_SIZE as u64);
1136
        let initial_max_active = fc.max_active();
1137
1138
        // Consume and retire multiple receive windows without increasing time.
1139
        for _ in 1..11 {
1140
            let consumed = fc.set_consumed(fc.next_limit())?;
1141
            fc.add_retired(consumed);
1142
            write_frames(&mut fc, rtt, now);
1143
        }
1144
        let increased_max_active = fc.max_active();
1145
1146
        assert!(
1147
            initial_max_active < increased_max_active,
1148
            "expect receive window auto-tuning to increase max_active on full utilization of high bdp connection"
1149
        );
1150
1151
        // Huge idle time.
1152
        now += Duration::from_secs(60 * 60); // 1h
1153
        let consumed = fc.set_consumed(fc.next_limit()).unwrap();
1154
        fc.add_retired(consumed);
1155
1156
        assert_eq!(write_frames(&mut fc, rtt, now), 1);
1157
        assert_eq!(
1158
            increased_max_active,
1159
            fc.max_active(),
1160
            "expect receive window auto-tuning never to decrease max_active on low utilization"
1161
        );
1162
1163
        Ok(())
1164
    }
1165
1166
    #[test]
1167
    fn stream_data_blocked_triggers_auto_tuning() -> Res<()> {
1168
        let rtt = Duration::from_millis(40);
1169
        let now = test_fixture::now();
1170
        let mut fc =
1171
            ReceiverFlowControl::new(StreamId::new(0), INITIAL_STREAM_RECV_WINDOW_SIZE as u64);
1172
1173
        // Send first window update to give auto-tuning algorithm a baseline.
1174
        let consumed = fc.set_consumed(fc.next_limit())?;
1175
        fc.add_retired(consumed);
1176
        assert_eq!(write_frames(&mut fc, rtt, now), 1);
1177
1178
        // Use up a single byte only, i.e. way below WINDOW_UPDATE_FRACTION.
1179
        let consumed = fc.set_consumed(fc.retired + 1)?;
1180
        fc.add_retired(consumed);
1181
        assert_eq!(
1182
            write_frames(&mut fc, rtt, now),
1183
            0,
1184
            "expect receiver to not send window update unprompted"
1185
        );
1186
1187
        // Receive STREAM_DATA_BLOCKED frame.
1188
        fc.send_flowc_update();
1189
        let previous_max_active = fc.max_active();
1190
        assert_eq!(
1191
            write_frames(&mut fc, rtt, now),
1192
            1,
1193
            "expect receiver to send window update"
1194
        );
1195
        assert!(
1196
            previous_max_active < fc.max_active(),
1197
            "expect receiver to auto-tune (i.e. increase) max_active"
1198
        );
1199
1200
        Ok(())
1201
    }
1202
1203
    #[expect(clippy::cast_precision_loss, reason = "This is test code.")]
1204
    #[test]
1205
    fn auto_tuning_approximates_bandwidth_delay_product() -> Res<()> {
1206
        const DATA_FRAME_SIZE: u64 = 1_500;
1207
        /// Allow auto-tuning algorithm to be off from actual bandwidth-delay
1208
        /// product by up to 1KiB.
1209
        const TOLERANCE: u64 = 1024;
1210
1211
        test_fixture::fixture_init();
1212
1213
        // Run multiple iterations with randomized bandwidth and rtt.
1214
        for _ in 0..100 {
1215
            // Random bandwidth between 1 Mbit/s and 1 Gbit/s.
1216
            let bandwidth =
1217
                u64::from(u16::from_be_bytes(random::<2>()) % 1_000 + 1) * 1_000 * 1_000;
1218
            // Random delay between 1 ms and 256 ms.
1219
            let rtt = Duration::from_millis(u64::from(random::<1>()[0]) + 1);
1220
            let bdp = bandwidth * u64::try_from(rtt.as_millis()).unwrap() / 1_000 / 8;
1221
1222
            let mut now = test_fixture::now();
1223
1224
            let mut send_to_recv = VecDeque::new();
1225
            let mut recv_to_send = VecDeque::new();
1226
1227
            let mut last_max_active = INITIAL_STREAM_RECV_WINDOW_SIZE as u64;
1228
            let mut last_max_active_changed = now;
1229
1230
            let mut sender_window = INITIAL_STREAM_RECV_WINDOW_SIZE as u64;
1231
            let mut fc =
1232
                ReceiverFlowControl::new(StreamId::new(0), INITIAL_STREAM_RECV_WINDOW_SIZE as u64);
1233
1234
            loop {
1235
                // Sender receives window updates.
1236
                if recv_to_send.front().is_some_and(|(at, _)| *at <= now) {
1237
                    let (_, update) = recv_to_send.pop_front().unwrap();
1238
                    sender_window += update;
1239
                }
1240
1241
                // Sender sends data frames.
1242
                let sender_progressed = if sender_window > 0 {
1243
                    let to_send = min(DATA_FRAME_SIZE, sender_window);
1244
                    send_to_recv.push_back((now, to_send));
1245
                    sender_window -= to_send;
1246
                    now += Duration::from_secs_f64(to_send as f64 * 8.0 / bandwidth as f64);
1247
                    true
1248
                } else {
1249
                    false
1250
                };
1251
1252
                // Receiver receives data frames.
1253
                let mut receiver_progressed = false;
1254
                if send_to_recv.front().is_some_and(|(at, _)| *at <= now) {
1255
                    let (_, data) = send_to_recv.pop_front().unwrap();
1256
                    let consumed = fc.set_consumed(fc.retired() + data)?;
1257
                    fc.add_retired(consumed);
1258
1259
                    // Receiver sends window updates.
1260
                    let prev_max_allowed = fc.max_allowed;
1261
                    if write_frames(&mut fc, rtt, now) == 1 {
1262
                        recv_to_send.push_front((now, fc.max_allowed - prev_max_allowed));
1263
                        receiver_progressed = true;
1264
                        if last_max_active < fc.max_active() {
1265
                            last_max_active = fc.max_active();
1266
                            last_max_active_changed = now;
1267
                        }
1268
                    }
1269
                }
1270
1271
                // When idle, travel in (simulated) time.
1272
                if !sender_progressed && !receiver_progressed {
1273
                    now = [recv_to_send.front(), send_to_recv.front()]
1274
                        .into_iter()
1275
                        .flatten()
1276
                        .map(|(at, _)| *at)
1277
                        .min()
1278
                        .expect("both are None");
1279
                }
1280
1281
                // Consider auto-tuning done once receive window hasn't changed for 4 RTT.
1282
                if now.duration_since(last_max_active_changed) > 4 * rtt {
1283
                    break;
1284
                }
1285
            }
1286
1287
            let summary = format!(
1288
                "Got receive window of {} MiB on connection with bandwidth {} MBit/s ({bandwidth} Bit/s), delay {rtt:?}, bdp {} MiB.",
1289
                fc.max_active() / 1024 / 1024,
1290
                bandwidth / 1_000 / 1_000,
1291
                bdp / 1024 / 1024,
1292
            );
1293
1294
            assert!(
1295
                fc.max_active() + TOLERANCE >= bdp || fc.max_active() == MAX_RECV_WINDOW_SIZE,
1296
                "{summary} Receive window is smaller than the bdp."
1297
            );
1298
            assert!(
1299
                fc.max_active - TOLERANCE <= bdp
1300
                    || fc.max_active == INITIAL_STREAM_RECV_WINDOW_SIZE as u64,
1301
                "{summary} Receive window is larger than the bdp."
1302
            );
1303
1304
            qdebug!("{summary}");
1305
        }
1306
1307
        Ok(())
1308
    }
1309
1310
    #[test]
1311
    fn connection_flow_control_initial_window() {
1312
        // Connection flow control starts with a reasonable initial window
1313
        // (16x the stream window to accommodate 16 concurrent streams).
1314
        let max_data = ConnectionParameters::default().get_max_data();
1315
        assert_eq!(max_data, (INITIAL_STREAM_RECV_WINDOW_SIZE * 16) as u64);
1316
    }
1317
1318
    #[test]
1319
    fn connection_flow_control_auto_tune() -> Res<()> {
1320
        let rtt = Duration::from_millis(40);
1321
        let now = test_fixture::now();
1322
        let initial_window = (INITIAL_STREAM_RECV_WINDOW_SIZE * 16) as u64;
1323
        let mut fc = ReceiverFlowControl::new((), initial_window);
1324
        let initial_max_active = fc.max_active();
1325
1326
        // Helper to write frames
1327
        let write_conn_frames = |fc: &mut ReceiverFlowControl<()>, now: Instant| {
1328
            let mut builder =
1329
                packet::Builder::short(Encoder::new(), false, None::<&[u8]>, PACKET_LIMIT);
1330
            let mut tokens = recovery::Tokens::new();
1331
            fc.write_frames(
1332
                &mut builder,
1333
                &mut tokens,
1334
                &mut FrameStats::default(),
1335
                now,
1336
                rtt,
1337
            );
1338
            tokens.len()
1339
        };
1340
1341
        // Consume and retire multiple windows to trigger auto-tuning.
1342
        // Each iteration: consume a full window, retire it, send update.
1343
        for _ in 1..11 {
1344
            let to_consume = fc.max_active();
1345
            fc.consume(to_consume)?;
1346
            fc.add_retired(to_consume);
1347
            write_conn_frames(&mut fc, now);
1348
        }
1349
        let increased_max_active = fc.max_active();
1350
1351
        assert!(
1352
            initial_max_active < increased_max_active,
1353
            "expect connection-level receive window auto-tuning to increase max_active on full utilization"
1354
        );
1355
1356
        Ok(())
1357
    }
1358
1359
    #[test]
1360
    fn connection_flow_control_respects_max_window() -> Res<()> {
1361
        let rtt = Duration::from_millis(40);
1362
        let now = test_fixture::now();
1363
        let initial_window = (INITIAL_STREAM_RECV_WINDOW_SIZE * 16) as u64;
1364
        let mut fc = ReceiverFlowControl::new((), initial_window);
1365
1366
        // Helper to write frames
1367
        let write_conn_frames = |fc: &mut ReceiverFlowControl<()>| {
1368
            let mut builder =
1369
                packet::Builder::short(Encoder::new(), false, None::<&[u8]>, PACKET_LIMIT);
1370
            let mut tokens = recovery::Tokens::new();
1371
            fc.write_frames(
1372
                &mut builder,
1373
                &mut tokens,
1374
                &mut FrameStats::default(),
1375
                now,
1376
                rtt,
1377
            );
1378
            tokens.len()
1379
        };
1380
1381
        // Consume and retire many full windows to push window to the limit.
1382
        // Keep consuming without advancing time to create maximum pressure.
1383
        for _ in 0..1000 {
1384
            let prev_max = fc.max_active();
1385
            let to_consume = fc.max_active();
1386
            fc.consume(to_consume)?;
1387
            fc.add_retired(to_consume);
1388
            write_conn_frames(&mut fc);
1389
1390
            // Stop if we've reached the maximum and it's not growing anymore
1391
            if fc.max_active() == MAX_CONN_RECV_WINDOW_SIZE && fc.max_active() == prev_max {
1392
                qdebug!(
1393
                    "Reached and stabilized at max window: {} MiB",
1394
                    fc.max_active() / 1024 / 1024
1395
                );
1396
                break;
1397
            }
1398
        }
1399
1400
        assert_eq!(
1401
            fc.max_active(),
1402
            MAX_CONN_RECV_WINDOW_SIZE,
1403
            "expect connection-level receive window to cap at MAX_CONN_RECV_WINDOW_SIZE (100 MiB), got {} MiB",
1404
            fc.max_active() / 1024 / 1024
1405
        );
1406
1407
        qdebug!(
1408
            "Connection flow control window reached max: {} MiB",
1409
            fc.max_active() / 1024 / 1024
1410
        );
1411
1412
        Ok(())
1413
    }
1414
}