Coverage Report

Created: 2025-10-29 07:05

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/rust/registry/src/index.crates.io-1949cf8c6b5b557f/tokio-rustls-0.26.1/src/common/mod.rs
Line
Count
Source
1
use std::io::{self, IoSlice, Read, Write};
2
use std::ops::{Deref, DerefMut};
3
use std::pin::Pin;
4
use std::task::{Context, Poll};
5
6
use rustls::{ConnectionCommon, SideData};
7
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
8
9
mod handshake;
10
pub(crate) use handshake::{IoSession, MidHandshake};
11
12
#[derive(Debug)]
13
pub enum TlsState {
14
    #[cfg(feature = "early-data")]
15
    EarlyData(usize, Vec<u8>),
16
    Stream,
17
    ReadShutdown,
18
    WriteShutdown,
19
    FullyShutdown,
20
}
21
22
impl TlsState {
23
    #[inline]
24
0
    pub fn shutdown_read(&mut self) {
25
0
        match *self {
26
0
            TlsState::WriteShutdown | TlsState::FullyShutdown => *self = TlsState::FullyShutdown,
27
0
            _ => *self = TlsState::ReadShutdown,
28
        }
29
0
    }
Unexecuted instantiation: <tokio_rustls::common::TlsState>::shutdown_read
Unexecuted instantiation: <tokio_rustls::common::TlsState>::shutdown_read
30
31
    #[inline]
32
0
    pub fn shutdown_write(&mut self) {
33
0
        match *self {
34
0
            TlsState::ReadShutdown | TlsState::FullyShutdown => *self = TlsState::FullyShutdown,
35
0
            _ => *self = TlsState::WriteShutdown,
36
        }
37
0
    }
Unexecuted instantiation: <tokio_rustls::common::TlsState>::shutdown_write
Unexecuted instantiation: <tokio_rustls::common::TlsState>::shutdown_write
38
39
    #[inline]
40
0
    pub fn writeable(&self) -> bool {
41
0
        !matches!(*self, TlsState::WriteShutdown | TlsState::FullyShutdown)
42
0
    }
Unexecuted instantiation: <tokio_rustls::common::TlsState>::writeable
Unexecuted instantiation: <tokio_rustls::common::TlsState>::writeable
43
44
    #[inline]
45
0
    pub fn readable(&self) -> bool {
46
0
        !matches!(*self, TlsState::ReadShutdown | TlsState::FullyShutdown)
47
0
    }
Unexecuted instantiation: <tokio_rustls::common::TlsState>::readable
Unexecuted instantiation: <tokio_rustls::common::TlsState>::readable
48
49
    #[inline]
50
    #[cfg(feature = "early-data")]
51
    pub fn is_early_data(&self) -> bool {
52
        matches!(self, TlsState::EarlyData(..))
53
    }
54
55
    #[inline]
56
    #[cfg(not(feature = "early-data"))]
57
0
    pub const fn is_early_data(&self) -> bool {
58
0
        false
59
0
    }
Unexecuted instantiation: <tokio_rustls::common::TlsState>::is_early_data
Unexecuted instantiation: <tokio_rustls::common::TlsState>::is_early_data
60
}
61
62
pub struct Stream<'a, IO, C> {
63
    pub io: &'a mut IO,
64
    pub session: &'a mut C,
65
    pub eof: bool,
66
}
67
68
impl<'a, IO: AsyncRead + AsyncWrite + Unpin, C, SD> Stream<'a, IO, C>
69
where
70
    C: DerefMut + Deref<Target = ConnectionCommon<SD>>,
71
    SD: SideData,
