Coverage Report

Created: 2025-08-26 07:09

/src/h2/src/proto/streams/flow_control.rs
Line
Count
Source (jump to first uncovered line)
1
use crate::frame::Reason;
2
use crate::proto::{WindowSize, MAX_WINDOW_SIZE};
3
4
use std::fmt;
5
6
// We don't want to send WINDOW_UPDATE frames for tiny changes, but instead
7
// aggregate them when the changes are significant. Many implementations do
8
// this by keeping a "ratio" of the update versus the allowed window size.
9
//
10
// While some may wish to represent this ratio as percentage, using a f32,
11
// we skip having to deal with float math and stick to integers. To do so,
12
// the "ratio" is represented by 2 i32s, split into the numerator and
13
// denominator. For example, a 50% ratio is simply represented as 1/2.
14
//
15
// An example applying this ratio: If a stream has an allowed window size of
16
// 100 bytes, WINDOW_UPDATE frames are scheduled when the unclaimed change
17
// becomes at least 1/2 of the window, or 50 bytes.
18
const UNCLAIMED_NUMERATOR: i32 = 1;
19
const UNCLAIMED_DENOMINATOR: i32 = 2;
20
21
#[test]
22
#[allow(clippy::assertions_on_constants)]
23
fn sanity_unclaimed_ratio() {
24
    assert!(UNCLAIMED_NUMERATOR < UNCLAIMED_DENOMINATOR);
25
    assert!(UNCLAIMED_NUMERATOR >= 0);
26
    assert!(UNCLAIMED_DENOMINATOR > 0);
27
}
28
29
#[derive(Copy, Clone, Debug)]
30
pub struct FlowControl {
31
    /// Window the peer knows about.
32
    ///
33
    /// This can go negative if a SETTINGS_INITIAL_WINDOW_SIZE is received.
34
    ///
35
    /// For example, say the peer sends a request and uses 32kb of the window.
36
    /// We send a SETTINGS_INITIAL_WINDOW_SIZE of 16kb. The peer has to adjust
37
    /// its understanding of the capacity of the window, and that would be:
38
    ///
39
    /// ```notrust
40
    /// default (64kb) - used (32kb) - settings_diff (64kb - 16kb): -16kb
41
    /// ```
42
    window_size: Window,
43
44
    /// Window that we know about.
45
    ///
46
    /// This can go negative if a user declares a smaller target window than
47
    /// the peer knows about.
48
    available: Window,
49
}
50
51
impl FlowControl {
52
927k
    pub fn new() -> FlowControl {
53
927k
        FlowControl {
54
927k
            window_size: Window(0),
55
927k
            available: Window(0),
56
927k
        }
57
927k
    }
58
59
    /// Returns the window size as known by the peer
60
622k
    pub fn window_size(&self) -> WindowSize {
61
622k
        self.window_size.as_size()
62
622k
    }
63
64
    /// Returns the window size available to the consumer
65
3.29M
    pub fn available(&self) -> Window {
66
3.29M
        self.available
67
3.29M
    }
68
69
    /// Returns true if there is unavailable window capacity
70
331k
    pub fn has_unavailable(&self) -> bool {
71
331k
        if self.window_size < 0 {
72
0
            return false;
73
331k
        }
74
331k
75
331k
        self.window_size > self.available
76
331k
    }
77
78
148k
    pub fn claim_capacity(&mut self, capacity: WindowSize) -> Result<(), Reason> {
79
148k
        self.available.decrease_by(capacity)
80
148k
    }
81
82
667k
    pub fn assign_capacity(&mut self, capacity: WindowSize) -> Result<(), Reason> {
83
667k
        self.available.increase_by(capacity)
84
667k
    }
85
86
    /// If a WINDOW_UPDATE frame should be sent, returns a positive number
87
    /// representing the increment to be used.
88
    ///
89
    /// If there are no available bytes to be reclaimed, or the number of
90
    /// available bytes does not reach the threshold, this returns `None`.
91
    ///
92
    /// This represents pending outbound WINDOW_UPDATE frames.
93
1.64M
    pub fn unclaimed_capacity(&self) -> Option<WindowSize> {
94
1.64M
        let available = self.available;
95
1.64M
96
1.64M
        if self.window_size >= available {
97
1.54M
            return None;
98
102k
        }
99
102k
100
102k
        let unclaimed = available.0 - self.window_size.0;
101
102k
        let threshold = self.window_size.0 / UNCLAIMED_DENOMINATOR * UNCLAIMED_NUMERATOR;
102
102k
103
102k
        if unclaimed < threshold {
104
100k
            None
105
        } else {
106
1.71k
            Some(unclaimed as WindowSize)
107
        }
108
1.64M
    }
109
110
    /// Increase the window size.
111
    ///
112
    /// This is called after receiving a WINDOW_UPDATE frame
113
1.05M
    pub fn inc_window(&mut self, sz: WindowSize) -> Result<(), Reason> {
114
1.05M
        let (val, overflow) = self.window_size.0.overflowing_add(sz as i32);
115
1.05M
116
1.05M
        if overflow {
117
32
            return Err(Reason::FLOW_CONTROL_ERROR);
118
1.05M
        }
119
1.05M
120
1.05M
        if val > MAX_WINDOW_SIZE as i32 {
121
0
            return Err(Reason::FLOW_CONTROL_ERROR);
122
1.05M
        }
123
1.05M
124
1.05M
        tracing::trace!(
125
0
            "inc_window; sz={}; old={}; new={}",
126
            sz,
127
            self.window_size,
128
            val
129
        );
130
131
1.05M
        self.window_size = Window(val);
132
1.05M
        Ok(())
133
1.05M
    }
134
135
    /// Decrement the send-side window size.
136
    ///
137
    /// This is called after receiving a SETTINGS frame with a lower
138
    /// INITIAL_WINDOW_SIZE value.
139
168k
    pub fn dec_send_window(&mut self, sz: WindowSize) -> Result<(), Reason> {
140
168k
        tracing::trace!(
141
0
            "dec_window; sz={}; window={}, available={}",
142
            sz,
143
            self.window_size,
144
            self.available
145
        );
146
        // This can underflow `window_size` from the bottom, so the error is propagated.
147
168k
        self.window_size.decrease_by(sz)?;
148
168k
        Ok(())
149
168k
    }
150
151
    /// Decrement the recv-side window size.
152
    ///
153
    /// This is called after receiving a SETTINGS ACK frame with a lower
154
    /// INITIAL_WINDOW_SIZE value.
155
0
    pub fn dec_recv_window(&mut self, sz: WindowSize) -> Result<(), Reason> {
156
0
        tracing::trace!(
157
0
            "dec_recv_window; sz={}; window={}, available={}",
158
            sz,
159
            self.window_size,
160
            self.available
161
        );
162
        // This should not be able to overflow `window_size` from the bottom.
163
0
        self.window_size.decrease_by(sz)?;
164
0
        self.available.decrease_by(sz)?;
165
0
        Ok(())
166
0
    }
167
168
    /// Decrements the window to reflect that data has actually been sent. The caller
169
    /// must ensure that the window has capacity.
170
78.3k
    pub fn send_data(&mut self, sz: WindowSize) -> Result<(), Reason> {
171
78.3k
        tracing::trace!(
172
0
            "send_data; sz={}; window={}; available={}",
173
            sz,
174
            self.window_size,
175
            self.available
176
        );
177
178
        // If send size is zero it's meaningless to update flow control window
179
78.3k
        if sz > 0 {
180
            // Ensure that the argument is correct
181
43.9k
            assert!(self.window_size.0 >= sz as i32);
182
183
            // Update values
184
43.9k
            self.window_size.decrease_by(sz)?;
185
43.9k
            self.available.decrease_by(sz)?;
186
34.3k
        }
187
78.3k
        Ok(())
188
78.3k
    }
189
}
190
191
/// The current capacity of a flow-controlled Window.
192
///
193
/// This number can go negative when either side has used a certain amount
194
/// of capacity when the other side advertises a reduction in size.
195
///
196
/// This type tries to centralize the knowledge of addition and subtraction
197
/// to this capacity, instead of having integer casts throughout the source.
198
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd)]
199
pub struct Window(i32);
200
201
impl Window {
202
2.82M
    pub fn as_size(&self) -> WindowSize {
203
2.82M
        if self.0 < 0 {
204
565
            0
205
        } else {
206
2.81M
            self.0 as WindowSize
207
        }
208
2.82M
    }
209
210
0
    pub fn checked_size(&self) -> WindowSize {
211
0
        assert!(self.0 >= 0, "negative Window");
212
0
        self.0 as WindowSize
213
0
    }
214
215
404k
    pub fn decrease_by(&mut self, other: WindowSize) -> Result<(), Reason> {
216
404k
        if let Some(v) = self.0.checked_sub(other as i32) {
217
404k
            self.0 = v;
218
404k
            Ok(())
219
        } else {
220
0
            Err(Reason::FLOW_CONTROL_ERROR)
221
        }
222
404k
    }
223
224
667k
    pub fn increase_by(&mut self, other: WindowSize) -> Result<(), Reason> {
225
667k
        let other = self.add(other)?;
226
667k
        self.0 = other.0;
227
667k
        Ok(())
228
667k
    }
229
230
667k
    pub fn add(&self, other: WindowSize) -> Result<Self, Reason> {
231
667k
        if let Some(v) = self.0.checked_add(other as i32) {
232
667k
            Ok(Self(v))
233
        } else {
234
0
            Err(Reason::FLOW_CONTROL_ERROR)
235
        }
236
667k
    }
237
}
238
239
impl PartialEq<usize> for Window {
240
46.5k
    fn eq(&self, other: &usize) -> bool {
241
46.5k
        if self.0 < 0 {
242
0
            false
243
        } else {
244
46.5k
            (self.0 as usize).eq(other)
245
        }
246
46.5k
    }
247
}
248
249
impl PartialOrd<usize> for Window {
250
1.40M
    fn partial_cmp(&self, other: &usize) -> Option<::std::cmp::Ordering> {
251
1.40M
        if self.0 < 0 {
252
0
            Some(::std::cmp::Ordering::Less)
253
        } else {
254
1.40M
            (self.0 as usize).partial_cmp(other)
255
        }
256
1.40M
    }
257
}
258
259
impl fmt::Display for Window {
260
0
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
261
0
        fmt::Display::fmt(&self.0, f)
262
0
    }
263
}
264
265
impl From<Window> for isize {
266
0
    fn from(w: Window) -> isize {
267
0
        w.0 as isize
268
0
    }
269
}