/rust/registry/src/index.crates.io-1949cf8c6b5b557f/hyper-1.6.0/src/upgrade.rs
Line | Count | Source |
1 | | //! HTTP Upgrades |
2 | | //! |
3 | | //! This module deals with managing [HTTP Upgrades][mdn] in hyper. Since |
4 | | //! several concepts in HTTP allow for first talking HTTP, and then converting |
5 | | //! to a different protocol, this module conflates them into a single API. |
6 | | //! Those include: |
7 | | //! |
8 | | //! - HTTP/1.1 Upgrades |
9 | | //! - HTTP `CONNECT` |
10 | | //! |
11 | | //! You are responsible for any other pre-requisites to establish an upgrade, |
12 | | //! such as sending the appropriate headers, methods, and status codes. You can |
13 | | //! then use [`on`][] to grab a `Future` which will resolve to the upgraded |
14 | | //! connection object, or an error if the upgrade fails. |
15 | | //! |
16 | | //! [mdn]: https://developer.mozilla.org/en-US/docs/Web/HTTP/Protocol_upgrade_mechanism |
17 | | //! |
18 | | //! # Client |
19 | | //! |
20 | | //! Sending an HTTP upgrade from the [`client`](super::client) involves setting |
21 | | //! either the appropriate method, if wanting to `CONNECT`, or headers such as |
22 | | //! `Upgrade` and `Connection`, on the `http::Request`. Once you receive the |
23 | | //! `http::Response`, you must check that the server actually agreed to the |
24 | | //! upgrade (such as by sending a `101` status code), and then get the |
25 | | //! `Future` from the `Response` by calling [`on`][]. |
26 | | //! |
27 | | //! # Server |
28 | | //! |
29 | | //! Receiving upgrade requests in a server requires you to check the relevant |
30 | | //! headers in a `Request`, and if an upgrade should be done, you then send the |
31 | | //! corresponding headers in a response. To then wait for hyper to finish the |
32 | | //! upgrade, you call `on()` with the `Request`, and then can spawn a task |
33 | | //! awaiting it. |
34 | | //! |
35 | | //! # Example |
36 | | //! |
37 | | //! See [this example][example] showing how upgrades work with both |
38 | | //! Clients and Servers. |
39 | | //! |
40 | | //! [example]: https://github.com/hyperium/hyper/blob/master/examples/upgrades.rs |
41 | | |
42 | | use std::any::TypeId; |
43 | | use std::error::Error as StdError; |
44 | | use std::fmt; |
45 | | use std::future::Future; |
46 | | use std::io; |
47 | | use std::pin::Pin; |
48 | | use std::sync::{Arc, Mutex}; |
49 | | use std::task::{Context, Poll}; |
50 | | |
51 | | use crate::rt::{Read, ReadBufCursor, Write}; |
52 | | use bytes::Bytes; |
53 | | use tokio::sync::oneshot; |
54 | | |
55 | | use crate::common::io::Rewind; |
56 | | |
/// An upgraded HTTP connection.
///
/// This type internally holds a boxed trait object of the original IO that
/// was used to speak HTTP before the upgrade. It implements [`Read`] and
/// [`Write`] by forwarding to that IO, so it can be used directly.
///
/// Alternatively, if the exact IO type is known, this can be deconstructed
/// into its [`Parts`] with [`Upgraded::downcast`].
pub struct Upgraded {
    // Type-erased IO wrapped in `Rewind`, which is constructed together with
    // the bytes that were read before the upgrade completed.
    // NOTE(review): presumably `Rewind` replays those bytes before reading
    // from the inner IO -- confirm in `crate::common::io::Rewind`.
    io: Rewind<Box<dyn Io + Send>>,
}
68 | | |
/// A future for a possible HTTP upgrade.
///
/// If no upgrade was available, or it doesn't succeed, yields an `Error`.
///
/// Cloning is cheap: all clones share the same underlying receiver.
#[derive(Clone)]
pub struct OnUpgrade {
    // `None` means no upgrade is pending for this message; polling then
    // resolves immediately with an error. When `Some`, the receiver is
    // shared between clones through the `Arc<Mutex<..>>`.
    rx: Option<Arc<Mutex<oneshot::Receiver<crate::Result<Upgraded>>>>>,
}
76 | | |
/// The deconstructed parts of an [`Upgraded`] type.
///
/// Produced by [`Upgraded::downcast`]. Includes the original IO type, and a
/// read buffer of bytes that the HTTP state machine may have already read
/// before completing an upgrade.
#[derive(Debug)]
#[non_exhaustive]
pub struct Parts<T> {
    /// The original IO object used before the upgrade.
    pub io: T,
    /// A buffer of bytes that have been read but not processed as HTTP.
    ///
    /// For instance, if the `Connection` is used for an HTTP upgrade request,
    /// it is possible the server sent back the first bytes of the new protocol
    /// along with the response upgrade.
    ///
    /// You will want to check for any existing bytes if you plan to continue
    /// communicating on the IO object.
    pub read_buf: Bytes,
}
96 | | |
97 | | /// Gets a pending HTTP upgrade from this message. |
98 | | /// |
99 | | /// This can be called on the following types: |
100 | | /// |
101 | | /// - `http::Request<B>` |
102 | | /// - `http::Response<B>` |
103 | | /// - `&mut http::Request<B>` |
104 | | /// - `&mut http::Response<B>` |
105 | 0 | pub fn on<T: sealed::CanUpgrade>(msg: T) -> OnUpgrade { |
106 | 0 | msg.on_upgrade() |
107 | 0 | } |
108 | | |
/// The sending half of an upgrade, paired with an [`OnUpgrade`].
///
/// Created by [`pending`] and consumed by either `fulfill` (delivering the
/// upgraded connection) or `manual` (delivering an error instead).
#[cfg(all(
    any(feature = "client", feature = "server"),
    any(feature = "http1", feature = "http2"),
))]
pub(super) struct Pending {
    // Delivers the upgrade outcome to the paired `OnUpgrade` future.
    tx: oneshot::Sender<crate::Result<Upgraded>>,
}
116 | | |
/// Creates a connected (`Pending`, `OnUpgrade`) pair.
///
/// The `Pending` half owns the oneshot sender; the `OnUpgrade` half owns the
/// receiver behind an `Arc<Mutex<..>>` so that it can be cloned.
#[cfg(all(
    any(feature = "client", feature = "server"),
    any(feature = "http1", feature = "http2"),
))]
pub(super) fn pending() -> (Pending, OnUpgrade) {
    let (tx, rx) = oneshot::channel();
    let shared_rx = Arc::new(Mutex::new(rx));
    let on_upgrade = OnUpgrade {
        rx: Some(shared_rx),
    };
    (Pending { tx }, on_upgrade)
}
130 | | |
131 | | // ===== impl Upgraded ===== |
132 | | |
133 | | impl Upgraded { |
134 | | #[cfg(all( |
135 | | any(feature = "client", feature = "server"), |
136 | | any(feature = "http1", feature = "http2") |
137 | | ))] |
138 | 0 | pub(super) fn new<T>(io: T, read_buf: Bytes) -> Self |
139 | 0 | where |
140 | 0 | T: Read + Write + Unpin + Send + 'static, |
141 | | { |
142 | 0 | Upgraded { |
143 | 0 | io: Rewind::new_buffered(Box::new(io), read_buf), |
144 | 0 | } |
145 | 0 | } Unexecuted instantiation: <hyper::upgrade::Upgraded>::new::<hyper::proto::h2::H2Upgraded<bytes::bytes::Bytes>> Unexecuted instantiation: <hyper::upgrade::Upgraded>::new::<_> |
146 | | |
147 | | /// Tries to downcast the internal trait object to the type passed. |
148 | | /// |
149 | | /// On success, returns the downcasted parts. On error, returns the |
150 | | /// `Upgraded` back. |
151 | 0 | pub fn downcast<T: Read + Write + Unpin + 'static>(self) -> Result<Parts<T>, Self> { |
152 | 0 | let (io, buf) = self.io.into_inner(); |
153 | 0 | match io.__hyper_downcast() { |
154 | 0 | Ok(t) => Ok(Parts { |
155 | 0 | io: *t, |
156 | 0 | read_buf: buf, |
157 | 0 | }), |
158 | 0 | Err(io) => Err(Upgraded { |
159 | 0 | io: Rewind::new_buffered(io, buf), |
160 | 0 | }), |
161 | | } |
162 | 0 | } |
163 | | } |
164 | | |
165 | | impl Read for Upgraded { |
166 | 0 | fn poll_read( |
167 | 0 | mut self: Pin<&mut Self>, |
168 | 0 | cx: &mut Context<'_>, |
169 | 0 | buf: ReadBufCursor<'_>, |
170 | 0 | ) -> Poll<io::Result<()>> { |
171 | 0 | Pin::new(&mut self.io).poll_read(cx, buf) |
172 | 0 | } |
173 | | } |
174 | | |
175 | | impl Write for Upgraded { |
176 | 0 | fn poll_write( |
177 | 0 | mut self: Pin<&mut Self>, |
178 | 0 | cx: &mut Context<'_>, |
179 | 0 | buf: &[u8], |
180 | 0 | ) -> Poll<io::Result<usize>> { |
181 | 0 | Pin::new(&mut self.io).poll_write(cx, buf) |
182 | 0 | } |
183 | | |
184 | 0 | fn poll_write_vectored( |
185 | 0 | mut self: Pin<&mut Self>, |
186 | 0 | cx: &mut Context<'_>, |
187 | 0 | bufs: &[io::IoSlice<'_>], |
188 | 0 | ) -> Poll<io::Result<usize>> { |
189 | 0 | Pin::new(&mut self.io).poll_write_vectored(cx, bufs) |
190 | 0 | } |
191 | | |
192 | 0 | fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> { |
193 | 0 | Pin::new(&mut self.io).poll_flush(cx) |
194 | 0 | } |
195 | | |
196 | 0 | fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> { |
197 | 0 | Pin::new(&mut self.io).poll_shutdown(cx) |
198 | 0 | } |
199 | | |
200 | 0 | fn is_write_vectored(&self) -> bool { |
201 | 0 | self.io.is_write_vectored() |
202 | 0 | } |
203 | | } |
204 | | |
205 | | impl fmt::Debug for Upgraded { |
206 | 0 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
207 | 0 | f.debug_struct("Upgraded").finish() |
208 | 0 | } |
209 | | } |
210 | | |
211 | | // ===== impl OnUpgrade ===== |
212 | | |
213 | | impl OnUpgrade { |
214 | 0 | pub(super) fn none() -> Self { |
215 | 0 | OnUpgrade { rx: None } |
216 | 0 | } |
217 | | |
218 | | #[cfg(all(any(feature = "client", feature = "server"), feature = "http1"))] |
219 | 0 | pub(super) fn is_none(&self) -> bool { |
220 | 0 | self.rx.is_none() |
221 | 0 | } |
222 | | } |
223 | | |
224 | | impl Future for OnUpgrade { |
225 | | type Output = Result<Upgraded, crate::Error>; |
226 | | |
227 | 0 | fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { |
228 | 0 | match self.rx { |
229 | 0 | Some(ref rx) => Pin::new(&mut *rx.lock().unwrap()) |
230 | 0 | .poll(cx) |
231 | 0 | .map(|res| match res { |
232 | 0 | Ok(Ok(upgraded)) => Ok(upgraded), |
233 | 0 | Ok(Err(err)) => Err(err), |
234 | 0 | Err(_oneshot_canceled) => { |
235 | 0 | Err(crate::Error::new_canceled().with(UpgradeExpected)) |
236 | | } |
237 | 0 | }), |
238 | 0 | None => Poll::Ready(Err(crate::Error::new_user_no_upgrade())), |
239 | | } |
240 | 0 | } |
241 | | } |
242 | | |
243 | | impl fmt::Debug for OnUpgrade { |
244 | 0 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
245 | 0 | f.debug_struct("OnUpgrade").finish() |
246 | 0 | } |
247 | | } |
248 | | |
249 | | // ===== impl Pending ===== |
250 | | |
#[cfg(all(
    any(feature = "client", feature = "server"),
    any(feature = "http1", feature = "http2")
))]
impl Pending {
    /// Completes the pending upgrade by delivering `upgraded` to the paired
    /// `OnUpgrade` future.
    pub(super) fn fulfill(self, upgraded: Upgraded) {
        trace!("pending upgrade fulfill");
        // Best effort: `send` only fails when the receiving `OnUpgrade` has
        // already been dropped, in which case nobody is waiting for it.
        let _ = self.tx.send(Ok(upgraded));
    }

