/src/h2/src/proto/streams/flow_control.rs
Line | Count | Source (jump to first uncovered line) |
1 | | use crate::frame::Reason; |
2 | | use crate::proto::{WindowSize, MAX_WINDOW_SIZE}; |
3 | | |
4 | | use std::fmt; |
5 | | |
6 | | // We don't want to send WINDOW_UPDATE frames for tiny changes, but instead |
7 | | // aggregate them when the changes are significant. Many implementations do |
8 | | // this by keeping a "ratio" of the update version the allowed window size. |
9 | | // |
10 | | // While some may wish to represent this ratio as percentage, using a f32, |
11 | | // we skip having to deal with float math and stick to integers. To do so, |
12 | | // the "ratio" is represented by 2 i32s, split into the numerator and |
13 | | // denominator. For example, a 50% ratio is simply represented as 1/2. |
14 | | // |
15 | | // An example applying this ratio: If a stream has an allowed window size of |
16 | | // 100 bytes, WINDOW_UPDATE frames are scheduled when the unclaimed change |
17 | | // becomes greater than 1/2, or 50 bytes. |
18 | | const UNCLAIMED_NUMERATOR: i32 = 1; |
19 | | const UNCLAIMED_DENOMINATOR: i32 = 2; |
20 | | |
21 | | #[test] |
22 | | #[allow(clippy::assertions_on_constants)] |
23 | | fn sanity_unclaimed_ratio() { |
24 | | assert!(UNCLAIMED_NUMERATOR < UNCLAIMED_DENOMINATOR); |
25 | | assert!(UNCLAIMED_NUMERATOR >= 0); |
26 | | assert!(UNCLAIMED_DENOMINATOR > 0); |
27 | | } |
28 | | |
29 | | #[derive(Copy, Clone, Debug)] |
30 | | pub struct FlowControl { |
31 | | /// Window the peer knows about. |
32 | | /// |
33 | | /// This can go negative if a SETTINGS_INITIAL_WINDOW_SIZE is received. |
34 | | /// |
35 | | /// For example, say the peer sends a request and uses 32kb of the window. |
36 | | /// We send a SETTINGS_INITIAL_WINDOW_SIZE of 16kb. The peer has to adjust |
37 | | /// its understanding of the capacity of the window, and that would be: |
38 | | /// |
39 | | /// ```notrust |
40 | | /// default (64kb) - used (32kb) - settings_diff (64kb - 16kb): -16kb |
41 | | /// ``` |
42 | | window_size: Window, |
43 | | |
44 | | /// Window that we know about. |
45 | | /// |
46 | | /// This can go negative if a user declares a smaller target window than |
47 | | /// the peer knows about. |
48 | | available: Window, |
49 | | } |
50 | | |
51 | | impl FlowControl { |
52 | 927k | pub fn new() -> FlowControl { |
53 | 927k | FlowControl { |
54 | 927k | window_size: Window(0), |
55 | 927k | available: Window(0), |
56 | 927k | } |
57 | 927k | } |
58 | | |
59 | | /// Returns the window size as known by the peer |
60 | 622k | pub fn window_size(&self) -> WindowSize { |
61 | 622k | self.window_size.as_size() |
62 | 622k | } |
63 | | |
64 | | /// Returns the window size available to the consumer |
65 | 3.29M | pub fn available(&self) -> Window { |
66 | 3.29M | self.available |
67 | 3.29M | } |
68 | | |
69 | | /// Returns true if there is unavailable window capacity |
70 | 331k | pub fn has_unavailable(&self) -> bool { |
71 | 331k | if self.window_size < 0 { |
72 | 0 | return false; |
73 | 331k | } |
74 | 331k | |
75 | 331k | self.window_size > self.available |
76 | 331k | } |
77 | | |
78 | 148k | pub fn claim_capacity(&mut self, capacity: WindowSize) -> Result<(), Reason> { |
79 | 148k | self.available.decrease_by(capacity) |
80 | 148k | } |
81 | | |
82 | 667k | pub fn assign_capacity(&mut self, capacity: WindowSize) -> Result<(), Reason> { |
83 | 667k | self.available.increase_by(capacity) |
84 | 667k | } |
85 | | |
86 | | /// If a WINDOW_UPDATE frame should be sent, returns a positive number |
87 | | /// representing the increment to be used. |
88 | | /// |
89 | | /// If there is no available bytes to be reclaimed, or the number of |
90 | | /// available bytes does not reach the threshold, this returns `None`. |
91 | | /// |
92 | | /// This represents pending outbound WINDOW_UPDATE frames. |
93 | 1.64M | pub fn unclaimed_capacity(&self) -> Option<WindowSize> { |
94 | 1.64M | let available = self.available; |
95 | 1.64M | |
96 | 1.64M | if self.window_size >= available { |
97 | 1.54M | return None; |
98 | 102k | } |
99 | 102k | |
100 | 102k | let unclaimed = available.0 - self.window_size.0; |
101 | 102k | let threshold = self.window_size.0 / UNCLAIMED_DENOMINATOR * UNCLAIMED_NUMERATOR; |
102 | 102k | |
103 | 102k | if unclaimed < threshold { |
104 | 100k | None |
105 | | } else { |
106 | 1.71k | Some(unclaimed as WindowSize) |
107 | | } |
108 | 1.64M | } |
109 | | |
110 | | /// Increase the window size. |
111 | | /// |
112 | | /// This is called after receiving a WINDOW_UPDATE frame |
113 | 1.05M | pub fn inc_window(&mut self, sz: WindowSize) -> Result<(), Reason> { |
114 | 1.05M | let (val, overflow) = self.window_size.0.overflowing_add(sz as i32); |
115 | 1.05M | |
116 | 1.05M | if overflow { |
117 | 32 | return Err(Reason::FLOW_CONTROL_ERROR); |
118 | 1.05M | } |
119 | 1.05M | |
120 | 1.05M | if val > MAX_WINDOW_SIZE as i32 { |
121 | 0 | return Err(Reason::FLOW_CONTROL_ERROR); |
122 | 1.05M | } |
123 | 1.05M | |
124 | 1.05M | tracing::trace!( |
125 | 0 | "inc_window; sz={}; old={}; new={}", |
126 | | sz, |
127 | | self.window_size, |
128 | | val |
129 | | ); |
130 | | |
131 | 1.05M | self.window_size = Window(val); |
132 | 1.05M | Ok(()) |
133 | 1.05M | } |
134 | | |
135 | | /// Decrement the send-side window size. |
136 | | /// |
137 | | /// This is called after receiving a SETTINGS frame with a lower |
138 | | /// INITIAL_WINDOW_SIZE value. |
139 | 168k | pub fn dec_send_window(&mut self, sz: WindowSize) -> Result<(), Reason> { |
140 | 168k | tracing::trace!( |
141 | 0 | "dec_window; sz={}; window={}, available={}", |
142 | | sz, |
143 | | self.window_size, |
144 | | self.available |
145 | | ); |
146 | | // ~~This should not be able to overflow `window_size` from the bottom.~~ wrong. it can. |
147 | 168k | self.window_size.decrease_by(sz)?; |
148 | 168k | Ok(()) |
149 | 168k | } |
150 | | |
151 | | /// Decrement the recv-side window size. |
152 | | /// |
153 | | /// This is called after receiving a SETTINGS ACK frame with a lower |
154 | | /// INITIAL_WINDOW_SIZE value. |
155 | 0 | pub fn dec_recv_window(&mut self, sz: WindowSize) -> Result<(), Reason> { |
156 | 0 | tracing::trace!( |
157 | 0 | "dec_recv_window; sz={}; window={}, available={}", |
158 | | sz, |
159 | | self.window_size, |
160 | | self.available |
161 | | ); |
162 | | // This should not be able to overflow `window_size` from the bottom. |
163 | 0 | self.window_size.decrease_by(sz)?; |
164 | 0 | self.available.decrease_by(sz)?; |
165 | 0 | Ok(()) |
166 | 0 | } |
167 | | |
168 | | /// Decrements the window reflecting data has actually been sent. The caller |
169 | | /// must ensure that the window has capacity. |
170 | 78.3k | pub fn send_data(&mut self, sz: WindowSize) -> Result<(), Reason> { |
171 | 78.3k | tracing::trace!( |
172 | 0 | "send_data; sz={}; window={}; available={}", |
173 | | sz, |
174 | | self.window_size, |
175 | | self.available |
176 | | ); |
177 | | |
178 | | // If send size is zero it's meaningless to update flow control window |
179 | 78.3k | if sz > 0 { |
180 | | // Ensure that the argument is correct |
181 | 43.9k | assert!(self.window_size.0 >= sz as i32); |
182 | | |
183 | | // Update values |
184 | 43.9k | self.window_size.decrease_by(sz)?; |
185 | 43.9k | self.available.decrease_by(sz)?; |
186 | 34.3k | } |
187 | 78.3k | Ok(()) |
188 | 78.3k | } |
189 | | } |
190 | | |
191 | | /// The current capacity of a flow-controlled Window. |
192 | | /// |
193 | | /// This number can go negative when either side has used a certain amount |
194 | | /// of capacity when the other side advertises a reduction in size. |
195 | | /// |
196 | | /// This type tries to centralize the knowledge of addition and subtraction |
197 | | /// to this capacity, instead of having integer casts throughout the source. |
198 | | #[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd)] |
199 | | pub struct Window(i32); |
200 | | |
201 | | impl Window { |
202 | 2.82M | pub fn as_size(&self) -> WindowSize { |
203 | 2.82M | if self.0 < 0 { |
204 | 565 | 0 |
205 | | } else { |
206 | 2.81M | self.0 as WindowSize |
207 | | } |
208 | 2.82M | } |
209 | | |
210 | 0 | pub fn checked_size(&self) -> WindowSize { |
211 | 0 | assert!(self.0 >= 0, "negative Window"); |
212 | 0 | self.0 as WindowSize |
213 | 0 | } |
214 | | |
215 | 404k | pub fn decrease_by(&mut self, other: WindowSize) -> Result<(), Reason> { |
216 | 404k | if let Some(v) = self.0.checked_sub(other as i32) { |
217 | 404k | self.0 = v; |
218 | 404k | Ok(()) |
219 | | } else { |
220 | 0 | Err(Reason::FLOW_CONTROL_ERROR) |
221 | | } |
222 | 404k | } |
223 | | |
224 | 667k | pub fn increase_by(&mut self, other: WindowSize) -> Result<(), Reason> { |
225 | 667k | let other = self.add(other)?; |
226 | 667k | self.0 = other.0; |
227 | 667k | Ok(()) |
228 | 667k | } |
229 | | |
230 | 667k | pub fn add(&self, other: WindowSize) -> Result<Self, Reason> { |
231 | 667k | if let Some(v) = self.0.checked_add(other as i32) { |
232 | 667k | Ok(Self(v)) |
233 | | } else { |
234 | 0 | Err(Reason::FLOW_CONTROL_ERROR) |
235 | | } |
236 | 667k | } |
237 | | } |
238 | | |
239 | | impl PartialEq<usize> for Window { |
240 | 46.5k | fn eq(&self, other: &usize) -> bool { |
241 | 46.5k | if self.0 < 0 { |
242 | 0 | false |
243 | | } else { |
244 | 46.5k | (self.0 as usize).eq(other) |
245 | | } |
246 | 46.5k | } |
247 | | } |
248 | | |
249 | | impl PartialOrd<usize> for Window { |
250 | 1.40M | fn partial_cmp(&self, other: &usize) -> Option<::std::cmp::Ordering> { |
251 | 1.40M | if self.0 < 0 { |
252 | 0 | Some(::std::cmp::Ordering::Less) |
253 | | } else { |
254 | 1.40M | (self.0 as usize).partial_cmp(other) |
255 | | } |
256 | 1.40M | } |
257 | | } |
258 | | |
259 | | impl fmt::Display for Window { |
260 | 0 | fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { |
261 | 0 | fmt::Display::fmt(&self.0, f) |
262 | 0 | } |
263 | | } |
264 | | |
265 | | impl From<Window> for isize { |
266 | 0 | fn from(w: Window) -> isize { |
267 | 0 | w.0 as isize |
268 | 0 | } |
269 | | } |