/src/h2/src/proto/streams/state.rs
Line | Count | Source |
1 | | use std::fmt; |
2 | | use std::io; |
3 | | |
4 | | use crate::codec::UserError; |
5 | | use crate::frame::{self, Reason, StreamId}; |
6 | | use crate::proto::{self, Error, Initiator, PollReset}; |
7 | | |
8 | | use self::Inner::*; |
9 | | use self::Peer::*; |
10 | | |
11 | | /// Represents the state of an H2 stream |
12 | | /// |
13 | | /// ```not_rust |
14 | | /// +--------+ |
15 | | /// send PP | | recv PP |
16 | | /// ,--------| idle |--------. |
17 | | /// / | | \ |
18 | | /// v +--------+ v |
19 | | /// +----------+ | +----------+ |
20 | | /// | | | send H / | | |
21 | | /// ,------| reserved | | recv H | reserved |------. |
22 | | /// | | (local) | | | (remote) | | |
23 | | /// | +----------+ v +----------+ | |
24 | | /// | | +--------+ | | |
25 | | /// | | recv ES | | send ES | | |
26 | | /// | send H | ,-------| open |-------. | recv H | |
27 | | /// | | / | | \ | | |
28 | | /// | v v +--------+ v v | |
29 | | /// | +----------+ | +----------+ | |
30 | | /// | | half | | | half | | |
31 | | /// | | closed | | send R / | closed | | |
32 | | /// | | (remote) | | recv R | (local) | | |
33 | | /// | +----------+ | +----------+ | |
34 | | /// | | | | | |
35 | | /// | | send ES / | recv ES / | | |
36 | | /// | | send R / v send R / | | |
37 | | /// | | recv R +--------+ recv R | | |
38 | | /// | send R / `----------->| |<-----------' send R / | |
39 | | /// | recv R | closed | recv R | |
40 | | /// `----------------------->| |<----------------------' |
41 | | /// +--------+ |
42 | | /// |
43 | | /// send: endpoint sends this frame |
44 | | /// recv: endpoint receives this frame |
45 | | /// |
46 | | /// H: HEADERS frame (with implied CONTINUATIONs) |
47 | | /// PP: PUSH_PROMISE frame (with implied CONTINUATIONs) |
48 | | /// ES: END_STREAM flag |
49 | | /// R: RST_STREAM frame |
50 | | /// ``` |
51 | | #[derive(Clone)] |
52 | | pub struct State { |
53 | | inner: Inner, |
54 | | } |
55 | | |
56 | | #[derive(Debug, Clone)] |
57 | | enum Inner { |
58 | | Idle, |
59 | | // TODO: these states shouldn't count against concurrency limits: |
60 | | ReservedLocal, |
61 | | ReservedRemote, |
62 | | Open { local: Peer, remote: Peer }, |
63 | | HalfClosedLocal(Peer), // TODO: explicitly name this value |
64 | | HalfClosedRemote(Peer), |
65 | | Closed(Cause), |
66 | | } |
67 | | |
68 | | #[derive(Debug, Copy, Clone, Default)] |
69 | | enum Peer { |
70 | | #[default] |
71 | | AwaitingHeaders, |
72 | | Streaming, |
73 | | } |
74 | | |
75 | | #[derive(Debug, Clone)] |
76 | | enum Cause { |
77 | | EndStream, |
78 | | Error(Error), |
79 | | |
80 | | /// This indicates to the connection that a reset frame must be sent out |
81 | | /// once the send queue has been flushed. |
82 | | /// |
83 | | /// Examples of when this could happen: |
84 | | /// - User drops all references to a stream, so we want to CANCEL the it. |
85 | | /// - Header block size was too large, so we want to REFUSE, possibly |
86 | | /// after sending a 431 response frame. |
87 | | ScheduledLibraryReset(Reason), |
88 | | } |
89 | | |
90 | | impl State { |
91 | | /// Opens the send-half of a stream if it is not already open. |
92 | 476k | pub fn send_open(&mut self, eos: bool) -> Result<(), UserError> { |
93 | 476k | let local = Streaming; |
94 | | |
95 | 476k | self.inner = match self.inner { |
96 | | Idle => { |
97 | 476k | if eos { |
98 | 739 | HalfClosedLocal(AwaitingHeaders) |
99 | | } else { |
100 | 476k | Open { |
101 | 476k | local, |
102 | 476k | remote: AwaitingHeaders, |
103 | 476k | } |
104 | | } |
105 | | } |
106 | | Open { |
107 | | local: AwaitingHeaders, |
108 | 0 | remote, |
109 | | } => { |
110 | 0 | if eos { |
111 | 0 | HalfClosedLocal(remote) |
112 | | } else { |
113 | 0 | Open { local, remote } |
114 | | } |
115 | | } |
116 | | HalfClosedRemote(AwaitingHeaders) | ReservedLocal => { |
117 | 0 | if eos { |
118 | 0 | Closed(Cause::EndStream) |
119 | | } else { |
120 | 0 | HalfClosedRemote(local) |
121 | | } |
122 | | } |
123 | | _ => { |
124 | | // All other transitions result in a protocol error |
125 | 0 | return Err(UserError::UnexpectedFrameType); |
126 | | } |
127 | | }; |
128 | | |
129 | 476k | Ok(()) |
130 | 476k | } |
131 | | |
132 | | /// Opens the receive-half of the stream when a HEADERS frame is received. |
133 | | /// |
134 | | /// Returns true if this transitions the state to Open. |
135 | 2.87k | pub fn recv_open(&mut self, frame: &frame::Headers) -> Result<bool, Error> { |
136 | 2.87k | let mut initial = false; |
137 | 2.87k | let eos = frame.is_end_stream(); |
138 | | |
139 | 2.87k | self.inner = match self.inner { |
140 | | Idle => { |
141 | 0 | initial = true; |
142 | | |
143 | 0 | if eos { |
144 | 0 | HalfClosedRemote(AwaitingHeaders) |
145 | | } else { |
146 | | Open { |
147 | 0 | local: AwaitingHeaders, |
148 | 0 | remote: if frame.is_informational() { |
149 | 0 | tracing::trace!("skipping 1xx response headers"); |
150 | 0 | AwaitingHeaders |
151 | | } else { |
152 | 0 | Streaming |
153 | | }, |
154 | | } |
155 | | } |
156 | | } |
157 | | ReservedRemote => { |
158 | 2 | initial = true; |
159 | | |
160 | 2 | if eos { |
161 | 1 | Closed(Cause::EndStream) |
162 | 1 | } else if frame.is_informational() { |
163 | 0 | tracing::trace!("skipping 1xx response headers"); |
164 | 0 | ReservedRemote |
165 | | } else { |
166 | 1 | HalfClosedLocal(Streaming) |
167 | | } |
168 | | } |
169 | | Open { |
170 | 0 | local, |
171 | | remote: AwaitingHeaders, |
172 | | } => { |
173 | 0 | if eos { |
174 | 0 | HalfClosedRemote(local) |
175 | | } else { |
176 | | Open { |
177 | 0 | local, |
178 | 0 | remote: if frame.is_informational() { |
179 | 0 | tracing::trace!("skipping 1xx response headers"); |
180 | 0 | AwaitingHeaders |
181 | | } else { |
182 | 0 | Streaming |
183 | | }, |
184 | | } |
185 | | } |
186 | | } |
187 | | HalfClosedLocal(AwaitingHeaders) => { |
188 | 2.86k | if eos { |
189 | 259 | Closed(Cause::EndStream) |
190 | 2.60k | } else if frame.is_informational() { |
191 | 404 | tracing::trace!("skipping 1xx response headers"); |
192 | 404 | HalfClosedLocal(AwaitingHeaders) |
193 | | } else { |
194 | 2.20k | HalfClosedLocal(Streaming) |
195 | | } |
196 | | } |
197 | 0 | ref state => { |
198 | | // All other transitions result in a protocol error |
199 | 0 | proto_err!(conn: "recv_open: in unexpected state {:?}", state); |
200 | 0 | return Err(Error::library_go_away(Reason::PROTOCOL_ERROR)); |
201 | | } |
202 | | }; |
203 | | |
204 | 2.87k | Ok(initial) |
205 | 2.87k | } |
206 | | |
207 | | /// Transition from Idle -> ReservedRemote |
208 | 1.15k | pub fn reserve_remote(&mut self) -> Result<(), Error> { |
209 | 1.15k | match self.inner { |
210 | | Idle => { |
211 | 1.15k | self.inner = ReservedRemote; |
212 | 1.15k | Ok(()) |
213 | | } |
214 | 0 | ref state => { |
215 | 0 | proto_err!(conn: "reserve_remote: in unexpected state {:?}", state); |
216 | 0 | Err(Error::library_go_away(Reason::PROTOCOL_ERROR)) |
217 | | } |
218 | | } |
219 | 1.15k | } |
220 | | |
221 | | /// Transition from Idle -> ReservedLocal |
222 | 0 | pub fn reserve_local(&mut self) -> Result<(), UserError> { |
223 | 0 | match self.inner { |
224 | | Idle => { |
225 | 0 | self.inner = ReservedLocal; |
226 | 0 | Ok(()) |
227 | | } |
228 | 0 | _ => Err(UserError::UnexpectedFrameType), |
229 | | } |
230 | 0 | } |
231 | | |
232 | | /// Indicates that the remote side will not send more data to the local. |
233 | 125 | pub fn recv_close(&mut self) -> Result<(), Error> { |
234 | 125 | match self.inner { |
235 | 0 | Open { local, .. } => { |
236 | | // The remote side will continue to receive data. |
237 | 0 | tracing::trace!("recv_close: Open => HalfClosedRemote({:?})", local); |
238 | 0 | self.inner = HalfClosedRemote(local); |
239 | 0 | Ok(()) |
240 | | } |
241 | | HalfClosedLocal(..) => { |
242 | 120 | tracing::trace!("recv_close: HalfClosedLocal => Closed"); |
243 | 120 | self.inner = Closed(Cause::EndStream); |
244 | 120 | Ok(()) |
245 | | } |
246 | 5 | ref state => { |
247 | 5 | proto_err!(conn: "recv_close: in unexpected state {:?}", state); |
248 | 5 | Err(Error::library_go_away(Reason::PROTOCOL_ERROR)) |
249 | | } |
250 | | } |
251 | 125 | } |
252 | | |
253 | | /// The remote explicitly sent a RST_STREAM. |
254 | | /// |
255 | | /// # Arguments |
256 | | /// - `frame`: the received RST_STREAM frame. |
257 | | /// - `queued`: true if this stream has frames in the pending send queue. |
258 | 832 | pub fn recv_reset(&mut self, frame: frame::Reset, queued: bool) { |
259 | 640 | match self.inner { |
260 | | // If the stream is already in a `Closed` state, do nothing, |
261 | | // provided that there are no frames still in the send queue. |
262 | 201 | Closed(..) if !queued => {} |
263 | | // A notionally `Closed` stream may still have queued frames in |
264 | | // the following cases: |
265 | | // |
266 | | // - if the cause is `Cause::Scheduled(..)` (i.e. we have not |
267 | | // actually closed the stream yet). |
268 | | // - if the cause is `Cause::EndStream`: we transition to this |
269 | | // state when an EOS frame is *enqueued* (so that it's invalid |
270 | | // to enqueue more frames), not when the EOS frame is *sent*; |
271 | | // therefore, there may still be frames ahead of the EOS frame |
272 | | // in the send queue. |
273 | | // |
274 | | // In either of these cases, we want to overwrite the stream's |
275 | | // previous state with the received RST_STREAM, so that the queue |
276 | | // will be cleared by `Prioritize::pop_frame`. |
277 | 631 | ref state => { |
278 | 631 | tracing::trace!( |
279 | 0 | "recv_reset; frame={:?}; state={:?}; queued={:?}", |
280 | | frame, |
281 | | state, |
282 | | queued |
283 | | ); |
284 | 631 | self.inner = Closed(Cause::Error(Error::remote_reset( |
285 | 631 | frame.stream_id(), |
286 | 631 | frame.reason(), |
287 | 631 | ))); |
288 | | } |
289 | | } |
290 | 832 | } |
291 | | |
292 | | /// Handle a connection-level error. |
293 | 468k | pub fn handle_error(&mut self, err: &proto::Error) { |
294 | 468k | match self.inner { |
295 | 254k | Closed(..) => {} |
296 | | _ => { |
297 | 214k | tracing::trace!("handle_error; err={:?}", err); |
298 | 214k | self.inner = Closed(Cause::Error(err.clone())); |
299 | | } |
300 | | } |
301 | 468k | } |
302 | | |
303 | 282k | pub fn recv_eof(&mut self) { |
304 | 282k | match self.inner { |
305 | 104k | Closed(..) => {} |
306 | 177k | ref state => { |
307 | 177k | tracing::trace!("recv_eof; state={:?}", state); |
308 | 177k | self.inner = Closed(Cause::Error( |
309 | 177k | io::Error::new( |
310 | 177k | io::ErrorKind::BrokenPipe, |
311 | 177k | "stream closed because of a broken pipe", |
312 | 177k | ) |
313 | 177k | .into(), |
314 | 177k | )); |
315 | | } |
316 | | } |
317 | 282k | } |
318 | | |
319 | | /// Indicates that the local side will not send more data to the local. |
320 | 476k | pub fn send_close(&mut self) { |
321 | 476k | match self.inner { |
322 | 476k | Open { remote, .. } => { |
323 | | // The remote side will continue to receive data. |
324 | 476k | tracing::trace!("send_close: Open => HalfClosedLocal({:?})", remote); |
325 | 476k | self.inner = HalfClosedLocal(remote); |
326 | | } |
327 | | HalfClosedRemote(..) => { |
328 | 0 | tracing::trace!("send_close: HalfClosedRemote => Closed"); |
329 | 0 | self.inner = Closed(Cause::EndStream); |
330 | | } |
331 | 0 | ref state => panic!("send_close: unexpected state {:?}", state), |
332 | | } |
333 | 476k | } |
334 | | |
335 | | /// Set the stream state to reset locally. |
336 | 136k | pub fn set_reset(&mut self, stream_id: StreamId, reason: Reason, initiator: Initiator) { |
337 | 136k | self.inner = Closed(Cause::Error(Error::Reset(stream_id, reason, initiator))); |
338 | 136k | } |
339 | | |
340 | | /// Set the stream state to a scheduled reset. |
341 | 84.0k | pub fn set_scheduled_reset(&mut self, reason: Reason) { |
342 | 84.0k | debug_assert!(!self.is_closed()); |
343 | 84.0k | self.inner = Closed(Cause::ScheduledLibraryReset(reason)); |
344 | 84.0k | } |
345 | | |
346 | 237k | pub fn get_scheduled_reset(&self) -> Option<Reason> { |
347 | 237k | match self.inner { |
348 | 75.2k | Closed(Cause::ScheduledLibraryReset(reason)) => Some(reason), |
349 | 162k | _ => None, |
350 | | } |
351 | 237k | } |
352 | | |
353 | 1.75M | pub fn is_scheduled_reset(&self) -> bool { |
354 | 1.73M | matches!(self.inner, Closed(Cause::ScheduledLibraryReset(..))) |
355 | 1.75M | } |
356 | | |
357 | 196k | pub fn is_local_error(&self) -> bool { |
358 | 188k | match self.inner { |
359 | 104k | Closed(Cause::Error(ref e)) => e.is_local(), |
360 | 84.5k | Closed(Cause::ScheduledLibraryReset(..)) => true, |
361 | 7.55k | _ => false, |
362 | | } |
363 | 196k | } |
364 | | |
365 | 0 | pub fn is_remote_reset(&self) -> bool { |
366 | 0 | matches!( |
367 | 0 | self.inner, |
368 | | Closed(Cause::Error(Error::Reset(_, _, Initiator::Remote))) |
369 | | ) |
370 | 0 | } |
371 | | |
372 | | /// Returns true if the stream is already reset. |
373 | 84.7k | pub fn is_reset(&self) -> bool { |
374 | 23.9k | match self.inner { |
375 | 61 | Closed(Cause::EndStream) => false, |
376 | 23.9k | Closed(_) => true, |
377 | 60.7k | _ => false, |
378 | | } |
379 | 84.7k | } |
380 | | |
381 | 689k | pub fn is_send_streaming(&self) -> bool { |
382 | 213k | matches!( |
383 | 476k | self.inner, |
384 | | Open { |
385 | | local: Streaming, |
386 | | .. |
387 | | } | HalfClosedRemote(Streaming) |
388 | | ) |
389 | 689k | } |
390 | | |
391 | | /// Returns true when the stream is in a state to receive headers |
392 | 3.92k | pub fn is_recv_headers(&self) -> bool { |
393 | 1.05k | matches!( |
394 | 0 | self.inner, |
395 | | Idle | Open { |
396 | | remote: AwaitingHeaders, |
397 | | .. |
398 | | } | HalfClosedLocal(AwaitingHeaders) |
399 | | | ReservedRemote |
400 | | ) |
401 | 3.92k | } |
402 | | |
403 | 4.33k | pub fn is_recv_streaming(&self) -> bool { |
404 | 29 | matches!( |
405 | 0 | self.inner, |
406 | | Open { |
407 | | remote: Streaming, |
408 | | .. |
409 | | } | HalfClosedLocal(Streaming) |
410 | | ) |
411 | 4.33k | } |
412 | | |
413 | 0 | pub fn is_recv_end_stream(&self) -> bool { |
414 | | // In either case END_STREAM has been received |
415 | 0 | matches!(self.inner, Closed(Cause::EndStream) | HalfClosedRemote(..)) |
416 | 0 | } |
417 | | |
418 | 7.46M | pub fn is_closed(&self) -> bool { |
419 | 7.46M | matches!(self.inner, Closed(_)) |
420 | 7.46M | } |
421 | | |
422 | 234k | pub fn is_send_closed(&self) -> bool { |
423 | 0 | matches!( |
424 | 234k | self.inner, |
425 | | Closed(..) | HalfClosedLocal(..) | ReservedRemote |
426 | | ) |
427 | 234k | } |
428 | | |
429 | 0 | pub fn is_idle(&self) -> bool { |
430 | 0 | matches!(self.inner, Idle) |
431 | 0 | } |
432 | | |
433 | 515k | pub fn ensure_recv_open(&self) -> Result<bool, proto::Error> { |
434 | | // TODO: Is this correct? |
435 | 39.8k | match self.inner { |
436 | 39.8k | Closed(Cause::Error(ref e)) => Err(e.clone()), |
437 | 1 | Closed(Cause::ScheduledLibraryReset(reason)) => { |
438 | 1 | Err(proto::Error::library_go_away(reason)) |
439 | | } |
440 | 15 | Closed(Cause::EndStream) | HalfClosedRemote(..) | ReservedLocal => Ok(false), |
441 | 475k | _ => Ok(true), |
442 | | } |
443 | 515k | } |
444 | | |
445 | | /// Returns a reason if the stream has been reset. |
446 | 0 | pub(super) fn ensure_reason(&self, mode: PollReset) -> Result<Option<Reason>, crate::Error> { |
447 | 0 | match self.inner { |
448 | 0 | Closed(Cause::Error(Error::Reset(_, reason, _))) |
449 | 0 | | Closed(Cause::Error(Error::GoAway(_, reason, _))) |
450 | 0 | | Closed(Cause::ScheduledLibraryReset(reason)) => Ok(Some(reason)), |
451 | 0 | Closed(Cause::Error(ref e)) => Err(e.clone().into()), |
452 | | Open { |
453 | | local: Streaming, .. |
454 | | } |
455 | 0 | | HalfClosedRemote(Streaming) => match mode { |
456 | 0 | PollReset::AwaitingHeaders => Err(UserError::PollResetAfterSendResponse.into()), |
457 | 0 | PollReset::Streaming => Ok(None), |
458 | | }, |
459 | 0 | _ => Ok(None), |
460 | | } |
461 | 0 | } |
462 | | } |
463 | | |
464 | | impl Default for State { |
465 | 537k | fn default() -> State { |
466 | 537k | State { inner: Inner::Idle } |
467 | 537k | } |
468 | | } |
469 | | |
470 | | // remove some noise for debug output |
471 | | impl fmt::Debug for State { |
472 | 0 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
473 | 0 | self.inner.fmt(f) |
474 | 0 | } |
475 | | } |