Coverage Report

Created: 2025-11-28 06:44

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/rust/registry/src/index.crates.io-1949cf8c6b5b557f/hyper-1.6.0/src/upgrade.rs
Line
Count
Source
1
//! HTTP Upgrades
2
//!
3
//! This module deals with managing [HTTP Upgrades][mdn] in hyper. Since
4
//! several concepts in HTTP allow for first talking HTTP, and then converting
5
//! to a different protocol, this module conflates them into a single API.
6
//! Those include:
7
//!
8
//! - HTTP/1.1 Upgrades
9
//! - HTTP `CONNECT`
10
//!
11
//! You are responsible for any other pre-requisites to establish an upgrade,
12
//! such as sending the appropriate headers, methods, and status codes. You can
13
//! then use [`on`][] to grab a `Future` which will resolve to the upgraded
14
//! connection object, or an error if the upgrade fails.
15
//!
16
//! [mdn]: https://developer.mozilla.org/en-US/docs/Web/HTTP/Protocol_upgrade_mechanism
17
//!
18
//! # Client
19
//!
20
//! Sending an HTTP upgrade from the [`client`](super::client) involves setting
21
//! either the appropriate method, if wanting to `CONNECT`, or headers such as
22
//! `Upgrade` and `Connection`, on the `http::Request`. Once receiving the
23
//! `http::Response` back, you must check for the specific information that the
24
//! upgrade is agreed upon by the server (such as a `101` status code), and then
25
//! get the `Future` from the `Response`.
26
//!
27
//! # Server
28
//!
29
//! Receiving upgrade requests in a server requires you to check the relevant
30
//! headers in a `Request`, and if an upgrade should be done, you then send the
31
//! corresponding headers in a response. To then wait for hyper to finish the
32
//! upgrade, you call `on()` with the `Request`, and then can spawn a task
33
//! awaiting it.
34
//!
35
//! # Example
36
//!
37
//! See [this example][example] showing how upgrades work with both
38
//! Clients and Servers.
39
//!
40
//! [example]: https://github.com/hyperium/hyper/blob/master/examples/upgrades.rs
41
42
use std::any::TypeId;
43
use std::error::Error as StdError;
44
use std::fmt;
45
use std::future::Future;
46
use std::io;
47
use std::pin::Pin;
48
use std::sync::{Arc, Mutex};
49
use std::task::{Context, Poll};
50
51
use crate::rt::{Read, ReadBufCursor, Write};
52
use bytes::Bytes;
53
use tokio::sync::oneshot;
54
55
use crate::common::io::Rewind;
56
57
/// An upgraded HTTP connection.
58
///
59
/// This type holds a trait object internally of the original IO that
60
/// was used to speak HTTP before the upgrade. It can be used directly
61
/// as a [`Read`] or [`Write`] for convenience.
62
///
63
/// Alternatively, if the exact type is known, this can be deconstructed
64
/// into its parts.
65
pub struct Upgraded {
66
    io: Rewind<Box<dyn Io + Send>>,
67
}
68
69
/// A future for a possible HTTP upgrade.
70
///
71
/// If no upgrade was available, or it doesn't succeed, yields an `Error`.
72
#[derive(Clone)]
73
pub struct OnUpgrade {
74
    rx: Option<Arc<Mutex<oneshot::Receiver<crate::Result<Upgraded>>>>>,
75
}
76
77
/// The deconstructed parts of an [`Upgraded`] type.
78
///
79
/// Includes the original IO type, and a read buffer of bytes that the
80
/// HTTP state machine may have already read before completing an upgrade.
81
#[derive(Debug)]
82
#[non_exhaustive]
83
pub struct Parts<T> {
84
    /// The original IO object used before the upgrade.
85
    pub io: T,
86
    /// A buffer of bytes that have been read but not processed as HTTP.
87
    ///
88
    /// For instance, if the `Connection` is used for an HTTP upgrade request,
89
    /// it is possible the server sent back the first bytes of the new protocol
90
    /// along with the response upgrade.
91
    ///
92
    /// You will want to check for any existing bytes if you plan to continue
93
    /// communicating on the IO object.
94
    pub read_buf: Bytes,
95
}
96
97
/// Gets a pending HTTP upgrade from this message.
98
///
99
/// This can be called on the following types:
100
///
101
/// - `http::Request<B>`
102
/// - `http::Response<B>`
103
/// - `&mut http::Request<B>`
104
/// - `&mut http::Response<B>`
105
0
pub fn on<T: sealed::CanUpgrade>(msg: T) -> OnUpgrade {
106
0
    msg.on_upgrade()
107
0
}
108
109
#[cfg(all(
110
    any(feature = "client", feature = "server"),