    /// Don't fulfill the pending Upgrade, but instead signal that
    /// upgrades are handled manually.
    ///
    /// The paired `OnUpgrade` future resolves to the "manual upgrade" user
    /// error.
    #[cfg(feature = "http1")]
    pub(super) fn manual(self) {
        // No extra feature gate is needed on this trace: the method already
        // requires `http1`, so `any(http1, http2)` is always satisfied here.
        trace!("pending upgrade handled manually");
        // Best effort, as in `fulfill`: a dropped receiver is not an error.
        let _ = self.tx.send(Err(crate::Error::new_user_manual_upgrade()));
    }
}
270 | | |
271 | | // ===== impl UpgradeExpected ===== |
272 | | |
/// Cause attached to the error produced when an expected upgrade was
/// canceled before it completed, for whatever reason.
///
/// This likely means the actual `Conn` future wasn't polled and upgraded.
#[derive(Debug)]
struct UpgradeExpected;

impl fmt::Display for UpgradeExpected {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        const MESSAGE: &str = "upgrade expected but not completed";
        f.write_str(MESSAGE)
    }
}

// Marker impl only: there is no underlying source error to expose.
impl StdError for UpgradeExpected {}
287 | | |
288 | | // ===== impl Io ===== |
289 | | |
/// Object-safe bundle of the IO capabilities an upgraded connection needs,
/// plus a hook for recovering the concrete type behind a `dyn Io`.
pub(super) trait Io: Read + Write + Unpin + 'static {
    /// Returns the `TypeId` of the implementing type.
    ///
    /// Used by the downcasting helpers on `dyn Io + Send` to compare the
    /// erased type against a requested one.
    fn __hyper_type_id(&self) -> TypeId {
        TypeId::of::<Self>()
    }
}

