/src/h2/src/proto/streams/stream.rs
Line | Count | Source (jump to first uncovered line) |
1 | | use crate::Reason; |
2 | | |
3 | | use super::*; |
4 | | |
5 | | use std::fmt; |
6 | | use std::task::{Context, Waker}; |
7 | | use std::time::Instant; |
8 | | |
9 | | /// Tracks Stream related state |
10 | | /// |
11 | | /// # Reference counting |
12 | | /// |
13 | | /// There can be a number of outstanding handles to a single Stream. These are |
14 | | /// tracked using reference counting. The `ref_count` field represents the |
15 | | /// number of outstanding userspace handles that can reach this stream. |
16 | | /// |
17 | | /// It's important to note that when the stream is placed in an internal queue |
18 | | /// (such as an accept queue), this is **not** tracked by a reference count. |
19 | | /// Thus, `ref_count` can be zero and the stream still has to be kept around. |
20 | | pub(super) struct Stream { |
21 | | /// The h2 stream identifier |
22 | | pub id: StreamId, |
23 | | |
24 | | /// Current state of the stream |
25 | | pub state: State, |
26 | | |
27 | | /// Set to `true` when the stream is counted against the connection's max |
28 | | /// concurrent streams. |
29 | | pub is_counted: bool, |
30 | | |
31 | | /// Number of outstanding handles pointing to this stream |
32 | | pub ref_count: usize, |
33 | | |
34 | | // ===== Fields related to sending ===== |
35 | | /// Next node in the pending-send linked list |
36 | | pub next_pending_send: Option<store::Key>, |
37 | | |
38 | | /// Set to true when the stream is queued in the pending-send list |
39 | | pub is_pending_send: bool, |
40 | | |
41 | | /// Send data flow control |
42 | | pub send_flow: FlowControl, |
43 | | |
44 | | /// Amount of send capacity that has been requested, but not yet allocated. |
45 | | pub requested_send_capacity: WindowSize, |
46 | | |
47 | | /// Amount of data buffered at the prioritization layer. |
48 | | /// TODO: Technically this could be greater than the window size... |
49 | | pub buffered_send_data: usize, |
50 | | |
51 | | /// Task tracking additional send capacity (i.e. window updates). |
52 | | send_task: Option<Waker>, |
53 | | |
54 | | /// Frames pending for this stream being sent to the socket |
55 | | pub pending_send: buffer::Deque, |
56 | | |
57 | | /// Next node in the linked list of streams waiting for additional |
58 | | /// connection level capacity. |
59 | | pub next_pending_send_capacity: Option<store::Key>, |
60 | | |
61 | | /// True if the stream is waiting for outbound connection capacity |
62 | | pub is_pending_send_capacity: bool, |
63 | | |
64 | | /// Set to true when the send capacity has been incremented |
65 | | pub send_capacity_inc: bool, |
66 | | |
67 | | /// Next node in the open linked list |
68 | | pub next_open: Option<store::Key>, |
69 | | |
70 | | /// Set to true when the stream is pending to be opened |
71 | | pub is_pending_open: bool, |
72 | | |
73 | | /// Set to true when a push is pending for this stream |
74 | | pub is_pending_push: bool, |
75 | | |
76 | | // ===== Fields related to receiving ===== |
77 | | /// Next node in the accept linked list |
78 | | pub next_pending_accept: Option<store::Key>, |
79 | | |
80 | | /// Set to true when the stream is pending accept |
81 | | pub is_pending_accept: bool, |
82 | | |
83 | | /// Receive data flow control |
84 | | pub recv_flow: FlowControl, |
85 | | |
86 | | pub in_flight_recv_data: WindowSize, |
87 | | |
88 | | /// Next node in the linked list of streams waiting to send window updates. |
89 | | pub next_window_update: Option<store::Key>, |
90 | | |
91 | | /// True if the stream is waiting to send a window update |
92 | | pub is_pending_window_update: bool, |
93 | | |
94 | | /// The time at which this stream was queued for reset expiration (i.e. when it may have been locally reset); `None` while it is not queued. |
95 | | pub reset_at: Option<Instant>, |
96 | | |
97 | | /// Next node in list of reset streams that should expire eventually |
98 | | pub next_reset_expire: Option<store::Key>, |
99 | | |
100 | | /// Frames pending for this stream to read |
101 | | pub pending_recv: buffer::Deque, |
102 | | |
103 | | /// Whether data should still be received (starts as `true`); once the `RecvStream` is dropped, no data should be received. |
104 | | pub is_recv: bool, |
105 | | |
106 | | /// Task tracking receiving frames |
107 | | pub recv_task: Option<Waker>, |
108 | | |
109 | | /// Task tracking pushed promises. |
110 | | pub push_task: Option<Waker>, |
111 | | |
112 | | /// The stream's pending push promises |
113 | | pub pending_push_promises: store::Queue<NextAccept>, |
114 | | |
115 | | /// Validate content-length headers |
116 | | pub content_length: ContentLength, |
117 | | } |
118 | | |
119 | | /// State related to validating a stream's content-length |
120 | | #[derive(Debug)] |
121 | | pub enum ContentLength { |
122 | | Omitted, |
123 | | Head, |
124 | | Remaining(u64), |
125 | | } |
126 | | |
127 | | #[derive(Debug)] |
128 | | pub(super) struct NextAccept; |
129 | | |
130 | | #[derive(Debug)] |
131 | | pub(super) struct NextSend; |
132 | | |
133 | | #[derive(Debug)] |
134 | | pub(super) struct NextSendCapacity; |
135 | | |
136 | | #[derive(Debug)] |
137 | | pub(super) struct NextWindowUpdate; |
138 | | |
139 | | #[derive(Debug)] |
140 | | pub(super) struct NextOpen; |
141 | | |
142 | | #[derive(Debug)] |
143 | | pub(super) struct NextResetExpire; |
144 | | |
145 | | impl Stream { |
146 | 476k | pub fn new(id: StreamId, init_send_window: WindowSize, init_recv_window: WindowSize) -> Stream { |
147 | 476k | let mut send_flow = FlowControl::new(); |
148 | 476k | let mut recv_flow = FlowControl::new(); |
149 | 476k | |
150 | 476k | recv_flow |
151 | 476k | .inc_window(init_recv_window) |
152 | 476k | .expect("invalid initial receive window"); |
153 | 476k | // TODO: proper error handling? |
154 | 476k | let _res = recv_flow.assign_capacity(init_recv_window); |
155 | 476k | debug_assert!(_res.is_ok()); |
156 | | |
157 | 476k | send_flow |
158 | 476k | .inc_window(init_send_window) |
159 | 476k | .expect("invalid initial send window size"); |
160 | 476k | |
161 | 476k | Stream { |
162 | 476k | id, |
163 | 476k | state: State::default(), |
164 | 476k | ref_count: 0, |
165 | 476k | is_counted: false, |
166 | 476k | |
167 | 476k | // ===== Fields related to sending ===== |
168 | 476k | next_pending_send: None, |
169 | 476k | is_pending_send: false, |
170 | 476k | send_flow, |
171 | 476k | requested_send_capacity: 0, |
172 | 476k | buffered_send_data: 0, |
173 | 476k | send_task: None, |
174 | 476k | pending_send: buffer::Deque::new(), |
175 | 476k | is_pending_send_capacity: false, |
176 | 476k | next_pending_send_capacity: None, |
177 | 476k | send_capacity_inc: false, |
178 | 476k | is_pending_open: false, |
179 | 476k | next_open: None, |
180 | 476k | is_pending_push: false, |
181 | 476k | |
182 | 476k | // ===== Fields related to receiving ===== |
183 | 476k | next_pending_accept: None, |
184 | 476k | is_pending_accept: false, |
185 | 476k | recv_flow, |
186 | 476k | in_flight_recv_data: 0, |
187 | 476k | next_window_update: None, |
188 | 476k | is_pending_window_update: false, |
189 | 476k | reset_at: None, |
190 | 476k | next_reset_expire: None, |
191 | 476k | pending_recv: buffer::Deque::new(), |
192 | 476k | is_recv: true, |
193 | 476k | recv_task: None, |
194 | 476k | push_task: None, |
195 | 476k | pending_push_promises: store::Queue::new(), |
196 | 476k | content_length: ContentLength::Omitted, |
197 | 476k | } |
198 | 476k | } |
199 | | |
200 | | /// Increment the stream's ref count |
201 | 829k | pub fn ref_inc(&mut self) { |
202 | 829k | assert!(self.ref_count < usize::MAX); |
203 | 829k | self.ref_count += 1; |
204 | 829k | } |
205 | | |
206 | | /// Decrements the stream's ref count |
207 | 829k | pub fn ref_dec(&mut self) { |
208 | 829k | assert!(self.ref_count > 0); |
209 | 829k | self.ref_count -= 1; |
210 | 829k | } |
211 | | |
212 | | /// Returns true if stream is currently being held for some time because of |
213 | | /// a local reset. |
214 | 4.12M | pub fn is_pending_reset_expiration(&self) -> bool { |
215 | 4.12M | self.reset_at.is_some() |
216 | 4.12M | } |
217 | | |
218 | | /// Returns true if frames for this stream are ready to be sent over the wire |
219 | 869k | pub fn is_send_ready(&self) -> bool { |
220 | 869k | // Why do we check pending_open? |
221 | 869k | // |
222 | 869k | // We allow users to call send_request() which schedules a stream to be pending_open |
223 | 869k | // if there is no room according to the concurrency limit (max_send_streams), and we |
224 | 869k | // also allow data to be buffered for send with send_data() if there is no capacity for |
225 | 869k | // the stream to send the data, which attempts to place the stream in pending_send. |
226 | 869k | // If the stream is not open, we don't want the stream to be scheduled for |
227 | 869k | // execution (pending_send). Note that if the stream is in pending_open, it will be |
228 | 869k | // pushed to pending_send when there is room for an open stream. |
229 | 869k | // |
230 | 869k | // In pending_push we track whether a PushPromise still needs to be sent |
231 | 869k | // from a different stream before we can start sending frames on this one. |
232 | 869k | // This is different from the "open" check because reserved streams don't count |
233 | 869k | // toward the concurrency limit. |
234 | 869k | // See https://httpwg.org/specs/rfc7540.html#rfc.section.5.1.2 |
235 | 869k | !self.is_pending_open && !self.is_pending_push |
236 | 869k | } |
237 | | |
238 | | /// Returns true if the stream is closed |
239 | 5.62M | pub fn is_closed(&self) -> bool { |
240 | 5.62M | // The state has fully transitioned to closed. |
241 | 5.62M | self.state.is_closed() && |
242 | | // Because outbound frames transition the stream state before being |
243 | | // buffered, we have to ensure that all frames have been flushed. |
244 | 3.37M | self.pending_send.is_empty() && |
245 | | // Sometimes large data frames are sent out in chunks. After a chunk |
246 | | // of the frame is sent, the remainder is pushed back onto the send |
247 | | // queue to be rescheduled. |
248 | | // |
249 | | // Checking for additional buffered data lets us catch this case. |
250 | 3.03M | self.buffered_send_data == 0 |
251 | 5.62M | } |
252 | | |
253 | | /// Returns true if the stream is no longer in use |
254 | 2.60M | pub fn is_released(&self) -> bool { |
255 | 2.60M | // The stream is closed and fully flushed |
256 | 2.60M | self.is_closed() && |
257 | | // There are no more outstanding references to the stream |
258 | 1.35M | self.ref_count == 0 && |
259 | | // The stream is not in any queue |
260 | 871k | !self.is_pending_send && !self.is_pending_send_capacity && |
261 | 610k | !self.is_pending_accept && !self.is_pending_window_update && |
262 | 609k | !self.is_pending_open && self.reset_at.is_none() |
263 | 2.60M | } |
264 | | |
265 | | /// Returns true when the consumer of the stream has dropped all handles |
266 | | /// (indicating no further interest in the stream) and the stream state is |
267 | | /// not actually closed. |
268 | | /// |
269 | | /// In this case, a reset should be sent. |
270 | 829k | pub fn is_canceled_interest(&self) -> bool { |
271 | 829k | self.ref_count == 0 && !self.state.is_closed() |
272 | 829k | } |
273 | | |
274 | | /// Current available stream send capacity |
275 | 162k | pub fn capacity(&self, max_buffer_size: usize) -> WindowSize { |
276 | 162k | let available = self.send_flow.available().as_size() as usize; |
277 | 162k | let buffered = self.buffered_send_data; |
278 | 162k | |
279 | 162k | available.min(max_buffer_size).saturating_sub(buffered) as WindowSize |
280 | 162k | } |
281 | | |
282 | 62.8k | pub fn assign_capacity(&mut self, capacity: WindowSize, max_buffer_size: usize) { |
283 | 62.8k | let prev_capacity = self.capacity(max_buffer_size); |
284 | 62.8k | debug_assert!(capacity > 0); |
285 | | // TODO: proper error handling |
286 | 62.8k | let _res = self.send_flow.assign_capacity(capacity); |
287 | 62.8k | debug_assert!(_res.is_ok()); |
288 | | |
289 | 62.8k | tracing::trace!( |
290 | 0 | " assigned capacity to stream; available={}; buffered={}; id={:?}; max_buffer_size={} prev={}", |
291 | 0 | self.send_flow.available(), |
292 | | self.buffered_send_data, |
293 | | self.id, |
294 | | max_buffer_size, |
295 | | prev_capacity, |
296 | | ); |
297 | | |
298 | 62.8k | if prev_capacity < self.capacity(max_buffer_size) { |
299 | 0 | self.notify_capacity(); |
300 | 62.8k | } |
301 | 62.8k | } |
302 | | |
303 | 18.2k | pub fn send_data(&mut self, len: WindowSize, max_buffer_size: usize) { |
304 | 18.2k | let prev_capacity = self.capacity(max_buffer_size); |
305 | 18.2k | |
306 | 18.2k | // TODO: proper error handling |
307 | 18.2k | let _res = self.send_flow.send_data(len); |
308 | 18.2k | debug_assert!(_res.is_ok()); |
309 | | |
310 | | // Decrement the stream's buffered data counter |
311 | 18.2k | debug_assert!(self.buffered_send_data >= len as usize); |
312 | 18.2k | self.buffered_send_data -= len as usize; |
313 | 18.2k | self.requested_send_capacity -= len; |
314 | 18.2k | |
315 | 18.2k | tracing::trace!( |
316 | 0 | " sent stream data; available={}; buffered={}; id={:?}; max_buffer_size={} prev={}", |
317 | 0 | self.send_flow.available(), |
318 | | self.buffered_send_data, |
319 | | self.id, |
320 | | max_buffer_size, |
321 | | prev_capacity, |
322 | | ); |
323 | | |
324 | 18.2k | if prev_capacity < self.capacity(max_buffer_size) { |
325 | 0 | self.notify_capacity(); |
326 | 18.2k | } |
327 | 18.2k | } |
328 | | |
329 | | /// If the capacity was limited because of the max_send_buffer_size, |
330 | | /// then consider waking the send task again... |
331 | 0 | pub fn notify_capacity(&mut self) { |
332 | 0 | self.send_capacity_inc = true; |
333 | 0 | tracing::trace!(" notifying task"); |
334 | 0 | self.notify_send(); |
335 | 0 | } |
336 | | |
337 | | /// Returns `Err` when the decrement would underflow (`len` exceeds the remaining length) or when `len` is non-zero while the content length is `Head`. |
338 | 2.51k | pub fn dec_content_length(&mut self, len: usize) -> Result<(), ()> { |
339 | 2.51k | match self.content_length { |
340 | 1.96k | ContentLength::Remaining(ref mut rem) => match rem.checked_sub(len as u64) { |
341 | 1.96k | Some(val) => *rem = val, |
342 | 1 | None => return Err(()), |
343 | | }, |
344 | | ContentLength::Head => { |
345 | 0 | if len != 0 { |
346 | 0 | return Err(()); |
347 | 0 | } |
348 | | } |
349 | 546 | _ => {} |
350 | | } |
351 | | |
352 | 2.51k | Ok(()) |
353 | 2.51k | } |
354 | | |
355 | 88 | pub fn ensure_content_length_zero(&self) -> Result<(), ()> { |
356 | 88 | match self.content_length { |
357 | 2 | ContentLength::Remaining(0) => Ok(()), |
358 | 26 | ContentLength::Remaining(_) => Err(()), |
359 | 60 | _ => Ok(()), |
360 | | } |
361 | 88 | } |
362 | | |
363 | 714k | pub fn notify_send(&mut self) { |
364 | 714k | if let Some(task) = self.send_task.take() { |
365 | 233 | task.wake(); |
366 | 714k | } |
367 | 714k | } |
368 | | |
369 | 11.5k | pub fn wait_send(&mut self, cx: &Context) { |
370 | 11.5k | self.send_task = Some(cx.waker().clone()); |
371 | 11.5k | } |
372 | | |
373 | 744k | pub fn notify_recv(&mut self) { |
374 | 744k | if let Some(task) = self.recv_task.take() { |
375 | 412k | task.wake(); |
376 | 412k | } |
377 | 744k | } |
378 | | |
379 | 663k | pub(super) fn notify_push(&mut self) { |
380 | 663k | if let Some(task) = self.push_task.take() { |
381 | 0 | task.wake(); |
382 | 663k | } |
383 | 663k | } |
384 | | |
385 | | /// Set the stream's state to `Closed` with the given reason and initiator. |
386 | | /// Notify the push and receive tasks, if they exist (the send task is not woken here). |
387 | 134k | pub(super) fn set_reset(&mut self, reason: Reason, initiator: Initiator) { |
388 | 134k | self.state.set_reset(self.id, reason, initiator); |
389 | 134k | self.notify_push(); |
390 | 134k | self.notify_recv(); |
391 | 134k | } |
392 | | } |
393 | | |
394 | | impl fmt::Debug for Stream { |
395 | 0 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
396 | 0 | f.debug_struct("Stream") |
397 | 0 | .field("id", &self.id) |
398 | 0 | .field("state", &self.state) |
399 | 0 | .field("is_counted", &self.is_counted) |
400 | 0 | .field("ref_count", &self.ref_count) |
401 | 0 | .field("next_pending_send", &self.next_pending_send) |
402 | 0 | .field("is_pending_send", &self.is_pending_send) |
403 | 0 | .field("send_flow", &self.send_flow) |
404 | 0 | .field("requested_send_capacity", &self.requested_send_capacity) |
405 | 0 | .field("buffered_send_data", &self.buffered_send_data) |
406 | 0 | .field("send_task", &self.send_task.as_ref().map(|_| ())) |
407 | 0 | .field("pending_send", &self.pending_send) |
408 | 0 | .field( |
409 | 0 | "next_pending_send_capacity", |
410 | 0 | &self.next_pending_send_capacity, |
411 | 0 | ) |
412 | 0 | .field("is_pending_send_capacity", &self.is_pending_send_capacity) |
413 | 0 | .field("send_capacity_inc", &self.send_capacity_inc) |
414 | 0 | .field("next_open", &self.next_open) |
415 | 0 | .field("is_pending_open", &self.is_pending_open) |
416 | 0 | .field("is_pending_push", &self.is_pending_push) |
417 | 0 | .field("next_pending_accept", &self.next_pending_accept) |
418 | 0 | .field("is_pending_accept", &self.is_pending_accept) |
419 | 0 | .field("recv_flow", &self.recv_flow) |
420 | 0 | .field("in_flight_recv_data", &self.in_flight_recv_data) |
421 | 0 | .field("next_window_update", &self.next_window_update) |
422 | 0 | .field("is_pending_window_update", &self.is_pending_window_update) |
423 | 0 | .field("reset_at", &self.reset_at) |
424 | 0 | .field("next_reset_expire", &self.next_reset_expire) |
425 | 0 | .field("pending_recv", &self.pending_recv) |
426 | 0 | .field("is_recv", &self.is_recv) |
427 | 0 | .field("recv_task", &self.recv_task.as_ref().map(|_| ())) |
428 | 0 | .field("push_task", &self.push_task.as_ref().map(|_| ())) |
429 | 0 | .field("pending_push_promises", &self.pending_push_promises) |
430 | 0 | .field("content_length", &self.content_length) |
431 | 0 | .finish() |
432 | 0 | } |
433 | | } |
434 | | |
435 | | impl store::Next for NextAccept { |
436 | 150 | fn next(stream: &Stream) -> Option<store::Key> { |
437 | 150 | stream.next_pending_accept |
438 | 150 | } |
439 | | |
440 | 71 | fn set_next(stream: &mut Stream, key: Option<store::Key>) { |
441 | 71 | stream.next_pending_accept = key; |
442 | 71 | } |
443 | | |
444 | 68 | fn take_next(stream: &mut Stream) -> Option<store::Key> { |
445 | 68 | stream.next_pending_accept.take() |
446 | 68 | } |
447 | | |
448 | 226 | fn is_queued(stream: &Stream) -> bool { |
449 | 226 | stream.is_pending_accept |
450 | 226 | } |
451 | | |
452 | 444 | fn set_queued(stream: &mut Stream, val: bool) { |
453 | 444 | stream.is_pending_accept = val; |
454 | 444 | } |
455 | | } |
456 | | |
457 | | impl store::Next for NextSend { |
458 | 6.52k | fn next(stream: &Stream) -> Option<store::Key> { |
459 | 6.52k | stream.next_pending_send |
460 | 6.52k | } |
461 | | |
462 | 451k | fn set_next(stream: &mut Stream, key: Option<store::Key>) { |
463 | 451k | stream.next_pending_send = key; |
464 | 451k | } |
465 | | |
466 | 451k | fn take_next(stream: &mut Stream) -> Option<store::Key> { |
467 | 451k | stream.next_pending_send.take() |
468 | 451k | } |
469 | | |
470 | 759k | fn is_queued(stream: &Stream) -> bool { |
471 | 759k | stream.is_pending_send |
472 | 759k | } |
473 | | |
474 | 916k | fn set_queued(stream: &mut Stream, val: bool) { |
475 | 916k | if val { |
476 | | // ensure that stream is not queued for being opened |
477 | | // if it's being put into queue for sending data |
478 | 458k | debug_assert!(!stream.is_pending_open); |
479 | 458k | } |
480 | 916k | stream.is_pending_send = val; |
481 | 916k | } |
482 | | } |
483 | | |
484 | | impl store::Next for NextSendCapacity { |
485 | 2.08k | fn next(stream: &Stream) -> Option<store::Key> { |
486 | 2.08k | stream.next_pending_send_capacity |
487 | 2.08k | } |
488 | | |
489 | 210k | fn set_next(stream: &mut Stream, key: Option<store::Key>) { |
490 | 210k | stream.next_pending_send_capacity = key; |
491 | 210k | } |
492 | | |
493 | 210k | fn take_next(stream: &mut Stream) -> Option<store::Key> { |
494 | 210k | stream.next_pending_send_capacity.take() |
495 | 210k | } |
496 | | |
497 | 258k | fn is_queued(stream: &Stream) -> bool { |
498 | 258k | stream.is_pending_send_capacity |
499 | 258k | } |
500 | | |
501 | 425k | fn set_queued(stream: &mut Stream, val: bool) { |
502 | 425k | stream.is_pending_send_capacity = val; |
503 | 425k | } |
504 | | } |
505 | | |
506 | | impl store::Next for NextWindowUpdate { |
507 | 0 | fn next(stream: &Stream) -> Option<store::Key> { |
508 | 0 | stream.next_window_update |
509 | 0 | } |
510 | | |
511 | 0 | fn set_next(stream: &mut Stream, key: Option<store::Key>) { |
512 | 0 | stream.next_window_update = key; |
513 | 0 | } |
514 | | |
515 | 0 | fn take_next(stream: &mut Stream) -> Option<store::Key> { |
516 | 0 | stream.next_window_update.take() |
517 | 0 | } |
518 | | |
519 | 0 | fn is_queued(stream: &Stream) -> bool { |
520 | 0 | stream.is_pending_window_update |
521 | 0 | } |
522 | | |
523 | 0 | fn set_queued(stream: &mut Stream, val: bool) { |
524 | 0 | stream.is_pending_window_update = val; |
525 | 0 | } |
526 | | } |
527 | | |
528 | | impl store::Next for NextOpen { |
529 | 3.95k | fn next(stream: &Stream) -> Option<store::Key> { |
530 | 3.95k | stream.next_open |
531 | 3.95k | } |
532 | | |
533 | 410k | fn set_next(stream: &mut Stream, key: Option<store::Key>) { |
534 | 410k | stream.next_open = key; |
535 | 410k | } |
536 | | |
537 | 410k | fn take_next(stream: &mut Stream) -> Option<store::Key> { |
538 | 410k | stream.next_open.take() |
539 | 410k | } |
540 | | |
541 | 414k | fn is_queued(stream: &Stream) -> bool { |
542 | 414k | stream.is_pending_open |
543 | 414k | } |
544 | | |
545 | 828k | fn set_queued(stream: &mut Stream, val: bool) { |
546 | 828k | if val { |
547 | | // ensure that stream is not queued for being sent |
548 | | // if it's being put into queue for opening the stream |
549 | 414k | debug_assert!(!stream.is_pending_send); |
550 | 414k | } |
551 | 828k | stream.is_pending_open = val; |
552 | 828k | } |
553 | | } |
554 | | |
555 | | impl store::Next for NextResetExpire { |
556 | 4.31k | fn next(stream: &Stream) -> Option<store::Key> { |
557 | 4.31k | stream.next_reset_expire |
558 | 4.31k | } |
559 | | |
560 | 12.2k | fn set_next(stream: &mut Stream, key: Option<store::Key>) { |
561 | 12.2k | stream.next_reset_expire = key; |
562 | 12.2k | } |
563 | | |
564 | 12.2k | fn take_next(stream: &mut Stream) -> Option<store::Key> { |
565 | 12.2k | stream.next_reset_expire.take() |
566 | 12.2k | } |
567 | | |
568 | 16.5k | fn is_queued(stream: &Stream) -> bool { |
569 | 16.5k | stream.reset_at.is_some() |
570 | 16.5k | } |
571 | | |
572 | 33.1k | fn set_queued(stream: &mut Stream, val: bool) { |
573 | 33.1k | if val { |
574 | 16.5k | stream.reset_at = Some(Instant::now()); |
575 | 16.5k | } else { |
576 | 16.5k | stream.reset_at = None; |
577 | 16.5k | } |
578 | 33.1k | } |
579 | | } |
580 | | |
581 | | // ===== impl ContentLength ===== |
582 | | |
583 | | impl ContentLength { |
584 | 1.32k | pub fn is_head(&self) -> bool { |
585 | 1.32k | matches!(*self, Self::Head) |
586 | 1.32k | } |
587 | | } |