111
    any(feature = "http1", feature = "http2"),
112
))]
113
pub(super) struct Pending {
114
    tx: oneshot::Sender<crate::Result<Upgraded>>,
115
}
116
117
#[cfg(all(
118
    any(feature = "client", feature = "server"),
119
    any(feature = "http1", feature = "http2"),
120
))]
121
0
pub(super) fn pending() -> (Pending, OnUpgrade) {
122
0
    let (tx, rx) = oneshot::channel();
123
0
    (
124
0
        Pending { tx },
125
0
        OnUpgrade {
126
0
            rx: Some(Arc::new(Mutex::new(rx))),
127
0
        },
128
0
    )
129
0
}
130
131
// ===== impl Upgraded =====
132
133
impl Upgraded {
134
    #[cfg(all(
135
        any(feature = "client", feature = "server"),
136
        any(feature = "http1", feature = "http2")
137
    ))]
138
0
    pub(super) fn new<T>(io: T, read_buf: Bytes) -> Self
139
0
    where
140
0
        T: Read + Write + Unpin + Send + 'static,
141
    {
142
0
        Upgraded {
143
0
            io: Rewind::new_buffered(Box::new(io), read_buf),
144
0
        }
145
0
    }
Unexecuted instantiation: <hyper::upgrade::Upgraded>::new::<hyper::proto::h2::H2Upgraded<bytes::bytes::Bytes>>
Unexecuted instantiation: <hyper::upgrade::Upgraded>::new::<_>
146
147
    /// Tries to downcast the internal trait object to the type passed.
148
    ///
149
    /// On success, returns the downcasted parts. On error, returns the
150
    /// `Upgraded` back.
151
0
    pub fn downcast<T: Read + Write + Unpin + 'static>(self) -> Result<Parts<T>, Self> {
152
0
        let (io, buf) = self.io.into_inner();
153
0
        match io.__hyper_downcast() {
154
0
            Ok(t) => Ok(Parts {
155
0
                io: *t,
156
0
                read_buf: buf,
157
0
            }),
158
0
            Err(io) => Err(Upgraded {
159
0
                io: Rewind::new_buffered(io, buf),
160
0
            }),
161
        }
162
0
    }
163
}
164
165
impl Read for Upgraded {
166
0
    fn poll_read(
167
0
        mut self: Pin<&mut Self>,
168
0
        cx: &mut Context<'_>,
169
0
        buf: ReadBufCursor<'_>,
170
0
    ) -> Poll<io::Result<()>> {
171
0
        Pin::new(&mut self.io).poll_read(cx, buf)
172
0
    }
173
}
174
175
impl Write for Upgraded {
176
0
    fn poll_write(
177
0
        mut self: Pin<&mut Self>,
178
0
        cx: &mut Context<'_>,
179
0
        buf: &[u8],
180
0
    ) -> Poll<io::Result<usize>> {
181
0
        Pin::new(&mut self.io).poll_write(cx, buf)
182
0
    }
183
184
0
    fn poll_write_vectored(
185
0
        mut self: Pin<&mut Self>,
186
0
        cx: &mut Context<'_>,
187
0
        bufs: &[io::IoSlice<'_>],
188
0
    ) -> Poll<io::Result<usize>> {
189
0
        Pin::new(&mut self.io).poll_write_vectored(cx, bufs)
190
0
    }
191
192
0
    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
193
0
        Pin::new(&mut self.io).poll_flush(cx)
194
0
    }
195
196
0
    fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
197
0
        Pin::new(&mut self.io).poll_shutdown(cx)
198
0
    }
