Coverage Report

Created: 2025-10-31 06:57

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/rust/registry/src/index.crates.io-1949cf8c6b5b557f/hyper-1.6.0/src/rt/io.rs
Line
Count
Source
1
use std::fmt;
2
use std::mem::MaybeUninit;
3
use std::ops::DerefMut;
4
use std::pin::Pin;
5
use std::task::{Context, Poll};
6
7
// New IO traits? What?! Why, are you bonkers?
8
//
9
// I mean, yes, probably. But, here's the goals:
10
//
11
// 1. Supports poll-based IO operations.
12
// 2. Opt-in vectored IO.
13
// 3. Can use an optional buffer pool.
14
// 4. Able to add completion-based (uring) IO eventually.
15
//
16
// Frankly, the last point is the entire reason we're doing this. We want to
17
// have forwards-compatibility with an eventually stable io-uring runtime. We
18
// don't need that to work right away. But it must be possible to add in here
19
// without breaking hyper 1.0.
20
//
21
// While in here, if there's small tweaks to poll_read or poll_write that would
22
// allow even the "slow" path to be faster, such as if someone didn't remember
23
// to forward along an `is_completion` call.
24
25
/// Reads bytes from a source.
26
///
27
/// This trait is similar to `std::io::Read`, but supports asynchronous reads.
28
pub trait Read {
29
    /// Attempts to read bytes into the `buf`.
30
    ///
31
    /// On success, returns `Poll::Ready(Ok(()))` and places data in the
32
    /// unfilled portion of `buf`. If no data was read (`buf.remaining()` is
33
    /// unchanged), it implies that EOF has been reached.
34
    ///
35
    /// If no data is available for reading, the method returns `Poll::Pending`
36
    /// and arranges for the current task (via `cx.waker()`) to receive a
37
    /// notification when the object becomes readable or is closed.
38
    fn poll_read(
39
        self: Pin<&mut Self>,
40
        cx: &mut Context<'_>,
41
        buf: ReadBufCursor<'_>,
42
    ) -> Poll<Result<(), std::io::Error>>;
43
}
44
45
/// Write bytes asynchronously.
46
///
47
/// This trait is similar to `std::io::Write`, but for asynchronous writes.
48
pub trait Write {
49
    /// Attempt to write bytes from `buf` into the destination.
50
    ///
51
    /// On success, returns `Poll::Ready(Ok(num_bytes_written)))`. If
52
    /// successful, it must be guaranteed that `n <= buf.len()`. A return value
53
    /// of `0` means that the underlying object is no longer able to accept
54
    /// bytes, or that the provided buffer is empty.
55
    ///
56
    /// If the object is not ready for writing, the method returns
57
    /// `Poll::Pending` and arranges for the current task (via `cx.waker()`) to
58
    /// receive a notification when the object becomes writable or is closed.
59
    fn poll_write(
60
        self: Pin<&mut Self>,
61
        cx: &mut Context<'_>,
62
        buf: &[u8],
63
    ) -> Poll<Result<usize, std::io::Error>>;
64
65
    /// Attempts to flush the object.
66
    ///
67
    /// On success, returns `Poll::Ready(Ok(()))`.
68
    ///
69
    /// If flushing cannot immediately complete, this method returns
70
    /// `Poll::Pending` and arranges for the current task (via `cx.waker()`) to
71
    /// receive a notification when the object can make progress.
72
    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), std::io::Error>>;
73
74
    /// Attempts to shut down this writer.
75
    fn poll_shutdown(
76
        self: Pin<&mut Self>,
77
        cx: &mut Context<'_>,
78
    ) -> Poll<Result<(), std::io::Error>>;
79
80
    /// Returns whether this writer has an efficient `poll_write_vectored`
81
    /// implementation.
82
    ///
83
    /// The default implementation returns `false`.
84
0
    fn is_write_vectored(&self) -> bool {
85
0
        false
86
0
    }
87
88
    /// Like `poll_write`, except that it writes from a slice of buffers.