// Blanket impl: every suitable IO type is an `Io`, always with the default
// `__hyper_type_id` above.
impl<T: Read + Write + Unpin + 'static> Io for T {}
297 | | |
298 | | impl dyn Io + Send { |
299 | 0 | fn __hyper_is<T: Io>(&self) -> bool { |
300 | 0 | let t = TypeId::of::<T>(); |
301 | 0 | self.__hyper_type_id() == t |
302 | 0 | } |
303 | | |
304 | 0 | fn __hyper_downcast<T: Io>(self: Box<Self>) -> Result<Box<T>, Box<Self>> { |
305 | 0 | if self.__hyper_is::<T>() { |
306 | | // Taken from `std::error::Error::downcast()`. |
307 | | unsafe { |
308 | 0 | let raw: *mut dyn Io = Box::into_raw(self); |
309 | 0 | Ok(Box::from_raw(raw as *mut T)) |
310 | | } |
311 | | } else { |
312 | 0 | Err(self) |
313 | | } |
314 | 0 | } |
315 | | } |
316 | | |
317 | | mod sealed { |
318 | | use super::OnUpgrade; |
319 | | |
320 | | pub trait CanUpgrade { |
321 | | fn on_upgrade(self) -> OnUpgrade; |
322 | | } |
323 | | |
324 | | impl<B> CanUpgrade for http::Request<B> { |
325 | 0 | fn on_upgrade(mut self) -> OnUpgrade { |
326 | 0 | self.extensions_mut() |
327 | 0 | .remove::<OnUpgrade>() |
328 | 0 | .unwrap_or_else(OnUpgrade::none) |
329 | 0 | } |
330 | | } |
331 | | |
332 | | impl<B> CanUpgrade for &'_ mut http::Request<B> { |
333 | 0 | fn on_upgrade(self) -> OnUpgrade { |
334 | 0 | self.extensions_mut() |
335 | 0 | .remove::<OnUpgrade>() |
336 | 0 | .unwrap_or_else(OnUpgrade::none) |
337 | 0 | } |
338 | | } |
339 | | |
340 | | impl<B> CanUpgrade for http::Response<B> { |
341 | 0 | fn on_upgrade(mut self) -> OnUpgrade { |
342 | 0 | self.extensions_mut() |
343 | 0 | .remove::<OnUpgrade>() |
344 | 0 | .unwrap_or_else(OnUpgrade::none) |
345 | 0 | } |
346 | | } |
347 | | |
348 | | impl<B> CanUpgrade for &'_ mut http::Response<B> { |
349 | 0 | fn on_upgrade(self) -> OnUpgrade { |
350 | 0 | self.extensions_mut() |
351 | 0 | .remove::<OnUpgrade>() |
352 | 0 | .unwrap_or_else(OnUpgrade::none) |
353 | 0 | } |
354 | | } |
355 | | } |
356 | | |
#[cfg(all(
    any(feature = "client", feature = "server"),
    any(feature = "http1", feature = "http2"),
))]
#[cfg(test)]
mod tests {
    use super::*;

    /// Downcasting to the wrong type must hand the `Upgraded` back, and a
    /// subsequent downcast to the right type must then succeed.
    #[test]
    fn upgraded_downcast() {
        type WrongIo = crate::common::io::Compat<std::io::Cursor<Vec<u8>>>;

        let returned = Upgraded::new(Mock, Bytes::new())
            .downcast::<WrongIo>()
            .unwrap_err();

        returned.downcast::<Mock>().unwrap();
    }

    // TODO: replace with tokio_test::io when it can test write_buf
    struct Mock;

    impl Read for Mock {
        fn poll_read(
            self: Pin<&mut Self>,
            _cx: &mut Context<'_>,
            _buf: ReadBufCursor<'_>,
        ) -> Poll<io::Result<()>> {
            unreachable!("Mock::poll_read")
        }
    }

    impl Write for Mock {
        // Pretends every write succeeds in full.
        fn poll_write(
            self: Pin<&mut Self>,
            _: &mut Context<'_>,
            buf: &[u8],
        ) -> Poll<io::Result<usize>> {
            let written = buf.len();
            Poll::Ready(Ok(written))
        }

        fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
            unreachable!("Mock::poll_flush")
        }

        fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
            unreachable!("Mock::poll_shutdown")
        }
    }
}