199
200
0
    fn is_write_vectored(&self) -> bool {
201
0
        self.io.is_write_vectored()
202
0
    }
203
}
204
205
impl fmt::Debug for Upgraded {
206
0
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
207
0
        f.debug_struct("Upgraded").finish()
208
0
    }
209
}
210
211
// ===== impl OnUpgrade =====
212
213
impl OnUpgrade {
214
0
    pub(super) fn none() -> Self {
215
0
        OnUpgrade { rx: None }
216
0
    }
217
218
    #[cfg(all(any(feature = "client", feature = "server"), feature = "http1"))]
219
0
    pub(super) fn is_none(&self) -> bool {
220
0
        self.rx.is_none()
221
0
    }
222
}
223
224
impl Future for OnUpgrade {
225
    type Output = Result<Upgraded, crate::Error>;
226
227
0
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
228
0
        match self.rx {
229
0
            Some(ref rx) => Pin::new(&mut *rx.lock().unwrap())
230
0
                .poll(cx)
231
0
                .map(|res| match res {
232
0
                    Ok(Ok(upgraded)) => Ok(upgraded),
233
0
                    Ok(Err(err)) => Err(err),
234
0
                    Err(_oneshot_canceled) => {
235
0
                        Err(crate::Error::new_canceled().with(UpgradeExpected))
236
                    }
237
0
                }),
238
0
            None => Poll::Ready(Err(crate::Error::new_user_no_upgrade())),
239
        }
240
0
    }
241
}
242
243
impl fmt::Debug for OnUpgrade {
244
0
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
245
0
        f.debug_struct("OnUpgrade").finish()
246
0
    }
247
}
248
249
// ===== impl Pending =====
250
251
#[cfg(all(
252
    any(feature = "client", feature = "server"),
253
    any(feature = "http1", feature = "http2")
254
))]
255
impl Pending {
256
0
    pub(super) fn fulfill(self, upgraded: Upgraded) {
257
        trace!("pending upgrade fulfill");
258
0
        let _ = self.tx.send(Ok(upgraded));
259
0
    }
260
261
    #[cfg(feature = "http1")]
262
    /// Don't fulfill the pending Upgrade, but instead signal that
263
    /// upgrades are handled manually.
264
0
    pub(super) fn manual(self) {
265
        #[cfg(any(feature = "http1", feature = "http2"))]
266
        trace!("pending upgrade handled manually");
267
0
        let _ = self.tx.send(Err(crate::Error::new_user_manual_upgrade()));
268
0
    }
269
}
270
271
// ===== impl UpgradeExpected =====
272
273
/// Error cause returned when an upgrade was expected but canceled
274
/// for whatever reason.
275
///
276
/// This likely means the actual `Conn` future wasn't polled and upgraded.
277
#[derive(Debug)]
278
struct UpgradeExpected;
279
280
impl fmt::Display for UpgradeExpected {
281
0
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
282
0
        f.write_str("upgrade expected but not completed")
283
0
    }
284
}
285
286
impl StdError for UpgradeExpected {}
287
288
// ===== impl Io =====
289
290
pub(super) trait Io: Read + Write + Unpin + 'static {
291
0
    fn __hyper_type_id(&self) -> TypeId {
292
0
        TypeId::of::<Self>()
293
0
    }
