/rust/registry/src/index.crates.io-1949cf8c6b5b557f/hyper-util-0.1.20/src/rt/tokio.rs
Line | Count | Source |
1 | | //! [`tokio`] runtime components integration for [`hyper`]. |
2 | | //! |
3 | | //! [`hyper::rt`] exposes a set of traits to allow hyper to be agnostic to |
4 | | //! its underlying asynchronous runtime. This submodule provides glue for |
5 | | //! [`tokio`] users to bridge those types to [`hyper`]'s interfaces. |
6 | | //! |
7 | | //! # IO |
8 | | //! |
9 | | //! [`hyper`] abstracts over asynchronous readers and writers using [`Read`] |
10 | | //! and [`Write`], while [`tokio`] abstracts over this using [`AsyncRead`] |
11 | | //! and [`AsyncWrite`]. This submodule provides a collection of IO adaptors |
12 | | //! to bridge these two IO ecosystems together: [`TokioIo<I>`], |
13 | | //! [`WithHyperIo<I>`], and [`WithTokioIo<I>`]. |
14 | | //! |
15 | | //! To compare and constrast these IO adaptors and to help explain which |
16 | | //! is the proper choice for your needs, here is a table showing which IO |
17 | | //! traits these implement, given two types `T` and `H` which implement |
18 | | //! Tokio's and Hyper's corresponding IO traits: |
19 | | //! |
20 | | //! | | [`AsyncRead`] | [`AsyncWrite`] | [`Read`] | [`Write`] | |
21 | | //! |--------------------|------------------|-------------------|--------------|--------------| |
22 | | //! | `T` | ✅ **true** | ✅ **true** | ❌ **false** | ❌ **false** | |
23 | | //! | `H` | ❌ **false** | ❌ **false** | ✅ **true** | ✅ **true** | |
24 | | //! | [`TokioIo<T>`] | ❌ **false** | ❌ **false** | ✅ **true** | ✅ **true** | |
25 | | //! | [`TokioIo<H>`] | ✅ **true** | ✅ **true** | ❌ **false** | ❌ **false** | |
26 | | //! | [`WithHyperIo<T>`] | ✅ **true** | ✅ **true** | ✅ **true** | ✅ **true** | |
27 | | //! | [`WithHyperIo<H>`] | ❌ **false** | ❌ **false** | ❌ **false** | ❌ **false** | |
28 | | //! | [`WithTokioIo<T>`] | ❌ **false** | ❌ **false** | ❌ **false** | ❌ **false** | |
29 | | //! | [`WithTokioIo<H>`] | ✅ **true** | ✅ **true** | ✅ **true** | ✅ **true** | |
30 | | //! |
31 | | //! For most situations, [`TokioIo<I>`] is the proper choice. This should be |
32 | | //! constructed, wrapping some underlying [`hyper`] or [`tokio`] IO, at the |
33 | | //! call-site of a function like [`hyper::client::conn::http1::handshake`]. |
34 | | //! |
35 | | //! [`TokioIo<I>`] switches across these ecosystems, but notably does not |
36 | | //! preserve the existing IO trait implementations of its underlying IO. If |
37 | | //! one wishes to _extend_ IO with additional implementations, |
38 | | //! [`WithHyperIo<I>`] and [`WithTokioIo<I>`] are the correct choice. |
39 | | //! |
40 | | //! For example, a Tokio reader/writer can be wrapped in [`WithHyperIo<I>`]. |
41 | | //! That will implement _both_ sets of IO traits. Conversely, |
42 | | //! [`WithTokioIo<I>`] will implement both sets of IO traits given a |
43 | | //! reader/writer that implements Hyper's [`Read`] and [`Write`]. |
44 | | //! |
45 | | //! See [`tokio::io`] and ["_Asynchronous IO_"][tokio-async-docs] for more |
46 | | //! information. |
47 | | //! |
48 | | //! [`AsyncRead`]: tokio::io::AsyncRead |
49 | | //! [`AsyncWrite`]: tokio::io::AsyncWrite |
50 | | //! [`Read`]: hyper::rt::Read |
51 | | //! [`Write`]: hyper::rt::Write |
52 | | //! [tokio-async-docs]: https://docs.rs/tokio/latest/tokio/#asynchronous-io |
53 | | |
54 | | use std::{ |
55 | | future::Future, |
56 | | pin::Pin, |
57 | | task::{Context, Poll}, |
58 | | time::{Duration, Instant}, |
59 | | }; |
60 | | |
61 | | use hyper::rt::{Executor, Sleep, Timer}; |
62 | | use pin_project_lite::pin_project; |
63 | | |
64 | | #[cfg(feature = "tracing")] |
65 | | use tracing::instrument::Instrument; |
66 | | |
67 | | pub use self::{with_hyper_io::WithHyperIo, with_tokio_io::WithTokioIo}; |
68 | | |
69 | | mod with_hyper_io; |
70 | | mod with_tokio_io; |
71 | | |
72 | | /// Future executor that utilises `tokio` threads. |
73 | | #[non_exhaustive] |
74 | | #[derive(Default, Debug, Clone)] |
75 | | pub struct TokioExecutor {} |
76 | | |
77 | | pin_project! { |
78 | | /// A wrapper that implements Tokio's IO traits for an inner type that |
79 | | /// implements hyper's IO traits, or vice versa (implements hyper's IO |
80 | | /// traits for a type that implements Tokio's IO traits). |
81 | | #[derive(Debug)] |
82 | | pub struct TokioIo<T> { |
83 | | #[pin] |
84 | | inner: T, |
85 | | } |
86 | | } |
87 | | |
88 | | /// A Timer that uses the tokio runtime. |
89 | | #[non_exhaustive] |
90 | | #[derive(Default, Clone, Debug)] |
91 | | pub struct TokioTimer; |
92 | | |
93 | | // Use TokioSleep to get tokio::time::Sleep to implement Unpin. |
94 | | // see https://docs.rs/tokio/latest/tokio/time/struct.Sleep.html |
95 | | pin_project! { |
96 | | #[derive(Debug)] |
97 | | struct TokioSleep { |
98 | | #[pin] |
99 | | inner: tokio::time::Sleep, |
100 | | } |
101 | | } |
102 | | |
103 | | // ===== impl TokioExecutor ===== |
104 | | |
105 | | impl<Fut> Executor<Fut> for TokioExecutor |
106 | | where |
107 | | Fut: Future + Send + 'static, |
108 | | Fut::Output: Send + 'static, |
109 | | { |
110 | 0 | fn execute(&self, fut: Fut) { |
111 | | #[cfg(feature = "tracing")] |
112 | | tokio::spawn(fut.in_current_span()); |
113 | | |
114 | | #[cfg(not(feature = "tracing"))] |
115 | 0 | tokio::spawn(fut); |
116 | 0 | } Unexecuted instantiation: <hyper_util::rt::tokio::TokioExecutor as hyper::rt::Executor<core::pin::Pin<alloc::boxed::Box<dyn core::future::future::Future<Output = ()> + core::marker::Send>>>>::execute Unexecuted instantiation: <hyper_util::rt::tokio::TokioExecutor as hyper::rt::Executor<_>>::execute |
117 | | } |
118 | | |
119 | | impl TokioExecutor { |
120 | | /// Create new executor that relies on [`tokio::spawn`] to execute futures. |
121 | 0 | pub fn new() -> Self { |
122 | 0 | Self {} |
123 | 0 | } |
124 | | } |
125 | | |
126 | | // ==== impl TokioIo ===== |
127 | | |
128 | | impl<T> TokioIo<T> { |
129 | | /// Wrap a type implementing Tokio's or hyper's IO traits. |
130 | 0 | pub fn new(inner: T) -> Self { |
131 | 0 | Self { inner } |
132 | 0 | } Unexecuted instantiation: <hyper_util::rt::tokio::TokioIo<hyper_util::rt::tokio::TokioIo<tokio::net::tcp::stream::TcpStream>>>::new Unexecuted instantiation: <hyper_util::rt::tokio::TokioIo<hyper_util::rt::tokio::TokioIo<tokio::net::unix::stream::UnixStream>>>::new Unexecuted instantiation: <hyper_util::rt::tokio::TokioIo<hyper_rustls::stream::MaybeHttpsStream<hyper_util::rt::tokio::TokioIo<tokio::net::tcp::stream::TcpStream>>>>::new Unexecuted instantiation: <hyper_util::rt::tokio::TokioIo<tokio_rustls::client::TlsStream<hyper_util::rt::tokio::TokioIo<hyper_util::rt::tokio::TokioIo<tokio::net::tcp::stream::TcpStream>>>>>::new Unexecuted instantiation: <hyper_util::rt::tokio::TokioIo<tokio_rustls::client::TlsStream<hyper_util::rt::tokio::TokioIo<hyper_util::rt::tokio::TokioIo<tokio::net::unix::stream::UnixStream>>>>>::new Unexecuted instantiation: <hyper_util::rt::tokio::TokioIo<tokio_rustls::client::TlsStream<hyper_util::rt::tokio::TokioIo<hyper_rustls::stream::MaybeHttpsStream<hyper_util::rt::tokio::TokioIo<tokio::net::tcp::stream::TcpStream>>>>>>::new Unexecuted instantiation: <hyper_util::rt::tokio::TokioIo<hyper::upgrade::Upgraded>>::new Unexecuted instantiation: <hyper_util::rt::tokio::TokioIo<tokio::net::tcp::stream::TcpStream>>::new Unexecuted instantiation: <hyper_util::rt::tokio::TokioIo<tokio::net::unix::stream::UnixStream>>::new Unexecuted instantiation: <hyper_util::rt::tokio::TokioIo<_>>::new |
133 | | |
134 | | /// Borrow the inner type. |
135 | 0 | pub fn inner(&self) -> &T { |
136 | 0 | &self.inner |
137 | 0 | } Unexecuted instantiation: <hyper_util::rt::tokio::TokioIo<hyper_util::rt::tokio::TokioIo<tokio::net::tcp::stream::TcpStream>>>::inner Unexecuted instantiation: <hyper_util::rt::tokio::TokioIo<hyper_util::rt::tokio::TokioIo<tokio::net::unix::stream::UnixStream>>>::inner Unexecuted instantiation: <hyper_util::rt::tokio::TokioIo<hyper_rustls::stream::MaybeHttpsStream<hyper_util::rt::tokio::TokioIo<tokio::net::tcp::stream::TcpStream>>>>::inner Unexecuted instantiation: <hyper_util::rt::tokio::TokioIo<hyper_rustls::stream::MaybeHttpsStream<hyper_util::rt::tokio::TokioIo<tokio::net::unix::stream::UnixStream>>>>::inner Unexecuted instantiation: <hyper_util::rt::tokio::TokioIo<tokio_rustls::client::TlsStream<hyper_util::rt::tokio::TokioIo<hyper_util::rt::tokio::TokioIo<tokio::net::tcp::stream::TcpStream>>>>>::inner Unexecuted instantiation: <hyper_util::rt::tokio::TokioIo<tokio_rustls::client::TlsStream<hyper_util::rt::tokio::TokioIo<hyper_util::rt::tokio::TokioIo<tokio::net::unix::stream::UnixStream>>>>>::inner Unexecuted instantiation: <hyper_util::rt::tokio::TokioIo<tokio_rustls::client::TlsStream<hyper_util::rt::tokio::TokioIo<hyper_rustls::stream::MaybeHttpsStream<hyper_util::rt::tokio::TokioIo<tokio::net::tcp::stream::TcpStream>>>>>>::inner Unexecuted instantiation: <hyper_util::rt::tokio::TokioIo<tokio_rustls::client::TlsStream<hyper_util::rt::tokio::TokioIo<hyper_rustls::stream::MaybeHttpsStream<hyper_util::rt::tokio::TokioIo<tokio::net::unix::stream::UnixStream>>>>>>::inner Unexecuted instantiation: <hyper_util::rt::tokio::TokioIo<tokio::net::tcp::stream::TcpStream>>::inner Unexecuted instantiation: <hyper_util::rt::tokio::TokioIo<tokio::net::unix::stream::UnixStream>>::inner Unexecuted instantiation: <hyper_util::rt::tokio::TokioIo<_>>::inner |
138 | | |
139 | | /// Mut borrow the inner type. |
140 | 0 | pub fn inner_mut(&mut self) -> &mut T { |
141 | 0 | &mut self.inner |
142 | 0 | } |
143 | | |
144 | | /// Consume this wrapper and get the inner type. |
145 | 0 | pub fn into_inner(self) -> T { |
146 | 0 | self.inner |
147 | 0 | } |
148 | | } |
149 | | |
150 | | impl<T> hyper::rt::Read for TokioIo<T> |
151 | | where |
152 | | T: tokio::io::AsyncRead, |
153 | | { |
154 | 0 | fn poll_read( |
155 | 0 | self: Pin<&mut Self>, |
156 | 0 | cx: &mut Context<'_>, |
157 | 0 | mut buf: hyper::rt::ReadBufCursor<'_>, |
158 | 0 | ) -> Poll<Result<(), std::io::Error>> { |
159 | 0 | let n = unsafe { |
160 | 0 | let mut tbuf = tokio::io::ReadBuf::uninit(buf.as_mut()); |
161 | 0 | match tokio::io::AsyncRead::poll_read(self.project().inner, cx, &mut tbuf) { |
162 | 0 | Poll::Ready(Ok(())) => tbuf.filled().len(), |
163 | 0 | other => return other, |
164 | | } |
165 | | }; |
166 | | |
167 | 0 | unsafe { |
168 | 0 | buf.advance(n); |
169 | 0 | } |
170 | 0 | Poll::Ready(Ok(())) |
171 | 0 | } Unexecuted instantiation: <hyper_util::rt::tokio::TokioIo<tokio_rustls::client::TlsStream<hyper_util::rt::tokio::TokioIo<hyper_util::rt::tokio::TokioIo<tokio::net::tcp::stream::TcpStream>>>> as hyper::rt::io::Read>::poll_read Unexecuted instantiation: <hyper_util::rt::tokio::TokioIo<tokio_rustls::client::TlsStream<hyper_util::rt::tokio::TokioIo<hyper_util::rt::tokio::TokioIo<tokio::net::unix::stream::UnixStream>>>> as hyper::rt::io::Read>::poll_read Unexecuted instantiation: <hyper_util::rt::tokio::TokioIo<tokio_rustls::client::TlsStream<hyper_util::rt::tokio::TokioIo<hyper_rustls::stream::MaybeHttpsStream<hyper_util::rt::tokio::TokioIo<tokio::net::tcp::stream::TcpStream>>>>> as hyper::rt::io::Read>::poll_read Unexecuted instantiation: <hyper_util::rt::tokio::TokioIo<tokio::net::tcp::stream::TcpStream> as hyper::rt::io::Read>::poll_read Unexecuted instantiation: <hyper_util::rt::tokio::TokioIo<tokio::net::unix::stream::UnixStream> as hyper::rt::io::Read>::poll_read Unexecuted instantiation: <hyper_util::rt::tokio::TokioIo<_> as hyper::rt::io::Read>::poll_read |
172 | | } |
173 | | |
174 | | impl<T> hyper::rt::Write for TokioIo<T> |
175 | | where |
176 | | T: tokio::io::AsyncWrite, |
177 | | { |
178 | 0 | fn poll_write( |
179 | 0 | self: Pin<&mut Self>, |
180 | 0 | cx: &mut Context<'_>, |
181 | 0 | buf: &[u8], |
182 | 0 | ) -> Poll<Result<usize, std::io::Error>> { |
183 | 0 | tokio::io::AsyncWrite::poll_write(self.project().inner, cx, buf) |
184 | 0 | } Unexecuted instantiation: <hyper_util::rt::tokio::TokioIo<tokio_rustls::client::TlsStream<hyper_util::rt::tokio::TokioIo<hyper_util::rt::tokio::TokioIo<tokio::net::tcp::stream::TcpStream>>>> as hyper::rt::io::Write>::poll_write Unexecuted instantiation: <hyper_util::rt::tokio::TokioIo<tokio_rustls::client::TlsStream<hyper_util::rt::tokio::TokioIo<hyper_util::rt::tokio::TokioIo<tokio::net::unix::stream::UnixStream>>>> as hyper::rt::io::Write>::poll_write Unexecuted instantiation: <hyper_util::rt::tokio::TokioIo<tokio_rustls::client::TlsStream<hyper_util::rt::tokio::TokioIo<hyper_rustls::stream::MaybeHttpsStream<hyper_util::rt::tokio::TokioIo<tokio::net::tcp::stream::TcpStream>>>>> as hyper::rt::io::Write>::poll_write Unexecuted instantiation: <hyper_util::rt::tokio::TokioIo<tokio::net::tcp::stream::TcpStream> as hyper::rt::io::Write>::poll_write Unexecuted instantiation: <hyper_util::rt::tokio::TokioIo<tokio::net::unix::stream::UnixStream> as hyper::rt::io::Write>::poll_write Unexecuted instantiation: <hyper_util::rt::tokio::TokioIo<_> as hyper::rt::io::Write>::poll_write |
185 | | |
186 | 0 | fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), std::io::Error>> { |
187 | 0 | tokio::io::AsyncWrite::poll_flush(self.project().inner, cx) |
188 | 0 | } Unexecuted instantiation: <hyper_util::rt::tokio::TokioIo<tokio_rustls::client::TlsStream<hyper_util::rt::tokio::TokioIo<hyper_util::rt::tokio::TokioIo<tokio::net::tcp::stream::TcpStream>>>> as hyper::rt::io::Write>::poll_flush Unexecuted instantiation: <hyper_util::rt::tokio::TokioIo<tokio_rustls::client::TlsStream<hyper_util::rt::tokio::TokioIo<hyper_util::rt::tokio::TokioIo<tokio::net::unix::stream::UnixStream>>>> as hyper::rt::io::Write>::poll_flush Unexecuted instantiation: <hyper_util::rt::tokio::TokioIo<tokio_rustls::client::TlsStream<hyper_util::rt::tokio::TokioIo<hyper_rustls::stream::MaybeHttpsStream<hyper_util::rt::tokio::TokioIo<tokio::net::tcp::stream::TcpStream>>>>> as hyper::rt::io::Write>::poll_flush Unexecuted instantiation: <hyper_util::rt::tokio::TokioIo<tokio::net::tcp::stream::TcpStream> as hyper::rt::io::Write>::poll_flush Unexecuted instantiation: <hyper_util::rt::tokio::TokioIo<tokio::net::unix::stream::UnixStream> as hyper::rt::io::Write>::poll_flush Unexecuted instantiation: <hyper_util::rt::tokio::TokioIo<_> as hyper::rt::io::Write>::poll_flush |
189 | | |
190 | 0 | fn poll_shutdown( |
191 | 0 | self: Pin<&mut Self>, |
192 | 0 | cx: &mut Context<'_>, |
193 | 0 | ) -> Poll<Result<(), std::io::Error>> { |
194 | 0 | tokio::io::AsyncWrite::poll_shutdown(self.project().inner, cx) |
195 | 0 | } Unexecuted instantiation: <hyper_util::rt::tokio::TokioIo<tokio_rustls::client::TlsStream<hyper_util::rt::tokio::TokioIo<hyper_util::rt::tokio::TokioIo<tokio::net::tcp::stream::TcpStream>>>> as hyper::rt::io::Write>::poll_shutdown Unexecuted instantiation: <hyper_util::rt::tokio::TokioIo<tokio_rustls::client::TlsStream<hyper_util::rt::tokio::TokioIo<hyper_util::rt::tokio::TokioIo<tokio::net::unix::stream::UnixStream>>>> as hyper::rt::io::Write>::poll_shutdown Unexecuted instantiation: <hyper_util::rt::tokio::TokioIo<tokio_rustls::client::TlsStream<hyper_util::rt::tokio::TokioIo<hyper_rustls::stream::MaybeHttpsStream<hyper_util::rt::tokio::TokioIo<tokio::net::tcp::stream::TcpStream>>>>> as hyper::rt::io::Write>::poll_shutdown Unexecuted instantiation: <hyper_util::rt::tokio::TokioIo<tokio::net::tcp::stream::TcpStream> as hyper::rt::io::Write>::poll_shutdown Unexecuted instantiation: <hyper_util::rt::tokio::TokioIo<tokio::net::unix::stream::UnixStream> as hyper::rt::io::Write>::poll_shutdown Unexecuted instantiation: <hyper_util::rt::tokio::TokioIo<_> as hyper::rt::io::Write>::poll_shutdown |
196 | | |
197 | 0 | fn is_write_vectored(&self) -> bool { |
198 | 0 | tokio::io::AsyncWrite::is_write_vectored(&self.inner) |
199 | 0 | } Unexecuted instantiation: <hyper_util::rt::tokio::TokioIo<tokio_rustls::client::TlsStream<hyper_util::rt::tokio::TokioIo<hyper_util::rt::tokio::TokioIo<tokio::net::tcp::stream::TcpStream>>>> as hyper::rt::io::Write>::is_write_vectored Unexecuted instantiation: <hyper_util::rt::tokio::TokioIo<tokio_rustls::client::TlsStream<hyper_util::rt::tokio::TokioIo<hyper_util::rt::tokio::TokioIo<tokio::net::unix::stream::UnixStream>>>> as hyper::rt::io::Write>::is_write_vectored Unexecuted instantiation: <hyper_util::rt::tokio::TokioIo<tokio_rustls::client::TlsStream<hyper_util::rt::tokio::TokioIo<hyper_rustls::stream::MaybeHttpsStream<hyper_util::rt::tokio::TokioIo<tokio::net::tcp::stream::TcpStream>>>>> as hyper::rt::io::Write>::is_write_vectored Unexecuted instantiation: <hyper_util::rt::tokio::TokioIo<tokio::net::tcp::stream::TcpStream> as hyper::rt::io::Write>::is_write_vectored Unexecuted instantiation: <hyper_util::rt::tokio::TokioIo<tokio::net::unix::stream::UnixStream> as hyper::rt::io::Write>::is_write_vectored Unexecuted instantiation: <hyper_util::rt::tokio::TokioIo<_> as hyper::rt::io::Write>::is_write_vectored |
200 | | |
201 | 0 | fn poll_write_vectored( |
202 | 0 | self: Pin<&mut Self>, |
203 | 0 | cx: &mut Context<'_>, |
204 | 0 | bufs: &[std::io::IoSlice<'_>], |
205 | 0 | ) -> Poll<Result<usize, std::io::Error>> { |
206 | 0 | tokio::io::AsyncWrite::poll_write_vectored(self.project().inner, cx, bufs) |
207 | 0 | } Unexecuted instantiation: <hyper_util::rt::tokio::TokioIo<tokio_rustls::client::TlsStream<hyper_util::rt::tokio::TokioIo<hyper_util::rt::tokio::TokioIo<tokio::net::tcp::stream::TcpStream>>>> as hyper::rt::io::Write>::poll_write_vectored Unexecuted instantiation: <hyper_util::rt::tokio::TokioIo<tokio_rustls::client::TlsStream<hyper_util::rt::tokio::TokioIo<hyper_util::rt::tokio::TokioIo<tokio::net::unix::stream::UnixStream>>>> as hyper::rt::io::Write>::poll_write_vectored Unexecuted instantiation: <hyper_util::rt::tokio::TokioIo<tokio_rustls::client::TlsStream<hyper_util::rt::tokio::TokioIo<hyper_rustls::stream::MaybeHttpsStream<hyper_util::rt::tokio::TokioIo<tokio::net::tcp::stream::TcpStream>>>>> as hyper::rt::io::Write>::poll_write_vectored Unexecuted instantiation: <hyper_util::rt::tokio::TokioIo<tokio::net::tcp::stream::TcpStream> as hyper::rt::io::Write>::poll_write_vectored Unexecuted instantiation: <hyper_util::rt::tokio::TokioIo<tokio::net::unix::stream::UnixStream> as hyper::rt::io::Write>::poll_write_vectored Unexecuted instantiation: <hyper_util::rt::tokio::TokioIo<_> as hyper::rt::io::Write>::poll_write_vectored |
208 | | } |
209 | | |
210 | | impl<T> tokio::io::AsyncRead for TokioIo<T> |
211 | | where |
212 | | T: hyper::rt::Read, |
213 | | { |
214 | 0 | fn poll_read( |
215 | 0 | self: Pin<&mut Self>, |
216 | 0 | cx: &mut Context<'_>, |
217 | 0 | tbuf: &mut tokio::io::ReadBuf<'_>, |
218 | 0 | ) -> Poll<Result<(), std::io::Error>> { |
219 | | //let init = tbuf.initialized().len(); |
220 | 0 | let filled = tbuf.filled().len(); |
221 | 0 | let sub_filled = unsafe { |
222 | 0 | let mut buf = hyper::rt::ReadBuf::uninit(tbuf.unfilled_mut()); |
223 | | |
224 | 0 | match hyper::rt::Read::poll_read(self.project().inner, cx, buf.unfilled()) { |
225 | 0 | Poll::Ready(Ok(())) => buf.filled().len(), |
226 | 0 | other => return other, |
227 | | } |
228 | | }; |
229 | | |
230 | 0 | let n_filled = filled + sub_filled; |
231 | | // At least sub_filled bytes had to have been initialized. |
232 | 0 | let n_init = sub_filled; |
233 | 0 | unsafe { |
234 | 0 | tbuf.assume_init(n_init); |
235 | 0 | tbuf.set_filled(n_filled); |
236 | 0 | } |
237 | | |
238 | 0 | Poll::Ready(Ok(())) |
239 | 0 | } Unexecuted instantiation: <hyper_util::rt::tokio::TokioIo<hyper_util::rt::tokio::TokioIo<tokio::net::tcp::stream::TcpStream>> as tokio::io::async_read::AsyncRead>::poll_read Unexecuted instantiation: <hyper_util::rt::tokio::TokioIo<hyper_util::rt::tokio::TokioIo<tokio::net::unix::stream::UnixStream>> as tokio::io::async_read::AsyncRead>::poll_read Unexecuted instantiation: <hyper_util::rt::tokio::TokioIo<hyper_rustls::stream::MaybeHttpsStream<hyper_util::rt::tokio::TokioIo<tokio::net::tcp::stream::TcpStream>>> as tokio::io::async_read::AsyncRead>::poll_read Unexecuted instantiation: <hyper_util::rt::tokio::TokioIo<hyper::upgrade::Upgraded> as tokio::io::async_read::AsyncRead>::poll_read Unexecuted instantiation: <hyper_util::rt::tokio::TokioIo<_> as tokio::io::async_read::AsyncRead>::poll_read |
240 | | } |
241 | | |
242 | | impl<T> tokio::io::AsyncWrite for TokioIo<T> |
243 | | where |
244 | | T: hyper::rt::Write, |
245 | | { |
246 | 0 | fn poll_write( |
247 | 0 | self: Pin<&mut Self>, |
248 | 0 | cx: &mut Context<'_>, |
249 | 0 | buf: &[u8], |
250 | 0 | ) -> Poll<Result<usize, std::io::Error>> { |
251 | 0 | hyper::rt::Write::poll_write(self.project().inner, cx, buf) |
252 | 0 | } Unexecuted instantiation: <hyper_util::rt::tokio::TokioIo<hyper_util::rt::tokio::TokioIo<tokio::net::tcp::stream::TcpStream>> as tokio::io::async_write::AsyncWrite>::poll_write Unexecuted instantiation: <hyper_util::rt::tokio::TokioIo<hyper_util::rt::tokio::TokioIo<tokio::net::unix::stream::UnixStream>> as tokio::io::async_write::AsyncWrite>::poll_write Unexecuted instantiation: <hyper_util::rt::tokio::TokioIo<hyper_rustls::stream::MaybeHttpsStream<hyper_util::rt::tokio::TokioIo<tokio::net::tcp::stream::TcpStream>>> as tokio::io::async_write::AsyncWrite>::poll_write Unexecuted instantiation: <hyper_util::rt::tokio::TokioIo<hyper::upgrade::Upgraded> as tokio::io::async_write::AsyncWrite>::poll_write Unexecuted instantiation: <hyper_util::rt::tokio::TokioIo<_> as tokio::io::async_write::AsyncWrite>::poll_write |
253 | | |
254 | 0 | fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), std::io::Error>> { |
255 | 0 | hyper::rt::Write::poll_flush(self.project().inner, cx) |
256 | 0 | } Unexecuted instantiation: <hyper_util::rt::tokio::TokioIo<hyper_util::rt::tokio::TokioIo<tokio::net::tcp::stream::TcpStream>> as tokio::io::async_write::AsyncWrite>::poll_flush Unexecuted instantiation: <hyper_util::rt::tokio::TokioIo<hyper_util::rt::tokio::TokioIo<tokio::net::unix::stream::UnixStream>> as tokio::io::async_write::AsyncWrite>::poll_flush Unexecuted instantiation: <hyper_util::rt::tokio::TokioIo<hyper_rustls::stream::MaybeHttpsStream<hyper_util::rt::tokio::TokioIo<tokio::net::tcp::stream::TcpStream>>> as tokio::io::async_write::AsyncWrite>::poll_flush Unexecuted instantiation: <hyper_util::rt::tokio::TokioIo<hyper::upgrade::Upgraded> as tokio::io::async_write::AsyncWrite>::poll_flush Unexecuted instantiation: <hyper_util::rt::tokio::TokioIo<_> as tokio::io::async_write::AsyncWrite>::poll_flush |
257 | | |
258 | 0 | fn poll_shutdown( |
259 | 0 | self: Pin<&mut Self>, |
260 | 0 | cx: &mut Context<'_>, |
261 | 0 | ) -> Poll<Result<(), std::io::Error>> { |
262 | 0 | hyper::rt::Write::poll_shutdown(self.project().inner, cx) |
263 | 0 | } Unexecuted instantiation: <hyper_util::rt::tokio::TokioIo<hyper_util::rt::tokio::TokioIo<tokio::net::tcp::stream::TcpStream>> as tokio::io::async_write::AsyncWrite>::poll_shutdown Unexecuted instantiation: <hyper_util::rt::tokio::TokioIo<hyper_util::rt::tokio::TokioIo<tokio::net::unix::stream::UnixStream>> as tokio::io::async_write::AsyncWrite>::poll_shutdown Unexecuted instantiation: <hyper_util::rt::tokio::TokioIo<hyper_rustls::stream::MaybeHttpsStream<hyper_util::rt::tokio::TokioIo<tokio::net::tcp::stream::TcpStream>>> as tokio::io::async_write::AsyncWrite>::poll_shutdown Unexecuted instantiation: <hyper_util::rt::tokio::TokioIo<hyper::upgrade::Upgraded> as tokio::io::async_write::AsyncWrite>::poll_shutdown Unexecuted instantiation: <hyper_util::rt::tokio::TokioIo<_> as tokio::io::async_write::AsyncWrite>::poll_shutdown |
264 | | |
265 | 0 | fn is_write_vectored(&self) -> bool { |
266 | 0 | hyper::rt::Write::is_write_vectored(&self.inner) |
267 | 0 | } Unexecuted instantiation: <hyper_util::rt::tokio::TokioIo<hyper::upgrade::Upgraded> as tokio::io::async_write::AsyncWrite>::is_write_vectored Unexecuted instantiation: <hyper_util::rt::tokio::TokioIo<_> as tokio::io::async_write::AsyncWrite>::is_write_vectored |
268 | | |
269 | 0 | fn poll_write_vectored( |
270 | 0 | self: Pin<&mut Self>, |
271 | 0 | cx: &mut Context<'_>, |
272 | 0 | bufs: &[std::io::IoSlice<'_>], |
273 | 0 | ) -> Poll<Result<usize, std::io::Error>> { |
274 | 0 | hyper::rt::Write::poll_write_vectored(self.project().inner, cx, bufs) |
275 | 0 | } Unexecuted instantiation: <hyper_util::rt::tokio::TokioIo<hyper_util::rt::tokio::TokioIo<tokio::net::tcp::stream::TcpStream>> as tokio::io::async_write::AsyncWrite>::poll_write_vectored Unexecuted instantiation: <hyper_util::rt::tokio::TokioIo<hyper_util::rt::tokio::TokioIo<tokio::net::unix::stream::UnixStream>> as tokio::io::async_write::AsyncWrite>::poll_write_vectored Unexecuted instantiation: <hyper_util::rt::tokio::TokioIo<hyper_rustls::stream::MaybeHttpsStream<hyper_util::rt::tokio::TokioIo<tokio::net::tcp::stream::TcpStream>>> as tokio::io::async_write::AsyncWrite>::poll_write_vectored Unexecuted instantiation: <hyper_util::rt::tokio::TokioIo<hyper::upgrade::Upgraded> as tokio::io::async_write::AsyncWrite>::poll_write_vectored Unexecuted instantiation: <hyper_util::rt::tokio::TokioIo<_> as tokio::io::async_write::AsyncWrite>::poll_write_vectored |
276 | | } |
277 | | |
278 | | // ==== impl TokioTimer ===== |
279 | | |
280 | | impl Timer for TokioTimer { |
281 | 0 | fn sleep(&self, duration: Duration) -> Pin<Box<dyn Sleep>> { |
282 | 0 | Box::pin(TokioSleep { |
283 | 0 | inner: tokio::time::sleep(duration), |
284 | 0 | }) |
285 | 0 | } |
286 | | |
287 | 0 | fn sleep_until(&self, deadline: Instant) -> Pin<Box<dyn Sleep>> { |
288 | 0 | Box::pin(TokioSleep { |
289 | 0 | inner: tokio::time::sleep_until(deadline.into()), |
290 | 0 | }) |
291 | 0 | } |
292 | | |
293 | 0 | fn reset(&self, sleep: &mut Pin<Box<dyn Sleep>>, new_deadline: Instant) { |
294 | 0 | if let Some(sleep) = sleep.as_mut().downcast_mut_pin::<TokioSleep>() { |
295 | 0 | sleep.reset(new_deadline) |
296 | 0 | } |
297 | 0 | } |
298 | | |
299 | 0 | fn now(&self) -> Instant { |
300 | 0 | tokio::time::Instant::now().into() |
301 | 0 | } |
302 | | } |
303 | | |
304 | | impl TokioTimer { |
305 | | /// Create a new TokioTimer |
306 | 0 | pub fn new() -> Self { |
307 | 0 | Self {} |
308 | 0 | } |
309 | | } |
310 | | |
311 | | impl Future for TokioSleep { |
312 | | type Output = (); |
313 | | |
314 | 0 | fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { |
315 | 0 | self.project().inner.poll(cx) |
316 | 0 | } |
317 | | } |
318 | | |
319 | | impl Sleep for TokioSleep {} |
320 | | |
321 | | impl TokioSleep { |
322 | 0 | fn reset(self: Pin<&mut Self>, deadline: Instant) { |
323 | 0 | self.project().inner.as_mut().reset(deadline.into()); |
324 | 0 | } |
325 | | } |
326 | | |
327 | | #[cfg(test)] |
328 | | mod tests { |
329 | | use crate::rt::TokioExecutor; |
330 | | use hyper::rt::Executor; |
331 | | use tokio::sync::oneshot; |
332 | | |
333 | | #[tokio::test] |
334 | | async fn simple_execute() -> Result<(), Box<dyn std::error::Error>> { |
335 | | let (tx, rx) = oneshot::channel(); |
336 | | let executor = TokioExecutor::new(); |
337 | | executor.execute(async move { |
338 | | tx.send(()).unwrap(); |
339 | | }); |
340 | | rx.await.map_err(Into::into) |
341 | | } |
342 | | } |