72
{
73
0
    pub fn new(io: &'a mut IO, session: &'a mut C) -> Self {
74
0
        Stream {
75
0
            io,
76
0
            session,
77
0
            // The state so far is only used to detect EOF, so either Stream
78
0
            // or EarlyData state should both be all right.
79
0
            eof: false,
80
0
        }
81
0
    }
Unexecuted instantiation: <tokio_rustls::common::Stream<hyper_util::rt::tokio::TokioIo<hyper_util::rt::tokio::TokioIo<tokio::net::tcp::stream::TcpStream>>, rustls::client::client_conn::connection::ClientConnection>>::new
Unexecuted instantiation: <tokio_rustls::common::Stream<ztunnel::proxy::h2::TokioH2Stream, rustls::client::client_conn::connection::ClientConnection>>::new
Unexecuted instantiation: <tokio_rustls::common::Stream<tokio::net::tcp::stream::TcpStream, rustls::client::client_conn::connection::ClientConnection>>::new
Unexecuted instantiation: <tokio_rustls::common::Stream<tokio::net::tcp::stream::TcpStream, rustls::server::server_conn::connection::ServerConnection>>::new
Unexecuted instantiation: <tokio_rustls::common::Stream<_, _>>::new
82
83
0
    pub fn set_eof(mut self, eof: bool) -> Self {
84
0
        self.eof = eof;
85
0
        self
86
0
    }
Unexecuted instantiation: <tokio_rustls::common::Stream<hyper_util::rt::tokio::TokioIo<hyper_util::rt::tokio::TokioIo<tokio::net::tcp::stream::TcpStream>>, rustls::client::client_conn::connection::ClientConnection>>::set_eof
Unexecuted instantiation: <tokio_rustls::common::Stream<ztunnel::proxy::h2::TokioH2Stream, rustls::client::client_conn::connection::ClientConnection>>::set_eof
Unexecuted instantiation: <tokio_rustls::common::Stream<tokio::net::tcp::stream::TcpStream, rustls::client::client_conn::connection::ClientConnection>>::set_eof
Unexecuted instantiation: <tokio_rustls::common::Stream<tokio::net::tcp::stream::TcpStream, rustls::server::server_conn::connection::ServerConnection>>::set_eof
Unexecuted instantiation: <tokio_rustls::common::Stream<_, _>>::set_eof
87
88
0
    pub fn as_mut_pin(&mut self) -> Pin<&mut Self> {
89
0
        Pin::new(self)
90
0
    }
Unexecuted instantiation: <tokio_rustls::common::Stream<hyper_util::rt::tokio::TokioIo<hyper_util::rt::tokio::TokioIo<tokio::net::tcp::stream::TcpStream>>, rustls::client::client_conn::connection::ClientConnection>>::as_mut_pin
Unexecuted instantiation: <tokio_rustls::common::Stream<ztunnel::proxy::h2::TokioH2Stream, rustls::client::client_conn::connection::ClientConnection>>::as_mut_pin
Unexecuted instantiation: <tokio_rustls::common::Stream<tokio::net::tcp::stream::TcpStream, rustls::client::client_conn::connection::ClientConnection>>::as_mut_pin
Unexecuted instantiation: <tokio_rustls::common::Stream<tokio::net::tcp::stream::TcpStream, rustls::server::server_conn::connection::ServerConnection>>::as_mut_pin
Unexecuted instantiation: <tokio_rustls::common::Stream<_, _>>::as_mut_pin
91
92
0
    pub fn read_io(&mut self, cx: &mut Context) -> Poll<io::Result<usize>> {
93
0
        let mut reader = SyncReadAdapter { io: self.io, cx };
94
95
0
        let n = match self.session.read_tls(&mut reader) {
96
0
            Ok(n) => n,
97
0
            Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => return Poll::Pending,
98
0
            Err(err) => return Poll::Ready(Err(err)),
99
        };
100
101
0
        self.session.process_new_packets().map_err(|err| {
102
            // In case we have an alert to send describing this error,
103
            // try a last-gasp write -- but don't predate the primary
104
            // error.
105
0
            let _ = self.write_io(cx);
106
107
0
            io::Error::new(io::ErrorKind::InvalidData, err)
108
0
        })?;
Unexecuted instantiation: <tokio_rustls::common::Stream<hyper_util::rt::tokio::TokioIo<hyper_util::rt::tokio::TokioIo<tokio::net::tcp::stream::TcpStream>>, rustls::client::client_conn::connection::ClientConnection>>::read_io::{closure#0}
Unexecuted instantiation: <tokio_rustls::common::Stream<ztunnel::proxy::h2::TokioH2Stream, rustls::client::client_conn::connection::ClientConnection>>::read_io::{closure#0}
Unexecuted instantiation: <tokio_rustls::common::Stream<tokio::net::tcp::stream::TcpStream, rustls::client::client_conn::connection::ClientConnection>>::read_io::{closure#0}
Unexecuted instantiation: <tokio_rustls::common::Stream<tokio::net::tcp::stream::TcpStream, rustls::server::server_conn::connection::ServerConnection>>::read_io::{closure#0}
Unexecuted instantiation: <tokio_rustls::common::Stream<_, _>>::read_io::{closure#0}
109
110
0
        Poll::Ready(Ok(n))
111
0
    }
Unexecuted instantiation: <tokio_rustls::common::Stream<hyper_util::rt::tokio::TokioIo<hyper_util::rt::tokio::TokioIo<tokio::net::tcp::stream::TcpStream>>, rustls::client::client_conn::connection::ClientConnection>>::read_io
Unexecuted instantiation: <tokio_rustls::common::Stream<ztunnel::proxy::h2::TokioH2Stream, rustls::client::client_conn::connection::ClientConnection>>::read_io
Unexecuted instantiation: <tokio_rustls::common::Stream<tokio::net::tcp::stream::TcpStream, rustls::client::client_conn::connection::ClientConnection>>::read_io
Unexecuted instantiation: <tokio_rustls::common::Stream<tokio::net::tcp::stream::TcpStream, rustls::server::server_conn::connection::ServerConnection>>::read_io
Unexecuted instantiation: <tokio_rustls::common::Stream<_, _>>::read_io
112
113
0
    pub fn write_io(&mut self, cx: &mut Context) -> Poll<io::Result<usize>> {
114
0
        let mut writer = SyncWriteAdapter { io: self.io, cx };
115
116
0
        match self.session.write_tls(&mut writer) {
117
0
            Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => Poll::Pending,
118
0
            result => Poll::Ready(result),
119
        }
120
0
    }
Unexecuted instantiation: <tokio_rustls::common::Stream<hyper_util::rt::tokio::TokioIo<hyper_util::rt::tokio::TokioIo<tokio::net::tcp::stream::TcpStream>>, rustls::client::client_conn::connection::ClientConnection>>::write_io
Unexecuted instantiation: <tokio_rustls::common::Stream<ztunnel::proxy::h2::TokioH2Stream, rustls::client::client_conn::connection::ClientConnection>>::write_io
Unexecuted instantiation: <tokio_rustls::common::Stream<tokio::net::tcp::stream::TcpStream, rustls::client::client_conn::connection::ClientConnection>>::write_io
Unexecuted instantiation: <tokio_rustls::common::Stream<tokio::net::tcp::stream::TcpStream, rustls::server::server_conn::connection::ServerConnection>>::write_io
Unexecuted instantiation: <tokio_rustls::common::Stream<_, _>>::write_io
121
122
0
    pub fn handshake(&mut self, cx: &mut Context) -> Poll<io::Result<(usize, usize)>> {
123
0
        let mut wrlen = 0;
124
0
        let mut rdlen = 0;
125
126
        loop {
127
0
            let mut write_would_block = false;
128
0
            let mut read_would_block = false;
129
0
            let mut need_flush = false;
130
131
0
            while self.session.wants_write() {
132
0
                match self.write_io(cx) {
133
0
                    Poll::Ready(Ok(0)) => return Poll::Ready(Err(io::ErrorKind::WriteZero.into())),
134
0
                    Poll::Ready(Ok(n)) => {
135
0
                        wrlen += n;
136
0
                        need_flush = true;
137
0
                    }
138
                    Poll::Pending => {
139
0
                        write_would_block = true;
140
0
                        break;
141
                    }
142
0
                    Poll::Ready(Err(err)) => return Poll::Ready(Err(err)),
143
                }
144
            }
145
146
0
            if need_flush {
147
0
                match Pin::new(&mut self.io).poll_flush(cx) {
148
0
                    Poll::Ready(Ok(())) => (),
149
0
                    Poll::Ready(Err(err)) => return Poll::Ready(Err(err)),
150
0
                    Poll::Pending => write_would_block = true,
151
                }
152
0
            }
153
154
0
            while !self.eof && self.session.wants_read() {
155
0
                match self.read_io(cx) {
156
0
                    Poll::Ready(Ok(0)) => self.eof = true,
157
0
                    Poll::Ready(Ok(n)) => rdlen += n,
158
                    Poll::Pending => {
159
0
                        read_would_block = true;
160
0
                        break;
161
                    }
162
0
                    Poll::Ready(Err(err)) => return Poll::Ready(Err(err)),
163
                }
164
            }
165
166
0
            return match (self.eof, self.session.is_handshaking()) {
167
                (true, true) => {
168
0
                    let err = io::Error::new(io::ErrorKind::UnexpectedEof, "tls handshake eof");
169
0
                    Poll::Ready(Err(err))
170
                }
171
0
                (_, false) => Poll::Ready(Ok((rdlen, wrlen))),
172
0
                (_, true) if write_would_block || read_would_block => {
173
0
                    if rdlen != 0 || wrlen != 0 {
174
0
                        Poll::Ready(Ok((rdlen, wrlen)))
175
                    } else {
176
0
                        Poll::Pending
177
                    }
178
                }
179
0
                (..) => continue,
180
            };
181
        }
182
0
    }
Unexecuted instantiation: <tokio_rustls::common::Stream<hyper_util::rt::tokio::TokioIo<hyper_util::rt::tokio::TokioIo<tokio::net::tcp::stream::TcpStream>>, rustls::client::client_conn::connection::ClientConnection>>::handshake
Unexecuted instantiation: <tokio_rustls::common::Stream<ztunnel::proxy::h2::TokioH2Stream, rustls::client::client_conn::connection::ClientConnection>>::handshake
Unexecuted instantiation: <tokio_rustls::common::Stream<tokio::net::tcp::stream::TcpStream, rustls::client::client_conn::connection::ClientConnection>>::handshake
Unexecuted instantiation: <tokio_rustls::common::Stream<tokio::net::tcp::stream::TcpStream, rustls::server::server_conn::connection::ServerConnection>>::handshake
Unexecuted instantiation: <tokio_rustls::common::Stream<_, _>>::handshake
183
}
184
185
impl<IO: AsyncRead + AsyncWrite + Unpin, C, SD> AsyncRead for Stream<'_, IO, C>
186
where
187
    C: DerefMut + Deref<Target = ConnectionCommon<SD>>,
188
    SD: SideData,
189
{
190
0
    fn poll_read(
191
0
        mut self: Pin<&mut Self>,
192
0
        cx: &mut Context<'_>,
193
0
        buf: &mut ReadBuf<'_>,
194
0
    ) -> Poll<io::Result<()>> {
195
0
        let mut io_pending = false;
196
197
        // read a packet
198
0
        while !self.eof && self.session.wants_read() {
199
0
            match self.read_io(cx) {
200
                Poll::Ready(Ok(0)) => {
201
0
                    break;
202
                }
203
0
                Poll::Ready(Ok(_)) => (),
204
                Poll::Pending => {
205
0
                    io_pending = true;
206
0
                    break;
207
                }
208
0
                Poll::Ready(Err(err)) => return Poll::Ready(Err(err)),
209
            }
210
        }
211
212
0
        match self.session.reader().read(buf.initialize_unfilled()) {
213
            // If Rustls returns `Ok(0)` (while `buf` is non-empty), the peer closed the
214
            // connection with a `CloseNotify` message and no more data will be forthcoming.
215
            //
216
            // Rustls yielded more data: advance the buffer, then see if more data is coming.
217
            //
218
            // We don't need to modify `self.eof` here, because it is only a temporary mark.
219
            // rustls will only return 0 if is has received `CloseNotify`,
220
            // in which case no additional processing is required.
221
0
            Ok(n) => {
222
0
                buf.advance(n);
223
0
                Poll::Ready(Ok(()))
224
            }
225
226
            // Rustls doesn't have more data to yield, but it believes the connection is open.
227
0
            Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => {
228
0
                if !io_pending {
229
0
                    // If `wants_read()` is satisfied, rustls will not return `WouldBlock`.
230
0
                    // but if it does, we can try again.
231
0
                    //
232
0
                    // If the rustls state is abnormal, it may cause a cyclic wakeup.
233
0
                    // but tokio's cooperative budget will prevent infinite wakeup.
234
0
                    cx.waker().wake_by_ref();
235
0
                }
236
237
0
                Poll::Pending
238
            }
239
240
0
            Err(err) => Poll::Ready(Err(err)),
241
        }
242
0
    }
Unexecuted instantiation: <tokio_rustls::common::Stream<hyper_util::rt::tokio::TokioIo<hyper_util::rt::tokio::TokioIo<tokio::net::tcp::stream::TcpStream>>, rustls::client::client_conn::connection::ClientConnection> as tokio::io::async_read::AsyncRead>::poll_read
Unexecuted instantiation: <tokio_rustls::common::Stream<ztunnel::proxy::h2::TokioH2Stream, rustls::client::client_conn::connection::ClientConnection> as tokio::io::async_read::AsyncRead>::poll_read
Unexecuted instantiation: <tokio_rustls::common::Stream<tokio::net::tcp::stream::TcpStream, rustls::client::client_conn::connection::ClientConnection> as tokio::io::async_read::AsyncRead>::poll_read
Unexecuted instantiation: <tokio_rustls::common::Stream<tokio::net::tcp::stream::TcpStream, rustls::server::server_conn::connection::ServerConnection> as tokio::io::async_read::AsyncRead>::poll_read
Unexecuted instantiation: <tokio_rustls::common::Stream<_, _> as tokio::io::async_read::AsyncRead>::poll_read
243
}
244
245
impl<IO: AsyncRead + AsyncWrite + Unpin, C, SD> AsyncWrite for Stream<'_, IO, C>
246
where
247
    C: DerefMut + Deref<Target = ConnectionCommon<SD>>,
248
    SD: SideData,
249
{
250
0
    fn poll_write(
251
0
        mut self: Pin<&mut Self>,
252
0
        cx: &mut Context,
253
0
        buf: &[u8],
254
0
    ) -> Poll<io::Result<usize>> {
255
0
        let mut pos = 0;
256
257
0
        while pos != buf.len() {
258
0
            let mut would_block = false;
259
260
0
            match self.session.writer().write(&buf[pos..]) {
261
0
                Ok(n) => pos += n,
262
0
                Err(err) => return Poll::Ready(Err(err)),
263
            };
264
265
0
            while self.session.wants_write() {
266
0
                match self.write_io(cx) {
267
                    Poll::Ready(Ok(0)) | Poll::Pending => {
268
0
                        would_block = true;
269
0
                        break;
270
                    }
271
0
                    Poll::Ready(Ok(_)) => (),
272
0
                    Poll::Ready(Err(err)) => return Poll::Ready(Err(err)),
273
                }
274
            }
275
276
0
            return match (pos, would_block) {
277
0
                (0, true) => Poll::Pending,
278
0
                (n, true) => Poll::Ready(Ok(n)),
279
0
                (_, false) => continue,
280
            };
281
        }
282
283
0
        Poll::Ready(Ok(pos))
284
0
    }
Unexecuted instantiation: <tokio_rustls::common::Stream<hyper_util::rt::tokio::TokioIo<hyper_util::rt::tokio::TokioIo<tokio::net::tcp::stream::TcpStream>>, rustls::client::client_conn::connection::ClientConnection> as tokio::io::async_write::AsyncWrite>::poll_write
Unexecuted instantiation: <tokio_rustls::common::Stream<ztunnel::proxy::h2::TokioH2Stream, rustls::client::client_conn::connection::ClientConnection> as tokio::io::async_write::AsyncWrite>::poll_write
Unexecuted instantiation: <tokio_rustls::common::Stream<tokio::net::tcp::stream::TcpStream, rustls::client::client_conn::connection::ClientConnection> as tokio::io::async_write::AsyncWrite>::poll_write
Unexecuted instantiation: <tokio_rustls::common::Stream<tokio::net::tcp::stream::TcpStream, rustls::server::server_conn::connection::ServerConnection> as tokio::io::async_write::AsyncWrite>::poll_write
Unexecuted instantiation: <tokio_rustls::common::Stream<_, _> as tokio::io::async_write::AsyncWrite>::poll_write
285
286
0
    fn poll_write_vectored(
287
0
        mut self: Pin<&mut Self>,
288
0
        cx: &mut Context<'_>,
289
0
        bufs: &[IoSlice<'_>],
290
0
    ) -> Poll<io::Result<usize>> {
291
0
        if bufs.iter().all(|buf| buf.is_empty()) {
Unexecuted instantiation: <tokio_rustls::common::Stream<hyper_util::rt::tokio::TokioIo<hyper_util::rt::tokio::TokioIo<tokio::net::tcp::stream::TcpStream>>, rustls::client::client_conn::connection::ClientConnection> as tokio::io::async_write::AsyncWrite>::poll_write_vectored::{closure#0}
Unexecuted instantiation: <tokio_rustls::common::Stream<ztunnel::proxy::h2::TokioH2Stream, rustls::client::client_conn::connection::ClientConnection> as tokio::io::async_write::AsyncWrite>::poll_write_vectored::{closure#0}
Unexecuted instantiation: <tokio_rustls::common::Stream<tokio::net::tcp::stream::TcpStream, rustls::client::client_conn::connection::ClientConnection> as tokio::io::async_write::AsyncWrite>::poll_write_vectored::{closure#0}
Unexecuted instantiation: <tokio_rustls::common::Stream<tokio::net::tcp::stream::TcpStream, rustls::server::server_conn::connection::ServerConnection> as tokio::io::async_write::AsyncWrite>::poll_write_vectored::{closure#0}
Unexecuted instantiation: <tokio_rustls::common::Stream<_, _> as tokio::io::async_write::AsyncWrite>::poll_write_vectored::{closure#0}
292
0
            return Poll::Ready(Ok(0));
293
0
        }
294
295
        loop {
296
0
            let mut would_block = false;
297
0
            let written = self.session.writer().write_vectored(bufs)?;
298
299
0
            while self.session.wants_write() {
300
0
                match self.write_io(cx) {
301
                    Poll::Ready(Ok(0)) | Poll::Pending => {
302
0
                        would_block = true;
303
0
                        break;
304
                    }
305
0
                    Poll::Ready(Ok(_)) => (),
306
0
                    Poll::Ready(Err(err)) => return Poll::Ready(Err(err)),
307
                }
308
            }
309
310
0
            return match (written, would_block) {
311
0
                (0, true) => Poll::Pending,
312
0
                (0, false) => continue,
313
0
                (n, _) => Poll::Ready(Ok(n)),
314
            };
315
        }
316
0
    }
Unexecuted instantiation: <tokio_rustls::common::Stream<hyper_util::rt::tokio::TokioIo<hyper_util::rt::tokio::TokioIo<tokio::net::tcp::stream::TcpStream>>, rustls::client::client_conn::connection::ClientConnection> as tokio::io::async_write::AsyncWrite>::poll_write_vectored
Unexecuted instantiation: <tokio_rustls::common::Stream<ztunnel::proxy::h2::TokioH2Stream, rustls::client::client_conn::connection::ClientConnection> as tokio::io::async_write::AsyncWrite>::poll_write_vectored
Unexecuted instantiation: <tokio_rustls::common::Stream<tokio::net::tcp::stream::TcpStream, rustls::client::client_conn::connection::ClientConnection> as tokio::io::async_write::AsyncWrite>::poll_write_vectored
Unexecuted instantiation: <tokio_rustls::common::Stream<tokio::net::tcp::stream::TcpStream, rustls::server::server_conn::connection::ServerConnection> as tokio::io::async_write::AsyncWrite>::poll_write_vectored
Unexecuted instantiation: <tokio_rustls::common::Stream<_, _> as tokio::io::async_write::AsyncWrite>::poll_write_vectored
317
318
    #[inline]
319
0
    fn is_write_vectored(&self) -> bool {
320
0
        true
321
0
    }
322
323
0
    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
324
0
        self.session.writer().flush()?;
325
0
        while self.session.wants_write() {
326
0
            if ready!(self.write_io(cx))? == 0 {
327
0
                return Poll::Ready(Err(io::ErrorKind::WriteZero.into()));
328
0
            }
329
        }
330
0
        Pin::new(&mut self.io).poll_flush(cx)
331
0
    }
Unexecuted instantiation: <tokio_rustls::common::Stream<hyper_util::rt::tokio::TokioIo<hyper_util::rt::tokio::TokioIo<tokio::net::tcp::stream::TcpStream>>, rustls::client::client_conn::connection::ClientConnection> as tokio::io::async_write::AsyncWrite>::poll_flush
Unexecuted instantiation: <tokio_rustls::common::Stream<ztunnel::proxy::h2::TokioH2Stream, rustls::client::client_conn::connection::ClientConnection> as tokio::io::async_write::AsyncWrite>::poll_flush
Unexecuted instantiation: <tokio_rustls::common::Stream<tokio::net::tcp::stream::TcpStream, rustls::client::client_conn::connection::ClientConnection> as tokio::io::async_write::AsyncWrite>::poll_flush
Unexecuted instantiation: <tokio_rustls::common::Stream<tokio::net::tcp::stream::TcpStream, rustls::server::server_conn::connection::ServerConnection> as tokio::io::async_write::AsyncWrite>::poll_flush
Unexecuted instantiation: <tokio_rustls::common::Stream<_, _> as tokio::io::async_write::AsyncWrite>::poll_flush
332
333
0
    fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
334
0
        while self.session.wants_write() {
335
0
            if ready!(self.write_io(cx))? == 0 {
336
0
                return Poll::Ready(Err(io::ErrorKind::WriteZero.into()));
337
0
            }
338
        }
339
340
0
        Poll::Ready(match ready!(Pin::new(&mut self.io).poll_shutdown(cx)) {
341
0
            Ok(()) => Ok(()),
342
            // When trying to shutdown, not being connected seems fine
343
0
            Err(err) if err.kind() == io::ErrorKind::NotConnected => Ok(()),
344
0
            Err(err) => Err(err),
345
        })
346
0
    }
Unexecuted instantiation: <tokio_rustls::common::Stream<hyper_util::rt::tokio::TokioIo<hyper_util::rt::tokio::TokioIo<tokio::net::tcp::stream::TcpStream>>, rustls::client::client_conn::connection::ClientConnection> as tokio::io::async_write::AsyncWrite>::poll_shutdown
Unexecuted instantiation: <tokio_rustls::common::Stream<ztunnel::proxy::h2::TokioH2Stream, rustls::client::client_conn::connection::ClientConnection> as tokio::io::async_write::AsyncWrite>::poll_shutdown
Unexecuted instantiation: <tokio_rustls::common::Stream<tokio::net::tcp::stream::TcpStream, rustls::client::client_conn::connection::ClientConnection> as tokio::io::async_write::AsyncWrite>::poll_shutdown
Unexecuted instantiation: <tokio_rustls::common::Stream<tokio::net::tcp::stream::TcpStream, rustls::server::server_conn::connection::ServerConnection> as tokio::io::async_write::AsyncWrite>::poll_shutdown
Unexecuted instantiation: <tokio_rustls::common::Stream<_, _> as tokio::io::async_write::AsyncWrite>::poll_shutdown
347
}
348
349
/// An adapter that implements a [`Read`] interface for [`AsyncRead`] types and an
350
/// associated [`Context`].
351
///
352
/// Turns `Poll::Pending` into `WouldBlock`.
353
pub struct SyncReadAdapter<'a, 'b, T> {
354
    pub io: &'a mut T,
355
    pub cx: &'a mut Context<'b>,
356
}
357
358
impl<T: AsyncRead + Unpin> Read for SyncReadAdapter<'_, '_, T> {
359
    #[inline]
360
0
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
361
0
        let mut buf = ReadBuf::new(buf);
362
0
        match Pin::new(&mut self.io).poll_read(self.cx, &mut buf) {
363
0
            Poll::Ready(Ok(())) => Ok(buf.filled().len()),
364
0
            Poll::Ready(Err(err)) => Err(err),
365
0
            Poll::Pending => Err(io::ErrorKind::WouldBlock.into()),
366
        }
367
0
    }
Unexecuted instantiation: <tokio_rustls::common::SyncReadAdapter<hyper_util::rt::tokio::TokioIo<hyper_util::rt::tokio::TokioIo<tokio::net::tcp::stream::TcpStream>>> as std::io::Read>::read
Unexecuted instantiation: <tokio_rustls::common::SyncReadAdapter<ztunnel::proxy::h2::TokioH2Stream> as std::io::Read>::read
Unexecuted instantiation: <tokio_rustls::common::SyncReadAdapter<tokio::net::tcp::stream::TcpStream> as std::io::Read>::read
Unexecuted instantiation: <tokio_rustls::common::SyncReadAdapter<_> as std::io::Read>::read
368
}
369
370
/// An adapter that implements a [`Write`] interface for [`AsyncWrite`] types and an
371
/// associated [`Context`].
372
///
373
/// Turns `Poll::Pending` into `WouldBlock`.
374
pub struct SyncWriteAdapter<'a, 'b, T> {
375
    pub io: &'a mut T,
376
    pub cx: &'a mut Context<'b>,
377
}
378
379
impl<T: Unpin> SyncWriteAdapter<'_, '_, T> {
380
    #[inline]
381
0
    fn poll_with<U>(
382
0
        &mut self,
383
0
        f: impl FnOnce(Pin<&mut T>, &mut Context<'_>) -> Poll<io::Result<U>>,
384
0
    ) -> io::Result<U> {
385
0
        match f(Pin::new(self.io), self.cx) {
386
0
            Poll::Ready(result) => result,
387
0
            Poll::Pending => Err(io::ErrorKind::WouldBlock.into()),
388
        }
389
0
    }
Unexecuted instantiation: <tokio_rustls::common::SyncWriteAdapter<hyper_util::rt::tokio::TokioIo<hyper_util::rt::tokio::TokioIo<tokio::net::tcp::stream::TcpStream>>>>::poll_with::<usize, <tokio_rustls::common::SyncWriteAdapter<hyper_util::rt::tokio::TokioIo<hyper_util::rt::tokio::TokioIo<tokio::net::tcp::stream::TcpStream>>> as std::io::Write>::write_vectored::{closure#0}>
Unexecuted instantiation: <tokio_rustls::common::SyncWriteAdapter<hyper_util::rt::tokio::TokioIo<hyper_util::rt::tokio::TokioIo<tokio::net::tcp::stream::TcpStream>>>>::poll_with::<usize, <tokio_rustls::common::SyncWriteAdapter<hyper_util::rt::tokio::TokioIo<hyper_util::rt::tokio::TokioIo<tokio::net::tcp::stream::TcpStream>>> as std::io::Write>::write::{closure#0}>
Unexecuted instantiation: <tokio_rustls::common::SyncWriteAdapter<hyper_util::rt::tokio::TokioIo<hyper_util::rt::tokio::TokioIo<tokio::net::tcp::stream::TcpStream>>>>::poll_with::<(), <tokio_rustls::common::SyncWriteAdapter<hyper_util::rt::tokio::TokioIo<hyper_util::rt::tokio::TokioIo<tokio::net::tcp::stream::TcpStream>>> as std::io::Write>::flush::{closure#0}>
Unexecuted instantiation: <tokio_rustls::common::SyncWriteAdapter<ztunnel::proxy::h2::TokioH2Stream>>::poll_with::<usize, <tokio_rustls::common::SyncWriteAdapter<ztunnel::proxy::h2::TokioH2Stream> as std::io::Write>::write_vectored::{closure#0}>
Unexecuted instantiation: <tokio_rustls::common::SyncWriteAdapter<ztunnel::proxy::h2::TokioH2Stream>>::poll_with::<usize, <tokio_rustls::common::SyncWriteAdapter<ztunnel::proxy::h2::TokioH2Stream> as std::io::Write>::write::{closure#0}>
Unexecuted instantiation: <tokio_rustls::common::SyncWriteAdapter<ztunnel::proxy::h2::TokioH2Stream>>::poll_with::<(), <tokio_rustls::common::SyncWriteAdapter<ztunnel::proxy::h2::TokioH2Stream> as std::io::Write>::flush::{closure#0}>
Unexecuted instantiation: <tokio_rustls::common::SyncWriteAdapter<tokio::net::tcp::stream::TcpStream>>::poll_with::<usize, <tokio_rustls::common::SyncWriteAdapter<tokio::net::tcp::stream::TcpStream> as std::io::Write>::write_vectored::{closure#0}>
Unexecuted instantiation: <tokio_rustls::common::SyncWriteAdapter<tokio::net::tcp::stream::TcpStream>>::poll_with::<usize, <tokio_rustls::common::SyncWriteAdapter<tokio::net::tcp::stream::TcpStream> as std::io::Write>::write::{closure#0}>
Unexecuted instantiation: <tokio_rustls::common::SyncWriteAdapter<tokio::net::tcp::stream::TcpStream>>::poll_with::<(), <tokio_rustls::common::SyncWriteAdapter<tokio::net::tcp::stream::TcpStream> as std::io::Write>::flush::{closure#0}>
Unexecuted instantiation: <tokio_rustls::common::SyncWriteAdapter<_>>::poll_with::<_, _>
390
}
391
392
impl<T: AsyncWrite + Unpin> Write for SyncWriteAdapter<'_, '_, T> {
393
    #[inline]
394
0
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
395
0
        self.poll_with(|io, cx| io.poll_write(cx, buf))
Unexecuted instantiation: <tokio_rustls::common::SyncWriteAdapter<hyper_util::rt::tokio::TokioIo<hyper_util::rt::tokio::TokioIo<tokio::net::tcp::stream::TcpStream>>> as std::io::Write>::write::{closure#0}
Unexecuted instantiation: <tokio_rustls::common::SyncWriteAdapter<ztunnel::proxy::h2::TokioH2Stream> as std::io::Write>::write::{closure#0}
Unexecuted instantiation: <tokio_rustls::common::SyncWriteAdapter<tokio::net::tcp::stream::TcpStream> as std::io::Write>::write::{closure#0}
Unexecuted instantiation: <tokio_rustls::common::SyncWriteAdapter<_> as std::io::Write>::write::{closure#0}
396
0
    }
Unexecuted instantiation: <tokio_rustls::common::SyncWriteAdapter<hyper_util::rt::tokio::TokioIo<hyper_util::rt::tokio::TokioIo<tokio::net::tcp::stream::TcpStream>>> as std::io::Write>::write
Unexecuted instantiation: <tokio_rustls::common::SyncWriteAdapter<ztunnel::proxy::h2::TokioH2Stream> as std::io::Write>::write
Unexecuted instantiation: <tokio_rustls::common::SyncWriteAdapter<tokio::net::tcp::stream::TcpStream> as std::io::Write>::write
Unexecuted instantiation: <tokio_rustls::common::SyncWriteAdapter<_> as std::io::Write>::write
397
398
    #[inline]
399
0
    fn write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> io::Result<usize> {
400
0
        self.poll_with(|io, cx| io.poll_write_vectored(cx, bufs))
Unexecuted instantiation: <tokio_rustls::common::SyncWriteAdapter<hyper_util::rt::tokio::TokioIo<hyper_util::rt::tokio::TokioIo<tokio::net::tcp::stream::TcpStream>>> as std::io::Write>::write_vectored::{closure#0}
Unexecuted instantiation: <tokio_rustls::common::SyncWriteAdapter<ztunnel::proxy::h2::TokioH2Stream> as std::io::Write>::write_vectored::{closure#0}
Unexecuted instantiation: <tokio_rustls::common::SyncWriteAdapter<tokio::net::tcp::stream::TcpStream> as std::io::Write>::write_vectored::{closure#0}
Unexecuted instantiation: <tokio_rustls::common::SyncWriteAdapter<_> as std::io::Write>::write_vectored::{closure#0}
401
0
    }
Unexecuted instantiation: <tokio_rustls::common::SyncWriteAdapter<hyper_util::rt::tokio::TokioIo<hyper_util::rt::tokio::TokioIo<tokio::net::tcp::stream::TcpStream>>> as std::io::Write>::write_vectored
Unexecuted instantiation: <tokio_rustls::common::SyncWriteAdapter<ztunnel::proxy::h2::TokioH2Stream> as std::io::Write>::write_vectored
Unexecuted instantiation: <tokio_rustls::common::SyncWriteAdapter<tokio::net::tcp::stream::TcpStream> as std::io::Write>::write_vectored
Unexecuted instantiation: <tokio_rustls::common::SyncWriteAdapter<_> as std::io::Write>::write_vectored
402
403
0
    fn flush(&mut self) -> io::Result<()> {
404
0
        self.poll_with(|io, cx| io.poll_flush(cx))
Unexecuted instantiation: <tokio_rustls::common::SyncWriteAdapter<hyper_util::rt::tokio::TokioIo<hyper_util::rt::tokio::TokioIo<tokio::net::tcp::stream::TcpStream>>> as std::io::Write>::flush::{closure#0}
Unexecuted instantiation: <tokio_rustls::common::SyncWriteAdapter<ztunnel::proxy::h2::TokioH2Stream> as std::io::Write>::flush::{closure#0}
Unexecuted instantiation: <tokio_rustls::common::SyncWriteAdapter<tokio::net::tcp::stream::TcpStream> as std::io::Write>::flush::{closure#0}
Unexecuted instantiation: <tokio_rustls::common::SyncWriteAdapter<_> as std::io::Write>::flush::{closure#0}
405
0
    }
Unexecuted instantiation: <tokio_rustls::common::SyncWriteAdapter<hyper_util::rt::tokio::TokioIo<hyper_util::rt::tokio::TokioIo<tokio::net::tcp::stream::TcpStream>>> as std::io::Write>::flush
Unexecuted instantiation: <tokio_rustls::common::SyncWriteAdapter<ztunnel::proxy::h2::TokioH2Stream> as std::io::Write>::flush
Unexecuted instantiation: <tokio_rustls::common::SyncWriteAdapter<tokio::net::tcp::stream::TcpStream> as std::io::Write>::flush
Unexecuted instantiation: <tokio_rustls::common::SyncWriteAdapter<_> as std::io::Write>::flush
406
}
407
408
#[cfg(test)]
409
mod test_stream;