89
0
    fn poll_write_vectored(
90
0
        self: Pin<&mut Self>,
91
0
        cx: &mut Context<'_>,
92
0
        bufs: &[std::io::IoSlice<'_>],
93
0
    ) -> Poll<Result<usize, std::io::Error>> {
94
0
        let buf = bufs
95
0
            .iter()
96
0
            .find(|b| !b.is_empty())
97
0
            .map_or(&[][..], |b| &**b);
98
0
        self.poll_write(cx, buf)
99
0
    }
100
}
101
102
/// A wrapper around a byte buffer that is incrementally filled and initialized.
103
///
104
/// This type is a sort of "double cursor". It tracks three regions in the
105
/// buffer: a region at the beginning of the buffer that has been logically
106
/// filled with data, a region that has been initialized at some point but not
107
/// yet logically filled, and a region at the end that may be uninitialized.
108
/// The filled region is guaranteed to be a subset of the initialized region.
109
///
110
/// In summary, the contents of the buffer can be visualized as:
111
///
112
/// ```not_rust
113
/// [             capacity              ]
114
/// [ filled |         unfilled         ]
115
/// [    initialized    | uninitialized ]
116
/// ```
117
///
118
/// It is undefined behavior to de-initialize any bytes from the uninitialized
119
/// region, since it is merely unknown whether this region is uninitialized or
120
/// not, and if part of it turns out to be initialized, it must stay initialized.
121
pub struct ReadBuf<'a> {
122
    raw: &'a mut [MaybeUninit<u8>],
123
    filled: usize,
124
    init: usize,
125
}
126
127
/// The cursor part of a [`ReadBuf`].
128
///
129
/// This is created by calling `ReadBuf::unfilled()`.
130
#[derive(Debug)]
131
pub struct ReadBufCursor<'a> {
132
    buf: &'a mut ReadBuf<'a>,
133
}
134
135
impl<'data> ReadBuf<'data> {
136
    /// Create a new `ReadBuf` with a slice of initialized bytes.
137
    #[inline]
138
0
    pub fn new(raw: &'data mut [u8]) -> Self {
139
0
        let len = raw.len();
140
0
        Self {
141
0
            // SAFETY: We never de-init the bytes ourselves.
142
0
            raw: unsafe { &mut *(raw as *mut [u8] as *mut [MaybeUninit<u8>]) },
143
0
            filled: 0,
144
0
            init: len,
145
0
        }
146
0
    }
Unexecuted instantiation: <hyper::rt::io::ReadBuf>::new
Unexecuted instantiation: <hyper::rt::io::ReadBuf>::new
147
148
    /// Create a new `ReadBuf` with a slice of uninitialized bytes.
149
    #[inline]
150
0
    pub fn uninit(raw: &'data mut [MaybeUninit<u8>]) -> Self {
151
0
        Self {
152
0
            raw,
153
0
            filled: 0,
154
0
            init: 0,
155
0
        }
156
0
    }
Unexecuted instantiation: <hyper::rt::io::ReadBuf>::uninit
Unexecuted instantiation: <hyper::rt::io::ReadBuf>::uninit
157
158
    /// Get a slice of the buffer that has been filled in with bytes.
159
    #[inline]
160
0
    pub fn filled(&self) -> &[u8] {
161
        // SAFETY: We only slice the filled part of the buffer, which is always valid
162
0
        unsafe { &*(&self.raw[0..self.filled] as *const [MaybeUninit<u8>] as *const [u8]) }
163
0
    }
Unexecuted instantiation: <hyper::rt::io::ReadBuf>::filled
Unexecuted instantiation: <hyper::rt::io::ReadBuf>::filled
164
165
    /// Get a cursor to the unfilled portion of the buffer.
166
    #[inline]
167
0
    pub fn unfilled<'cursor>(&'cursor mut self) -> ReadBufCursor<'cursor> {