Unexecuted instantiation: <hyper::proto::h2::H2Upgraded<bytes::bytes::Bytes> as hyper::upgrade::Io>::__hyper_type_id
Unexecuted instantiation: <_ as hyper::upgrade::Io>::__hyper_type_id
294
}
295
296
impl<T: Read + Write + Unpin + 'static> Io for T {}
297
298
impl dyn Io + Send {
299
0
    fn __hyper_is<T: Io>(&self) -> bool {
300
0
        let t = TypeId::of::<T>();
301
0
        self.__hyper_type_id() == t
302
0
    }
303
304
0
    fn __hyper_downcast<T: Io>(self: Box<Self>) -> Result<Box<T>, Box<Self>> {
305
0
        if self.__hyper_is::<T>() {
306
            // Taken from `std::error::Error::downcast()`.
307
            unsafe {
308
0
                let raw: *mut dyn Io = Box::into_raw(self);
309
0
                Ok(Box::from_raw(raw as *mut T))
310
            }
311
        } else {
312
0
            Err(self)
313
        }
314
0
    }
315
}
316
317
mod sealed {
318
    use super::OnUpgrade;
319
320
    pub trait CanUpgrade {
321
        fn on_upgrade(self) -> OnUpgrade;
322
    }
323
324
    impl<B> CanUpgrade for http::Request<B> {
325
0
        fn on_upgrade(mut self) -> OnUpgrade {
326
0
            self.extensions_mut()
327
0
                .remove::<OnUpgrade>()
328
0
                .unwrap_or_else(OnUpgrade::none)
329
0
        }
330
    }
331
332
    impl<B> CanUpgrade for &'_ mut http::Request<B> {
333
0
        fn on_upgrade(self) -> OnUpgrade {
334
0
            self.extensions_mut()
335
0
                .remove::<OnUpgrade>()
336
0
                .unwrap_or_else(OnUpgrade::none)
337
0
        }
338
    }
339
340
    impl<B> CanUpgrade for http::Response<B> {
341
0
        fn on_upgrade(mut self) -> OnUpgrade {
342
0
            self.extensions_mut()
343
0
                .remove::<OnUpgrade>()
344
0
                .unwrap_or_else(OnUpgrade::none)
345
0
        }
346
    }
347
348
    impl<B> CanUpgrade for &'_ mut http::Response<B> {
349
0
        fn on_upgrade(self) -> OnUpgrade {
350
0
            self.extensions_mut()
351
0
                .remove::<OnUpgrade>()
352
0
                .unwrap_or_else(OnUpgrade::none)
353
0
        }
354
    }
355
}
356
357
#[cfg(all(
358
    any(feature = "client", feature = "server"),
359
    any(feature = "http1", feature = "http2"),
360
))]
361
#[cfg(test)]
362
mod tests {
363
    use super::*;
364
365
    #[test]
366
    fn upgraded_downcast() {
367
        let upgraded = Upgraded::new(Mock, Bytes::new());
368
369
        let upgraded = upgraded
370
            .downcast::<crate::common::io::Compat<std::io::Cursor<Vec<u8>>>>()
371
            .unwrap_err();
372
373
        upgraded.downcast::<Mock>().unwrap();
374
    }
375
376
    // TODO: replace with tokio_test::io when it can test write_buf
377
    struct Mock;
378
379
    impl Read for Mock {
380
        fn poll_read(
381
            self: Pin<&mut Self>,
382
            _cx: &mut Context<'_>,
383
            _buf: ReadBufCursor<'_>,
384
        ) -> Poll<io::Result<()>> {
385
            unreachable!("Mock::poll_read")
386
        }
387
    }
388
389
    impl Write for Mock {
390
        fn poll_write(
391
            self: Pin<&mut Self>,
392
            _: &mut Context<'_>,
393
            buf: &[u8],
394
        ) -> Poll<io::Result<usize>> {
395
            // panic!("poll_write shouldn't be called");
396
            Poll::Ready(Ok(buf.len()))
397
        }
398
399
        fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
400
            unreachable!("Mock::poll_flush")
401
        }
402
403
        fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
404
            unreachable!("Mock::poll_shutdown")
405
        }
406
    }
407
}