/rust/registry/src/index.crates.io-1949cf8c6b5b557f/tokio-rustls-0.26.1/src/common/mod.rs
Line | Count | Source |
1 | | use std::io::{self, IoSlice, Read, Write}; |
2 | | use std::ops::{Deref, DerefMut}; |
3 | | use std::pin::Pin; |
4 | | use std::task::{Context, Poll}; |
5 | | |
6 | | use rustls::{ConnectionCommon, SideData}; |
7 | | use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; |
8 | | |
9 | | mod handshake; |
10 | | pub(crate) use handshake::{IoSession, MidHandshake}; |
11 | | |
12 | | #[derive(Debug)] |
13 | | pub enum TlsState { |
14 | | #[cfg(feature = "early-data")] |
15 | | EarlyData(usize, Vec<u8>), |
16 | | Stream, |
17 | | ReadShutdown, |
18 | | WriteShutdown, |
19 | | FullyShutdown, |
20 | | } |
21 | | |
22 | | impl TlsState { |
23 | | #[inline] |
24 | 0 | pub fn shutdown_read(&mut self) { |
25 | 0 | match *self { |
26 | 0 | TlsState::WriteShutdown | TlsState::FullyShutdown => *self = TlsState::FullyShutdown, |
27 | 0 | _ => *self = TlsState::ReadShutdown, |
28 | | } |
29 | 0 | } Unexecuted instantiation: <tokio_rustls::common::TlsState>::shutdown_read Unexecuted instantiation: <tokio_rustls::common::TlsState>::shutdown_read |
30 | | |
31 | | #[inline] |
32 | 0 | pub fn shutdown_write(&mut self) { |
33 | 0 | match *self { |
34 | 0 | TlsState::ReadShutdown | TlsState::FullyShutdown => *self = TlsState::FullyShutdown, |
35 | 0 | _ => *self = TlsState::WriteShutdown, |
36 | | } |
37 | 0 | } Unexecuted instantiation: <tokio_rustls::common::TlsState>::shutdown_write Unexecuted instantiation: <tokio_rustls::common::TlsState>::shutdown_write |
38 | | |
39 | | #[inline] |
40 | 0 | pub fn writeable(&self) -> bool { |
41 | 0 | !matches!(*self, TlsState::WriteShutdown | TlsState::FullyShutdown) |
42 | 0 | } Unexecuted instantiation: <tokio_rustls::common::TlsState>::writeable Unexecuted instantiation: <tokio_rustls::common::TlsState>::writeable |
43 | | |
44 | | #[inline] |
45 | 0 | pub fn readable(&self) -> bool { |
46 | 0 | !matches!(*self, TlsState::ReadShutdown | TlsState::FullyShutdown) |
47 | 0 | } Unexecuted instantiation: <tokio_rustls::common::TlsState>::readable Unexecuted instantiation: <tokio_rustls::common::TlsState>::readable |
48 | | |
49 | | #[inline] |
50 | | #[cfg(feature = "early-data")] |
51 | | pub fn is_early_data(&self) -> bool { |
52 | | matches!(self, TlsState::EarlyData(..)) |
53 | | } |
54 | | |
55 | | #[inline] |
56 | | #[cfg(not(feature = "early-data"))] |
57 | 0 | pub const fn is_early_data(&self) -> bool { |
58 | 0 | false |
59 | 0 | } Unexecuted instantiation: <tokio_rustls::common::TlsState>::is_early_data Unexecuted instantiation: <tokio_rustls::common::TlsState>::is_early_data |
60 | | } |
61 | | |
62 | | pub struct Stream<'a, IO, C> { |
63 | | pub io: &'a mut IO, |
64 | | pub session: &'a mut C, |
65 | | pub eof: bool, |
66 | | } |
67 | | |
68 | | impl<'a, IO: AsyncRead + AsyncWrite + Unpin, C, SD> Stream<'a, IO, C> |
69 | | where |
70 | | C: DerefMut + Deref<Target = ConnectionCommon<SD>>, |
71 | | SD: SideData, |
72 | | { |
73 | 0 | pub fn new(io: &'a mut IO, session: &'a mut C) -> Self { |
74 | 0 | Stream { |
75 | 0 | io, |
76 | 0 | session, |
77 | 0 | // The state so far is only used to detect EOF, so both the Stream |
78 | 0 | // and EarlyData states should be all right. |
79 | 0 | eof: false, |
80 | 0 | } |
81 | 0 | } Unexecuted instantiation: <tokio_rustls::common::Stream<hyper_util::rt::tokio::TokioIo<hyper_util::rt::tokio::TokioIo<tokio::net::tcp::stream::TcpStream>>, rustls::client::client_conn::connection::ClientConnection>>::new Unexecuted instantiation: <tokio_rustls::common::Stream<ztunnel::proxy::h2::TokioH2Stream, rustls::client::client_conn::connection::ClientConnection>>::new Unexecuted instantiation: <tokio_rustls::common::Stream<tokio::net::tcp::stream::TcpStream, rustls::client::client_conn::connection::ClientConnection>>::new Unexecuted instantiation: <tokio_rustls::common::Stream<tokio::net::tcp::stream::TcpStream, rustls::server::server_conn::connection::ServerConnection>>::new Unexecuted instantiation: <tokio_rustls::common::Stream<_, _>>::new |
82 | | |
83 | 0 | pub fn set_eof(mut self, eof: bool) -> Self { |
84 | 0 | self.eof = eof; |
85 | 0 | self |
86 | 0 | } Unexecuted instantiation: <tokio_rustls::common::Stream<hyper_util::rt::tokio::TokioIo<hyper_util::rt::tokio::TokioIo<tokio::net::tcp::stream::TcpStream>>, rustls::client::client_conn::connection::ClientConnection>>::set_eof Unexecuted instantiation: <tokio_rustls::common::Stream<ztunnel::proxy::h2::TokioH2Stream, rustls::client::client_conn::connection::ClientConnection>>::set_eof Unexecuted instantiation: <tokio_rustls::common::Stream<tokio::net::tcp::stream::TcpStream, rustls::client::client_conn::connection::ClientConnection>>::set_eof Unexecuted instantiation: <tokio_rustls::common::Stream<tokio::net::tcp::stream::TcpStream, rustls::server::server_conn::connection::ServerConnection>>::set_eof Unexecuted instantiation: <tokio_rustls::common::Stream<_, _>>::set_eof |
87 | | |
88 | 0 | pub fn as_mut_pin(&mut self) -> Pin<&mut Self> { |
89 | 0 | Pin::new(self) |
90 | 0 | } Unexecuted instantiation: <tokio_rustls::common::Stream<hyper_util::rt::tokio::TokioIo<hyper_util::rt::tokio::TokioIo<tokio::net::tcp::stream::TcpStream>>, rustls::client::client_conn::connection::ClientConnection>>::as_mut_pin Unexecuted instantiation: <tokio_rustls::common::Stream<ztunnel::proxy::h2::TokioH2Stream, rustls::client::client_conn::connection::ClientConnection>>::as_mut_pin Unexecuted instantiation: <tokio_rustls::common::Stream<tokio::net::tcp::stream::TcpStream, rustls::client::client_conn::connection::ClientConnection>>::as_mut_pin Unexecuted instantiation: <tokio_rustls::common::Stream<tokio::net::tcp::stream::TcpStream, rustls::server::server_conn::connection::ServerConnection>>::as_mut_pin Unexecuted instantiation: <tokio_rustls::common::Stream<_, _>>::as_mut_pin |
91 | | |
92 | 0 | pub fn read_io(&mut self, cx: &mut Context) -> Poll<io::Result<usize>> { |
93 | 0 | let mut reader = SyncReadAdapter { io: self.io, cx }; |
94 | | |
95 | 0 | let n = match self.session.read_tls(&mut reader) { |
96 | 0 | Ok(n) => n, |
97 | 0 | Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => return Poll::Pending, |
98 | 0 | Err(err) => return Poll::Ready(Err(err)), |
99 | | }; |
100 | | |
101 | 0 | self.session.process_new_packets().map_err(|err| { |
102 | | // In case we have an alert to send describing this error, |
103 | | // try a last-gasp write -- but don't let its outcome mask the primary |
104 | | // error. |
105 | 0 | let _ = self.write_io(cx); |
106 | | |
107 | 0 | io::Error::new(io::ErrorKind::InvalidData, err) |
108 | 0 | })?; Unexecuted instantiation: <tokio_rustls::common::Stream<hyper_util::rt::tokio::TokioIo<hyper_util::rt::tokio::TokioIo<tokio::net::tcp::stream::TcpStream>>, rustls::client::client_conn::connection::ClientConnection>>::read_io::{closure#0}Unexecuted instantiation: <tokio_rustls::common::Stream<ztunnel::proxy::h2::TokioH2Stream, rustls::client::client_conn::connection::ClientConnection>>::read_io::{closure#0}Unexecuted instantiation: <tokio_rustls::common::Stream<tokio::net::tcp::stream::TcpStream, rustls::client::client_conn::connection::ClientConnection>>::read_io::{closure#0}Unexecuted instantiation: <tokio_rustls::common::Stream<tokio::net::tcp::stream::TcpStream, rustls::server::server_conn::connection::ServerConnection>>::read_io::{closure#0}Unexecuted instantiation: <tokio_rustls::common::Stream<_, _>>::read_io::{closure#0} |
109 | | |
110 | 0 | Poll::Ready(Ok(n)) |
111 | 0 | } Unexecuted instantiation: <tokio_rustls::common::Stream<hyper_util::rt::tokio::TokioIo<hyper_util::rt::tokio::TokioIo<tokio::net::tcp::stream::TcpStream>>, rustls::client::client_conn::connection::ClientConnection>>::read_io Unexecuted instantiation: <tokio_rustls::common::Stream<ztunnel::proxy::h2::TokioH2Stream, rustls::client::client_conn::connection::ClientConnection>>::read_io Unexecuted instantiation: <tokio_rustls::common::Stream<tokio::net::tcp::stream::TcpStream, rustls::client::client_conn::connection::ClientConnection>>::read_io Unexecuted instantiation: <tokio_rustls::common::Stream<tokio::net::tcp::stream::TcpStream, rustls::server::server_conn::connection::ServerConnection>>::read_io Unexecuted instantiation: <tokio_rustls::common::Stream<_, _>>::read_io |
112 | | |
113 | 0 | pub fn write_io(&mut self, cx: &mut Context) -> Poll<io::Result<usize>> { |
114 | 0 | let mut writer = SyncWriteAdapter { io: self.io, cx }; |
115 | | |
116 | 0 | match self.session.write_tls(&mut writer) { |
117 | 0 | Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => Poll::Pending, |
118 | 0 | result => Poll::Ready(result), |
119 | | } |
120 | 0 | } Unexecuted instantiation: <tokio_rustls::common::Stream<hyper_util::rt::tokio::TokioIo<hyper_util::rt::tokio::TokioIo<tokio::net::tcp::stream::TcpStream>>, rustls::client::client_conn::connection::ClientConnection>>::write_io Unexecuted instantiation: <tokio_rustls::common::Stream<ztunnel::proxy::h2::TokioH2Stream, rustls::client::client_conn::connection::ClientConnection>>::write_io Unexecuted instantiation: <tokio_rustls::common::Stream<tokio::net::tcp::stream::TcpStream, rustls::client::client_conn::connection::ClientConnection>>::write_io Unexecuted instantiation: <tokio_rustls::common::Stream<tokio::net::tcp::stream::TcpStream, rustls::server::server_conn::connection::ServerConnection>>::write_io Unexecuted instantiation: <tokio_rustls::common::Stream<_, _>>::write_io |
121 | | |
122 | 0 | pub fn handshake(&mut self, cx: &mut Context) -> Poll<io::Result<(usize, usize)>> { |
123 | 0 | let mut wrlen = 0; |
124 | 0 | let mut rdlen = 0; |
125 | | |
126 | | loop { |
127 | 0 | let mut write_would_block = false; |
128 | 0 | let mut read_would_block = false; |
129 | 0 | let mut need_flush = false; |
130 | | |
131 | 0 | while self.session.wants_write() { |
132 | 0 | match self.write_io(cx) { |
133 | 0 | Poll::Ready(Ok(0)) => return Poll::Ready(Err(io::ErrorKind::WriteZero.into())), |
134 | 0 | Poll::Ready(Ok(n)) => { |
135 | 0 | wrlen += n; |
136 | 0 | need_flush = true; |
137 | 0 | } |
138 | | Poll::Pending => { |
139 | 0 | write_would_block = true; |
140 | 0 | break; |
141 | | } |
142 | 0 | Poll::Ready(Err(err)) => return Poll::Ready(Err(err)), |
143 | | } |
144 | | } |
145 | | |
146 | 0 | if need_flush { |
147 | 0 | match Pin::new(&mut self.io).poll_flush(cx) { |
148 | 0 | Poll::Ready(Ok(())) => (), |
149 | 0 | Poll::Ready(Err(err)) => return Poll::Ready(Err(err)), |
150 | 0 | Poll::Pending => write_would_block = true, |
151 | | } |
152 | 0 | } |
153 | | |
154 | 0 | while !self.eof && self.session.wants_read() { |
155 | 0 | match self.read_io(cx) { |
156 | 0 | Poll::Ready(Ok(0)) => self.eof = true, |
157 | 0 | Poll::Ready(Ok(n)) => rdlen += n, |
158 | | Poll::Pending => { |
159 | 0 | read_would_block = true; |
160 | 0 | break; |
161 | | } |
162 | 0 | Poll::Ready(Err(err)) => return Poll::Ready(Err(err)), |
163 | | } |
164 | | } |
165 | | |
166 | 0 | return match (self.eof, self.session.is_handshaking()) { |
167 | | (true, true) => { |
168 | 0 | let err = io::Error::new(io::ErrorKind::UnexpectedEof, "tls handshake eof"); |
169 | 0 | Poll::Ready(Err(err)) |
170 | | } |
171 | 0 | (_, false) => Poll::Ready(Ok((rdlen, wrlen))), |
172 | 0 | (_, true) if write_would_block || read_would_block => { |
173 | 0 | if rdlen != 0 || wrlen != 0 { |
174 | 0 | Poll::Ready(Ok((rdlen, wrlen))) |
175 | | } else { |
176 | 0 | Poll::Pending |
177 | | } |
178 | | } |
179 | 0 | (..) => continue, |
180 | | }; |
181 | | } |
182 | 0 | } Unexecuted instantiation: <tokio_rustls::common::Stream<hyper_util::rt::tokio::TokioIo<hyper_util::rt::tokio::TokioIo<tokio::net::tcp::stream::TcpStream>>, rustls::client::client_conn::connection::ClientConnection>>::handshake Unexecuted instantiation: <tokio_rustls::common::Stream<ztunnel::proxy::h2::TokioH2Stream, rustls::client::client_conn::connection::ClientConnection>>::handshake Unexecuted instantiation: <tokio_rustls::common::Stream<tokio::net::tcp::stream::TcpStream, rustls::client::client_conn::connection::ClientConnection>>::handshake Unexecuted instantiation: <tokio_rustls::common::Stream<tokio::net::tcp::stream::TcpStream, rustls::server::server_conn::connection::ServerConnection>>::handshake Unexecuted instantiation: <tokio_rustls::common::Stream<_, _>>::handshake |
183 | | } |
184 | | |
185 | | impl<IO: AsyncRead + AsyncWrite + Unpin, C, SD> AsyncRead for Stream<'_, IO, C> |
186 | | where |
187 | | C: DerefMut + Deref<Target = ConnectionCommon<SD>>, |
188 | | SD: SideData, |
189 | | { |
190 | 0 | fn poll_read( |
191 | 0 | mut self: Pin<&mut Self>, |
192 | 0 | cx: &mut Context<'_>, |
193 | 0 | buf: &mut ReadBuf<'_>, |
194 | 0 | ) -> Poll<io::Result<()>> { |
195 | 0 | let mut io_pending = false; |
196 | | |
197 | | // read a packet |
198 | 0 | while !self.eof && self.session.wants_read() { |
199 | 0 | match self.read_io(cx) { |
200 | | Poll::Ready(Ok(0)) => { |
201 | 0 | break; |
202 | | } |
203 | 0 | Poll::Ready(Ok(_)) => (), |
204 | | Poll::Pending => { |
205 | 0 | io_pending = true; |
206 | 0 | break; |
207 | | } |
208 | 0 | Poll::Ready(Err(err)) => return Poll::Ready(Err(err)), |
209 | | } |
210 | | } |
211 | | |
212 | 0 | match self.session.reader().read(buf.initialize_unfilled()) { |
213 | | // If Rustls returns `Ok(0)` (while `buf` is non-empty), the peer closed the |
214 | | // connection with a `CloseNotify` message and no more data will be forthcoming. |
215 | | // |
216 | | // Otherwise Rustls yielded data: advance the buffer and return it to the caller. |
217 | | // |
218 | | // We don't need to modify `self.eof` here, because it is only a temporary mark. |
219 | | // rustls will only return 0 if it has received `CloseNotify`, |
220 | | // in which case no additional processing is required. |
221 | 0 | Ok(n) => { |
222 | 0 | buf.advance(n); |
223 | 0 | Poll::Ready(Ok(())) |
224 | | } |
225 | | |
226 | | // Rustls doesn't have more data to yield, but it believes the connection is open. |
227 | 0 | Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => { |
228 | 0 | if !io_pending { |
229 | 0 | // If `wants_read()` is satisfied, rustls will not return `WouldBlock`, |
230 | 0 | // but if it does, we can try again. |
231 | 0 | // |
232 | 0 | // If the rustls state is abnormal, this may cause a cyclic wakeup, |
233 | 0 | // but tokio's cooperative budget will prevent an infinite wakeup loop. |
234 | 0 | cx.waker().wake_by_ref(); |
235 | 0 | } |
236 | | |
237 | 0 | Poll::Pending |
238 | | } |
239 | | |
240 | 0 | Err(err) => Poll::Ready(Err(err)), |
241 | | } |
242 | 0 | } Unexecuted instantiation: <tokio_rustls::common::Stream<hyper_util::rt::tokio::TokioIo<hyper_util::rt::tokio::TokioIo<tokio::net::tcp::stream::TcpStream>>, rustls::client::client_conn::connection::ClientConnection> as tokio::io::async_read::AsyncRead>::poll_read Unexecuted instantiation: <tokio_rustls::common::Stream<ztunnel::proxy::h2::TokioH2Stream, rustls::client::client_conn::connection::ClientConnection> as tokio::io::async_read::AsyncRead>::poll_read Unexecuted instantiation: <tokio_rustls::common::Stream<tokio::net::tcp::stream::TcpStream, rustls::client::client_conn::connection::ClientConnection> as tokio::io::async_read::AsyncRead>::poll_read Unexecuted instantiation: <tokio_rustls::common::Stream<tokio::net::tcp::stream::TcpStream, rustls::server::server_conn::connection::ServerConnection> as tokio::io::async_read::AsyncRead>::poll_read Unexecuted instantiation: <tokio_rustls::common::Stream<_, _> as tokio::io::async_read::AsyncRead>::poll_read |
243 | | } |
244 | | |
245 | | impl<IO: AsyncRead + AsyncWrite + Unpin, C, SD> AsyncWrite for Stream<'_, IO, C> |
246 | | where |
247 | | C: DerefMut + Deref<Target = ConnectionCommon<SD>>, |
248 | | SD: SideData, |
249 | | { |
250 | 0 | fn poll_write( |
251 | 0 | mut self: Pin<&mut Self>, |
252 | 0 | cx: &mut Context, |
253 | 0 | buf: &[u8], |
254 | 0 | ) -> Poll<io::Result<usize>> { |
255 | 0 | let mut pos = 0; |
256 | | |
257 | 0 | while pos != buf.len() { |
258 | 0 | let mut would_block = false; |
259 | | |
260 | 0 | match self.session.writer().write(&buf[pos..]) { |
261 | 0 | Ok(n) => pos += n, |
262 | 0 | Err(err) => return Poll::Ready(Err(err)), |
263 | | }; |
264 | | |
265 | 0 | while self.session.wants_write() { |
266 | 0 | match self.write_io(cx) { |
267 | | Poll::Ready(Ok(0)) | Poll::Pending => { |
268 | 0 | would_block = true; |
269 | 0 | break; |
270 | | } |
271 | 0 | Poll::Ready(Ok(_)) => (), |
272 | 0 | Poll::Ready(Err(err)) => return Poll::Ready(Err(err)), |
273 | | } |
274 | | } |
275 | | |
276 | 0 | return match (pos, would_block) { |
277 | 0 | (0, true) => Poll::Pending, |
278 | 0 | (n, true) => Poll::Ready(Ok(n)), |
279 | 0 | (_, false) => continue, |
280 | | }; |
281 | | } |
282 | | |
283 | 0 | Poll::Ready(Ok(pos)) |
284 | 0 | } Unexecuted instantiation: <tokio_rustls::common::Stream<hyper_util::rt::tokio::TokioIo<hyper_util::rt::tokio::TokioIo<tokio::net::tcp::stream::TcpStream>>, rustls::client::client_conn::connection::ClientConnection> as tokio::io::async_write::AsyncWrite>::poll_write Unexecuted instantiation: <tokio_rustls::common::Stream<ztunnel::proxy::h2::TokioH2Stream, rustls::client::client_conn::connection::ClientConnection> as tokio::io::async_write::AsyncWrite>::poll_write Unexecuted instantiation: <tokio_rustls::common::Stream<tokio::net::tcp::stream::TcpStream, rustls::client::client_conn::connection::ClientConnection> as tokio::io::async_write::AsyncWrite>::poll_write Unexecuted instantiation: <tokio_rustls::common::Stream<tokio::net::tcp::stream::TcpStream, rustls::server::server_conn::connection::ServerConnection> as tokio::io::async_write::AsyncWrite>::poll_write Unexecuted instantiation: <tokio_rustls::common::Stream<_, _> as tokio::io::async_write::AsyncWrite>::poll_write |
285 | | |
286 | 0 | fn poll_write_vectored( |
287 | 0 | mut self: Pin<&mut Self>, |
288 | 0 | cx: &mut Context<'_>, |
289 | 0 | bufs: &[IoSlice<'_>], |
290 | 0 | ) -> Poll<io::Result<usize>> { |
291 | 0 | if bufs.iter().all(|buf| buf.is_empty()) {Unexecuted instantiation: <tokio_rustls::common::Stream<hyper_util::rt::tokio::TokioIo<hyper_util::rt::tokio::TokioIo<tokio::net::tcp::stream::TcpStream>>, rustls::client::client_conn::connection::ClientConnection> as tokio::io::async_write::AsyncWrite>::poll_write_vectored::{closure#0}Unexecuted instantiation: <tokio_rustls::common::Stream<ztunnel::proxy::h2::TokioH2Stream, rustls::client::client_conn::connection::ClientConnection> as tokio::io::async_write::AsyncWrite>::poll_write_vectored::{closure#0}Unexecuted instantiation: <tokio_rustls::common::Stream<tokio::net::tcp::stream::TcpStream, rustls::client::client_conn::connection::ClientConnection> as tokio::io::async_write::AsyncWrite>::poll_write_vectored::{closure#0}Unexecuted instantiation: <tokio_rustls::common::Stream<tokio::net::tcp::stream::TcpStream, rustls::server::server_conn::connection::ServerConnection> as tokio::io::async_write::AsyncWrite>::poll_write_vectored::{closure#0}Unexecuted instantiation: <tokio_rustls::common::Stream<_, _> as tokio::io::async_write::AsyncWrite>::poll_write_vectored::{closure#0} |
292 | 0 | return Poll::Ready(Ok(0)); |
293 | 0 | } |
294 | | |
295 | | loop { |
296 | 0 | let mut would_block = false; |
297 | 0 | let written = self.session.writer().write_vectored(bufs)?; |
298 | | |
299 | 0 | while self.session.wants_write() { |
300 | 0 | match self.write_io(cx) { |
301 | | Poll::Ready(Ok(0)) | Poll::Pending => { |
302 | 0 | would_block = true; |
303 | 0 | break; |
304 | | } |
305 | 0 | Poll::Ready(Ok(_)) => (), |
306 | 0 | Poll::Ready(Err(err)) => return Poll::Ready(Err(err)), |
307 | | } |
308 | | } |
309 | | |
310 | 0 | return match (written, would_block) { |
311 | 0 | (0, true) => Poll::Pending, |
312 | 0 | (0, false) => continue, |
313 | 0 | (n, _) => Poll::Ready(Ok(n)), |
314 | | }; |
315 | | } |
316 | 0 | } Unexecuted instantiation: <tokio_rustls::common::Stream<hyper_util::rt::tokio::TokioIo<hyper_util::rt::tokio::TokioIo<tokio::net::tcp::stream::TcpStream>>, rustls::client::client_conn::connection::ClientConnection> as tokio::io::async_write::AsyncWrite>::poll_write_vectored Unexecuted instantiation: <tokio_rustls::common::Stream<ztunnel::proxy::h2::TokioH2Stream, rustls::client::client_conn::connection::ClientConnection> as tokio::io::async_write::AsyncWrite>::poll_write_vectored Unexecuted instantiation: <tokio_rustls::common::Stream<tokio::net::tcp::stream::TcpStream, rustls::client::client_conn::connection::ClientConnection> as tokio::io::async_write::AsyncWrite>::poll_write_vectored Unexecuted instantiation: <tokio_rustls::common::Stream<tokio::net::tcp::stream::TcpStream, rustls::server::server_conn::connection::ServerConnection> as tokio::io::async_write::AsyncWrite>::poll_write_vectored Unexecuted instantiation: <tokio_rustls::common::Stream<_, _> as tokio::io::async_write::AsyncWrite>::poll_write_vectored |
317 | | |
318 | | #[inline] |
319 | 0 | fn is_write_vectored(&self) -> bool { |
320 | 0 | true |
321 | 0 | } |
322 | | |
323 | 0 | fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> { |
324 | 0 | self.session.writer().flush()?; |
325 | 0 | while self.session.wants_write() { |
326 | 0 | if ready!(self.write_io(cx))? == 0 { |
327 | 0 | return Poll::Ready(Err(io::ErrorKind::WriteZero.into())); |
328 | 0 | } |
329 | | } |
330 | 0 | Pin::new(&mut self.io).poll_flush(cx) |
331 | 0 | } Unexecuted instantiation: <tokio_rustls::common::Stream<hyper_util::rt::tokio::TokioIo<hyper_util::rt::tokio::TokioIo<tokio::net::tcp::stream::TcpStream>>, rustls::client::client_conn::connection::ClientConnection> as tokio::io::async_write::AsyncWrite>::poll_flush Unexecuted instantiation: <tokio_rustls::common::Stream<ztunnel::proxy::h2::TokioH2Stream, rustls::client::client_conn::connection::ClientConnection> as tokio::io::async_write::AsyncWrite>::poll_flush Unexecuted instantiation: <tokio_rustls::common::Stream<tokio::net::tcp::stream::TcpStream, rustls::client::client_conn::connection::ClientConnection> as tokio::io::async_write::AsyncWrite>::poll_flush Unexecuted instantiation: <tokio_rustls::common::Stream<tokio::net::tcp::stream::TcpStream, rustls::server::server_conn::connection::ServerConnection> as tokio::io::async_write::AsyncWrite>::poll_flush Unexecuted instantiation: <tokio_rustls::common::Stream<_, _> as tokio::io::async_write::AsyncWrite>::poll_flush |
332 | | |
333 | 0 | fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> { |
334 | 0 | while self.session.wants_write() { |
335 | 0 | if ready!(self.write_io(cx))? == 0 { |
336 | 0 | return Poll::Ready(Err(io::ErrorKind::WriteZero.into())); |
337 | 0 | } |
338 | | } |
339 | | |
340 | 0 | Poll::Ready(match ready!(Pin::new(&mut self.io).poll_shutdown(cx)) { |
341 | 0 | Ok(()) => Ok(()), |
342 | | // When trying to shutdown, not being connected seems fine |
343 | 0 | Err(err) if err.kind() == io::ErrorKind::NotConnected => Ok(()), |
344 | 0 | Err(err) => Err(err), |
345 | | }) |
346 | 0 | } Unexecuted instantiation: <tokio_rustls::common::Stream<hyper_util::rt::tokio::TokioIo<hyper_util::rt::tokio::TokioIo<tokio::net::tcp::stream::TcpStream>>, rustls::client::client_conn::connection::ClientConnection> as tokio::io::async_write::AsyncWrite>::poll_shutdown Unexecuted instantiation: <tokio_rustls::common::Stream<ztunnel::proxy::h2::TokioH2Stream, rustls::client::client_conn::connection::ClientConnection> as tokio::io::async_write::AsyncWrite>::poll_shutdown Unexecuted instantiation: <tokio_rustls::common::Stream<tokio::net::tcp::stream::TcpStream, rustls::client::client_conn::connection::ClientConnection> as tokio::io::async_write::AsyncWrite>::poll_shutdown Unexecuted instantiation: <tokio_rustls::common::Stream<tokio::net::tcp::stream::TcpStream, rustls::server::server_conn::connection::ServerConnection> as tokio::io::async_write::AsyncWrite>::poll_shutdown Unexecuted instantiation: <tokio_rustls::common::Stream<_, _> as tokio::io::async_write::AsyncWrite>::poll_shutdown |
347 | | } |
348 | | |
349 | | /// An adapter that implements a [`Read`] interface for [`AsyncRead`] types and an |
350 | | /// associated [`Context`]. |
351 | | /// |
352 | | /// Turns `Poll::Pending` into `WouldBlock`. |
353 | | pub struct SyncReadAdapter<'a, 'b, T> { |
354 | | pub io: &'a mut T, |
355 | | pub cx: &'a mut Context<'b>, |
356 | | } |
357 | | |
358 | | impl<T: AsyncRead + Unpin> Read for SyncReadAdapter<'_, '_, T> { |
359 | | #[inline] |
360 | 0 | fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { |
361 | 0 | let mut buf = ReadBuf::new(buf); |
362 | 0 | match Pin::new(&mut self.io).poll_read(self.cx, &mut buf) { |
363 | 0 | Poll::Ready(Ok(())) => Ok(buf.filled().len()), |
364 | 0 | Poll::Ready(Err(err)) => Err(err), |
365 | 0 | Poll::Pending => Err(io::ErrorKind::WouldBlock.into()), |
366 | | } |
367 | 0 | } Unexecuted instantiation: <tokio_rustls::common::SyncReadAdapter<hyper_util::rt::tokio::TokioIo<hyper_util::rt::tokio::TokioIo<tokio::net::tcp::stream::TcpStream>>> as std::io::Read>::read Unexecuted instantiation: <tokio_rustls::common::SyncReadAdapter<ztunnel::proxy::h2::TokioH2Stream> as std::io::Read>::read Unexecuted instantiation: <tokio_rustls::common::SyncReadAdapter<tokio::net::tcp::stream::TcpStream> as std::io::Read>::read Unexecuted instantiation: <tokio_rustls::common::SyncReadAdapter<_> as std::io::Read>::read |
368 | | } |
369 | | |
370 | | /// An adapter that implements a [`Write`] interface for [`AsyncWrite`] types and an |
371 | | /// associated [`Context`]. |
372 | | /// |
373 | | /// Turns `Poll::Pending` into `WouldBlock`. |
374 | | pub struct SyncWriteAdapter<'a, 'b, T> { |
375 | | pub io: &'a mut T, |
376 | | pub cx: &'a mut Context<'b>, |
377 | | } |
378 | | |
379 | | impl<T: Unpin> SyncWriteAdapter<'_, '_, T> { |
380 | | #[inline] |
381 | 0 | fn poll_with<U>( |
382 | 0 | &mut self, |
383 | 0 | f: impl FnOnce(Pin<&mut T>, &mut Context<'_>) -> Poll<io::Result<U>>, |
384 | 0 | ) -> io::Result<U> { |
385 | 0 | match f(Pin::new(self.io), self.cx) { |
386 | 0 | Poll::Ready(result) => result, |
387 | 0 | Poll::Pending => Err(io::ErrorKind::WouldBlock.into()), |
388 | | } |
389 | 0 | } Unexecuted instantiation: <tokio_rustls::common::SyncWriteAdapter<hyper_util::rt::tokio::TokioIo<hyper_util::rt::tokio::TokioIo<tokio::net::tcp::stream::TcpStream>>>>::poll_with::<usize, <tokio_rustls::common::SyncWriteAdapter<hyper_util::rt::tokio::TokioIo<hyper_util::rt::tokio::TokioIo<tokio::net::tcp::stream::TcpStream>>> as std::io::Write>::write_vectored::{closure#0}>Unexecuted instantiation: <tokio_rustls::common::SyncWriteAdapter<hyper_util::rt::tokio::TokioIo<hyper_util::rt::tokio::TokioIo<tokio::net::tcp::stream::TcpStream>>>>::poll_with::<usize, <tokio_rustls::common::SyncWriteAdapter<hyper_util::rt::tokio::TokioIo<hyper_util::rt::tokio::TokioIo<tokio::net::tcp::stream::TcpStream>>> as std::io::Write>::write::{closure#0}>Unexecuted instantiation: <tokio_rustls::common::SyncWriteAdapter<hyper_util::rt::tokio::TokioIo<hyper_util::rt::tokio::TokioIo<tokio::net::tcp::stream::TcpStream>>>>::poll_with::<(), <tokio_rustls::common::SyncWriteAdapter<hyper_util::rt::tokio::TokioIo<hyper_util::rt::tokio::TokioIo<tokio::net::tcp::stream::TcpStream>>> as std::io::Write>::flush::{closure#0}>Unexecuted instantiation: <tokio_rustls::common::SyncWriteAdapter<ztunnel::proxy::h2::TokioH2Stream>>::poll_with::<usize, <tokio_rustls::common::SyncWriteAdapter<ztunnel::proxy::h2::TokioH2Stream> as std::io::Write>::write_vectored::{closure#0}>Unexecuted instantiation: <tokio_rustls::common::SyncWriteAdapter<ztunnel::proxy::h2::TokioH2Stream>>::poll_with::<usize, <tokio_rustls::common::SyncWriteAdapter<ztunnel::proxy::h2::TokioH2Stream> as std::io::Write>::write::{closure#0}>Unexecuted instantiation: <tokio_rustls::common::SyncWriteAdapter<ztunnel::proxy::h2::TokioH2Stream>>::poll_with::<(), <tokio_rustls::common::SyncWriteAdapter<ztunnel::proxy::h2::TokioH2Stream> as std::io::Write>::flush::{closure#0}>Unexecuted instantiation: <tokio_rustls::common::SyncWriteAdapter<tokio::net::tcp::stream::TcpStream>>::poll_with::<usize, 
<tokio_rustls::common::SyncWriteAdapter<tokio::net::tcp::stream::TcpStream> as std::io::Write>::write_vectored::{closure#0}>Unexecuted instantiation: <tokio_rustls::common::SyncWriteAdapter<tokio::net::tcp::stream::TcpStream>>::poll_with::<usize, <tokio_rustls::common::SyncWriteAdapter<tokio::net::tcp::stream::TcpStream> as std::io::Write>::write::{closure#0}>Unexecuted instantiation: <tokio_rustls::common::SyncWriteAdapter<tokio::net::tcp::stream::TcpStream>>::poll_with::<(), <tokio_rustls::common::SyncWriteAdapter<tokio::net::tcp::stream::TcpStream> as std::io::Write>::flush::{closure#0}>Unexecuted instantiation: <tokio_rustls::common::SyncWriteAdapter<_>>::poll_with::<_, _> |
390 | | } |
391 | | |
392 | | impl<T: AsyncWrite + Unpin> Write for SyncWriteAdapter<'_, '_, T> { |
393 | | #[inline] |
394 | 0 | fn write(&mut self, buf: &[u8]) -> io::Result<usize> { |
395 | 0 | self.poll_with(|io, cx| io.poll_write(cx, buf)) Unexecuted instantiation: <tokio_rustls::common::SyncWriteAdapter<hyper_util::rt::tokio::TokioIo<hyper_util::rt::tokio::TokioIo<tokio::net::tcp::stream::TcpStream>>> as std::io::Write>::write::{closure#0}Unexecuted instantiation: <tokio_rustls::common::SyncWriteAdapter<ztunnel::proxy::h2::TokioH2Stream> as std::io::Write>::write::{closure#0}Unexecuted instantiation: <tokio_rustls::common::SyncWriteAdapter<tokio::net::tcp::stream::TcpStream> as std::io::Write>::write::{closure#0}Unexecuted instantiation: <tokio_rustls::common::SyncWriteAdapter<_> as std::io::Write>::write::{closure#0} |
396 | 0 | } Unexecuted instantiation: <tokio_rustls::common::SyncWriteAdapter<hyper_util::rt::tokio::TokioIo<hyper_util::rt::tokio::TokioIo<tokio::net::tcp::stream::TcpStream>>> as std::io::Write>::write Unexecuted instantiation: <tokio_rustls::common::SyncWriteAdapter<ztunnel::proxy::h2::TokioH2Stream> as std::io::Write>::write Unexecuted instantiation: <tokio_rustls::common::SyncWriteAdapter<tokio::net::tcp::stream::TcpStream> as std::io::Write>::write Unexecuted instantiation: <tokio_rustls::common::SyncWriteAdapter<_> as std::io::Write>::write |
397 | | |
398 | | #[inline] |
399 | 0 | fn write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> io::Result<usize> { |
400 | 0 | self.poll_with(|io, cx| io.poll_write_vectored(cx, bufs)) Unexecuted instantiation: <tokio_rustls::common::SyncWriteAdapter<hyper_util::rt::tokio::TokioIo<hyper_util::rt::tokio::TokioIo<tokio::net::tcp::stream::TcpStream>>> as std::io::Write>::write_vectored::{closure#0}Unexecuted instantiation: <tokio_rustls::common::SyncWriteAdapter<ztunnel::proxy::h2::TokioH2Stream> as std::io::Write>::write_vectored::{closure#0}Unexecuted instantiation: <tokio_rustls::common::SyncWriteAdapter<tokio::net::tcp::stream::TcpStream> as std::io::Write>::write_vectored::{closure#0}Unexecuted instantiation: <tokio_rustls::common::SyncWriteAdapter<_> as std::io::Write>::write_vectored::{closure#0} |
401 | 0 | } Unexecuted instantiation: <tokio_rustls::common::SyncWriteAdapter<hyper_util::rt::tokio::TokioIo<hyper_util::rt::tokio::TokioIo<tokio::net::tcp::stream::TcpStream>>> as std::io::Write>::write_vectored Unexecuted instantiation: <tokio_rustls::common::SyncWriteAdapter<ztunnel::proxy::h2::TokioH2Stream> as std::io::Write>::write_vectored Unexecuted instantiation: <tokio_rustls::common::SyncWriteAdapter<tokio::net::tcp::stream::TcpStream> as std::io::Write>::write_vectored Unexecuted instantiation: <tokio_rustls::common::SyncWriteAdapter<_> as std::io::Write>::write_vectored |
402 | | |
403 | 0 | fn flush(&mut self) -> io::Result<()> { |
404 | 0 | self.poll_with(|io, cx| io.poll_flush(cx)) Unexecuted instantiation: <tokio_rustls::common::SyncWriteAdapter<hyper_util::rt::tokio::TokioIo<hyper_util::rt::tokio::TokioIo<tokio::net::tcp::stream::TcpStream>>> as std::io::Write>::flush::{closure#0}Unexecuted instantiation: <tokio_rustls::common::SyncWriteAdapter<ztunnel::proxy::h2::TokioH2Stream> as std::io::Write>::flush::{closure#0}Unexecuted instantiation: <tokio_rustls::common::SyncWriteAdapter<tokio::net::tcp::stream::TcpStream> as std::io::Write>::flush::{closure#0}Unexecuted instantiation: <tokio_rustls::common::SyncWriteAdapter<_> as std::io::Write>::flush::{closure#0} |
405 | 0 | } Unexecuted instantiation: <tokio_rustls::common::SyncWriteAdapter<hyper_util::rt::tokio::TokioIo<hyper_util::rt::tokio::TokioIo<tokio::net::tcp::stream::TcpStream>>> as std::io::Write>::flush Unexecuted instantiation: <tokio_rustls::common::SyncWriteAdapter<ztunnel::proxy::h2::TokioH2Stream> as std::io::Write>::flush Unexecuted instantiation: <tokio_rustls::common::SyncWriteAdapter<tokio::net::tcp::stream::TcpStream> as std::io::Write>::flush Unexecuted instantiation: <tokio_rustls::common::SyncWriteAdapter<_> as std::io::Write>::flush |
406 | | } |
407 | | |
408 | | #[cfg(test)] |
409 | | mod test_stream; |