168
0
        ReadBufCursor {
169
0
            // SAFETY: self.buf is never re-assigned, so its safe to narrow
170
0
            // the lifetime.
171
0
            buf: unsafe {
172
0
                std::mem::transmute::<&'cursor mut ReadBuf<'data>, &'cursor mut ReadBuf<'cursor>>(
173
0
                    self,
174
0
                )
175
0
            },
176
0
        }
177
0
    }
Unexecuted instantiation: <hyper::rt::io::ReadBuf>::unfilled
Unexecuted instantiation: <hyper::rt::io::ReadBuf>::unfilled
178
179
    #[inline]
180
    #[cfg(all(any(feature = "client", feature = "server"), feature = "http2"))]
181
    pub(crate) unsafe fn set_init(&mut self, n: usize) {
182
        self.init = self.init.max(n);
183
    }
184
185
    #[inline]
186
    #[cfg(all(any(feature = "client", feature = "server"), feature = "http2"))]
187
    pub(crate) unsafe fn set_filled(&mut self, n: usize) {
188
        self.filled = self.filled.max(n);
189
    }
190
191
    #[inline]
192
    #[cfg(all(any(feature = "client", feature = "server"), feature = "http2"))]
193
    pub(crate) fn len(&self) -> usize {
194
        self.filled
195
    }
196
197
    #[inline]
198
    #[cfg(all(any(feature = "client", feature = "server"), feature = "http2"))]
199
    pub(crate) fn init_len(&self) -> usize {
200
        self.init
201
    }
202
203
    #[inline]
204
0
    fn remaining(&self) -> usize {
205
0
        self.capacity() - self.filled
206
0
    }
207
208
    #[inline]
209
0
    fn capacity(&self) -> usize {
210
0
        self.raw.len()
211
0
    }
212
}
213
214
impl fmt::Debug for ReadBuf<'_> {
215
0
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
216
0
        f.debug_struct("ReadBuf")
217
0
            .field("filled", &self.filled)
218
0
            .field("init", &self.init)
219
0
            .field("capacity", &self.capacity())
220
0
            .finish()
221
0
    }
222
}
223
224
impl ReadBufCursor<'_> {
225
    /// Access the unfilled part of the buffer.
226
    ///
227
    /// # Safety
228
    ///
229
    /// The caller must not uninitialize any bytes that may have been
230
    /// initialized before.
231
    #[inline]
232
0
    pub unsafe fn as_mut(&mut self) -> &mut [MaybeUninit<u8>] {
233
0
        &mut self.buf.raw[self.buf.filled..]
234
0
    }
Unexecuted instantiation: <hyper::rt::io::ReadBufCursor>::as_mut
Unexecuted instantiation: <hyper::rt::io::ReadBufCursor>::as_mut
235
236
    /// Advance the `filled` cursor by `n` bytes.
237
    ///
238
    /// # Safety
239
    ///
240
    /// The caller must take care that `n` more bytes have been initialized.
241
    #[inline]
242
0
    pub unsafe fn advance(&mut self, n: usize) {
243
0
        self.buf.filled = self.buf.filled.checked_add(n).expect("overflow");
244
0
        self.buf.init = self.buf.filled.max(self.buf.init);
245
0
    }
Unexecuted instantiation: <hyper::rt::io::ReadBufCursor>::advance
Unexecuted instantiation: <hyper::rt::io::ReadBufCursor>::advance
246
247
    /// Returns the number of bytes that can be written from the current
248
    /// position until the end of the buffer is reached.
249
    ///
250
    /// This value is equal to the length of the slice returned by `as_mut()``.
251
    #[inline]
252
0
    pub fn remaining(&self) -> usize {
253
0
        self.buf.remaining()
254
0
    }
255
256
    /// Transfer bytes into `self`` from `src` and advance the cursor
257
    /// by the number of bytes written.
258
    ///
259
    /// # Panics
260
    ///
261
    /// `self` must have enough remaining capacity to contain all of `src`.
262
    #[inline]
263
0
    pub fn put_slice(&mut self, src: &[u8]) {
264
0
        assert!(
265
0
            self.buf.remaining() >= src.len(),
266
0
            "src.len() must fit in remaining()"
267
        );
268
269
0
        let amt = src.len();
270
        // Cannot overflow, asserted above
271
0
        let end = self.buf.filled + amt;
272
273
        // Safety: the length is asserted above
274
0
        unsafe {
275
0
            self.buf.raw[self.buf.filled..end]
276
0
                .as_mut_ptr()
277
0
                .cast::<u8>()
278
0
                .copy_from_nonoverlapping(src.as_ptr(), amt);
279
0
        }
280
281
0
        if self.buf.init < end {
282
0
            self.buf.init = end;
283
0
        }
284
0
        self.buf.filled = end;
285
0
    }
286
}
287
288
macro_rules! deref_async_read {
289
    () => {
290
0
        fn poll_read(
291
0
            mut self: Pin<&mut Self>,
292
0
            cx: &mut Context<'_>,
293
0
            buf: ReadBufCursor<'_>,
294
0
        ) -> Poll<std::io::Result<()>> {
295
0
            Pin::new(&mut **self).poll_read(cx, buf)
296
0
        }
Unexecuted instantiation: <alloc::boxed::Box<dyn reqwest::connect::AsyncConnWithInfo> as hyper::rt::io::Read>::poll_read
Unexecuted instantiation: <alloc::boxed::Box<dyn hyper::upgrade::Io + core::marker::Send> as hyper::rt::io::Read>::poll_read
Unexecuted instantiation: <&mut _ as hyper::rt::io::Read>::poll_read
297
    };
298
}
299
300
impl<T: ?Sized + Read + Unpin> Read for Box<T> {
301
    deref_async_read!();
302
}
303
304
impl<T: ?Sized + Read + Unpin> Read for &mut T {
305
    deref_async_read!();
306
}
307
308
impl<P> Read for Pin<P>
309
where
310
    P: DerefMut,
311
    P::Target: Read,
312
{
313
0
    fn poll_read(
314
0
        self: Pin<&mut Self>,
315
0
        cx: &mut Context<'_>,
316
0
        buf: ReadBufCursor<'_>,
317
0
    ) -> Poll<std::io::Result<()>> {
318
0
        pin_as_deref_mut(self).poll_read(cx, buf)
319
0
    }
320
}
321
322
macro_rules! deref_async_write {
323
    () => {
324
0
        fn poll_write(
325
0
            mut self: Pin<&mut Self>,
326
0
            cx: &mut Context<'_>,
327
0
            buf: &[u8],
328
0
        ) -> Poll<std::io::Result<usize>> {
329
0
            Pin::new(&mut **self).poll_write(cx, buf)
330
0
        }
Unexecuted instantiation: <alloc::boxed::Box<dyn reqwest::connect::AsyncConnWithInfo> as hyper::rt::io::Write>::poll_write
Unexecuted instantiation: <alloc::boxed::Box<dyn hyper::upgrade::Io + core::marker::Send> as hyper::rt::io::Write>::poll_write
Unexecuted instantiation: <&mut _ as hyper::rt::io::Write>::poll_write
331
332
0
        fn poll_write_vectored(
333
0
            mut self: Pin<&mut Self>,
334
0
            cx: &mut Context<'_>,
335
0
            bufs: &[std::io::IoSlice<'_>],
336
0
        ) -> Poll<std::io::Result<usize>> {
337
0
            Pin::new(&mut **self).poll_write_vectored(cx, bufs)
338
0
        }
Unexecuted instantiation: <alloc::boxed::Box<dyn reqwest::connect::AsyncConnWithInfo> as hyper::rt::io::Write>::poll_write_vectored
Unexecuted instantiation: <alloc::boxed::Box<dyn hyper::upgrade::Io + core::marker::Send> as hyper::rt::io::Write>::poll_write_vectored
Unexecuted instantiation: <&mut _ as hyper::rt::io::Write>::poll_write_vectored
339
340
0
        fn is_write_vectored(&self) -> bool {
341
0
            (**self).is_write_vectored()
342
0
        }
Unexecuted instantiation: <alloc::boxed::Box<dyn reqwest::connect::AsyncConnWithInfo> as hyper::rt::io::Write>::is_write_vectored
Unexecuted instantiation: <alloc::boxed::Box<dyn hyper::upgrade::Io + core::marker::Send> as hyper::rt::io::Write>::is_write_vectored
Unexecuted instantiation: <&mut _ as hyper::rt::io::Write>::is_write_vectored
343
344
0
        fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
345
0
            Pin::new(&mut **self).poll_flush(cx)
346
0
        }
Unexecuted instantiation: <alloc::boxed::Box<dyn reqwest::connect::AsyncConnWithInfo> as hyper::rt::io::Write>::poll_flush
Unexecuted instantiation: <alloc::boxed::Box<dyn hyper::upgrade::Io + core::marker::Send> as hyper::rt::io::Write>::poll_flush
Unexecuted instantiation: <&mut _ as hyper::rt::io::Write>::poll_flush
347
348
0
        fn poll_shutdown(
349
0
            mut self: Pin<&mut Self>,
350
0
            cx: &mut Context<'_>,
351
0
        ) -> Poll<std::io::Result<()>> {
352
0
            Pin::new(&mut **self).poll_shutdown(cx)
353
0
        }
Unexecuted instantiation: <alloc::boxed::Box<dyn reqwest::connect::AsyncConnWithInfo> as hyper::rt::io::Write>::poll_shutdown
Unexecuted instantiation: <alloc::boxed::Box<dyn hyper::upgrade::Io + core::marker::Send> as hyper::rt::io::Write>::poll_shutdown
Unexecuted instantiation: <&mut _ as hyper::rt::io::Write>::poll_shutdown
354
    };
355
}
356
357
impl<T: ?Sized + Write + Unpin> Write for Box<T> {
358
    deref_async_write!();
359
}
360
361
impl<T: ?Sized + Write + Unpin> Write for &mut T {
362
    deref_async_write!();
363
}
364
365
impl<P> Write for Pin<P>
366
where
367
    P: DerefMut,
368
    P::Target: Write,
369
{
370
0
    fn poll_write(
371
0
        self: Pin<&mut Self>,
372
0
        cx: &mut Context<'_>,
373
0
        buf: &[u8],
374
0
    ) -> Poll<std::io::Result<usize>> {
375
0
        pin_as_deref_mut(self).poll_write(cx, buf)
376
0
    }
377
378
0
    fn poll_write_vectored(
379
0
        self: Pin<&mut Self>,
380
0
        cx: &mut Context<'_>,
381
0
        bufs: &[std::io::IoSlice<'_>],
382
0
    ) -> Poll<std::io::Result<usize>> {
383
0
        pin_as_deref_mut(self).poll_write_vectored(cx, bufs)
384
0
    }
385
386
0
    fn is_write_vectored(&self) -> bool {
387
0
        (**self).is_write_vectored()
388
0
    }
389
390
0
    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
391
0
        pin_as_deref_mut(self).poll_flush(cx)
392
0
    }
393
394
0
    fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
395
0
        pin_as_deref_mut(self).poll_shutdown(cx)
396
0
    }
397
}
398
399
/// Polyfill for Pin::as_deref_mut()
400
/// TODO: use Pin::as_deref_mut() instead once stabilized
401
0
fn pin_as_deref_mut<P: DerefMut>(pin: Pin<&mut Pin<P>>) -> Pin<&mut P::Target> {
402
    // SAFETY: we go directly from Pin<&mut Pin<P>> to Pin<&mut P::Target>, without moving or
403
    // giving out the &mut Pin<P> in the process. See Pin::as_deref_mut() for more detail.
404
0
    unsafe { pin.get_unchecked_mut() }.as_mut()
405
0
}