Coverage Report

Created: 2025-02-25 06:39

/rust/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.43.0/src/io/blocking.rs
Line
Count
Source (jump to first uncovered line)
1
use crate::io::sys;
2
use crate::io::{AsyncRead, AsyncWrite, ReadBuf};
3
4
use std::cmp;
5
use std::future::Future;
6
use std::io;
7
use std::io::prelude::*;
8
use std::mem::MaybeUninit;
9
use std::pin::Pin;
10
use std::task::{ready, Context, Poll};
11
12
/// `T` should not implement _both_ Read and Write.
13
#[derive(Debug)]
14
pub(crate) struct Blocking<T> {
15
    inner: Option<T>,
16
    state: State<T>,
17
    /// `true` if the lower IO layer needs flushing.
18
    need_flush: bool,
19
}
20
21
#[derive(Debug)]
22
pub(crate) struct Buf {
23
    buf: Vec<u8>,
24
    pos: usize,
25
}
26
27
pub(crate) const DEFAULT_MAX_BUF_SIZE: usize = 2 * 1024 * 1024;
28
29
#[derive(Debug)]
30
enum State<T> {
31
    Idle(Option<Buf>),
32
    Busy(sys::Blocking<(io::Result<usize>, Buf, T)>),
33
}
34
35
cfg_io_blocking! {
36
    impl<T> Blocking<T> {
37
        /// # Safety
38
        ///
39
        /// The `Read` implementation of `inner` must never read from the buffer
40
        /// it is borrowing and must correctly report the length of the data
41
        /// written into the buffer.
42
        #[cfg_attr(feature = "fs", allow(dead_code))]
43
0
        pub(crate) unsafe fn new(inner: T) -> Blocking<T> {
44
0
            Blocking {
45
0
                inner: Some(inner),
46
0
                state: State::Idle(Some(Buf::with_capacity(0))),
47
0
                need_flush: false,
48
0
            }
49
0
        }
Unexecuted instantiation: <tokio::io::blocking::Blocking<std::io::stdio::Stdin>>::new
Unexecuted instantiation: <tokio::io::blocking::Blocking<std::io::stdio::Stderr>>::new
Unexecuted instantiation: <tokio::io::blocking::Blocking<std::io::stdio::Stdout>>::new
50
    }
51
}
52
53
impl<T> AsyncRead for Blocking<T>
54
where
55
    T: Read + Unpin + Send + 'static,
56
{
57
0
    fn poll_read(
58
0
        mut self: Pin<&mut Self>,
59
0
        cx: &mut Context<'_>,
60
0
        dst: &mut ReadBuf<'_>,
61
0
    ) -> Poll<io::Result<()>> {
62
        loop {
63
0
            match self.state {
64
0
                State::Idle(ref mut buf_cell) => {
65
0
                    let mut buf = buf_cell.take().unwrap();
66
0
67
0
                    if !buf.is_empty() {
68
0
                        buf.copy_to(dst);
69
0
                        *buf_cell = Some(buf);
70
0
                        return Poll::Ready(Ok(()));
71
0
                    }
72
0
73
0
                    let mut inner = self.inner.take().unwrap();
74
0
75
0
                    let max_buf_size = cmp::min(dst.remaining(), DEFAULT_MAX_BUF_SIZE);
76
0
                    self.state = State::Busy(sys::run(move || {
77
0
                        // SAFETY: the requirements are satisfied by `Blocking::new`.
78
0
                        let res = unsafe { buf.read_from(&mut inner, max_buf_size) };
79
0
                        (res, buf, inner)
80
0
                    }));
81
0
                }
82
0
                State::Busy(ref mut rx) => {
83
0
                    let (res, mut buf, inner) = ready!(Pin::new(rx).poll(cx))?;
84
0
                    self.inner = Some(inner);
85
0
86
0
                    match res {
87
                        Ok(_) => {
88
0
                            buf.copy_to(dst);
89
0
                            self.state = State::Idle(Some(buf));
90
0
                            return Poll::Ready(Ok(()));
91
                        }
92
0
                        Err(e) => {
93
0
                            assert!(buf.is_empty());
94
95
0
                            self.state = State::Idle(Some(buf));
96
0
                            return Poll::Ready(Err(e));
97
                        }
98
                    }
99
                }
100
            }
101
        }
102
0
    }
103
}
104
105
impl<T> AsyncWrite for Blocking<T>
106
where
107
    T: Write + Unpin + Send + 'static,
108
{
109
0
    fn poll_write(
110
0
        mut self: Pin<&mut Self>,
111
0
        cx: &mut Context<'_>,
112
0
        src: &[u8],
113
0
    ) -> Poll<io::Result<usize>> {
114
        loop {
115
0
            match self.state {
116
0
                State::Idle(ref mut buf_cell) => {
117
0
                    let mut buf = buf_cell.take().unwrap();
118
0
119
0
                    assert!(buf.is_empty());
120
121
0
                    let n = buf.copy_from(src, DEFAULT_MAX_BUF_SIZE);
122
0
                    let mut inner = self.inner.take().unwrap();
123
0
124
0
                    self.state = State::Busy(sys::run(move || {
125
0
                        let n = buf.len();
126
0
                        let res = buf.write_to(&mut inner).map(|()| n);
Unexecuted instantiation: <tokio::io::blocking::Blocking<std::io::stdio::Stderr> as tokio::io::async_write::AsyncWrite>::poll_write::{closure#0}::{closure#0}
Unexecuted instantiation: <tokio::io::blocking::Blocking<std::io::stdio::Stdout> as tokio::io::async_write::AsyncWrite>::poll_write::{closure#0}::{closure#0}
127
0
128
0
                        (res, buf, inner)
129
0
                    }));
Unexecuted instantiation: <tokio::io::blocking::Blocking<std::io::stdio::Stderr> as tokio::io::async_write::AsyncWrite>::poll_write::{closure#0}
Unexecuted instantiation: <tokio::io::blocking::Blocking<std::io::stdio::Stdout> as tokio::io::async_write::AsyncWrite>::poll_write::{closure#0}
130
0
                    self.need_flush = true;
131
0
132
0
                    return Poll::Ready(Ok(n));
133
                }
134
0
                State::Busy(ref mut rx) => {
135
0
                    let (res, buf, inner) = ready!(Pin::new(rx).poll(cx))?;
136
0
                    self.state = State::Idle(Some(buf));
137
0
                    self.inner = Some(inner);
138
0
139
0
                    // If error, return
140
0
                    res?;
141
                }
142
            }
143
        }
144
0
    }
Unexecuted instantiation: <tokio::io::blocking::Blocking<std::io::stdio::Stderr> as tokio::io::async_write::AsyncWrite>::poll_write
Unexecuted instantiation: <tokio::io::blocking::Blocking<std::io::stdio::Stdout> as tokio::io::async_write::AsyncWrite>::poll_write
145
146
0
    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
147
        loop {
148
0
            let need_flush = self.need_flush;
149
0
            match self.state {
150
                // The buffer is not used here
151
0
                State::Idle(ref mut buf_cell) => {
152
0
                    if need_flush {
153
0
                        let buf = buf_cell.take().unwrap();
154
0
                        let mut inner = self.inner.take().unwrap();
155
0
156
0
                        self.state = State::Busy(sys::run(move || {
157
0
                            let res = inner.flush().map(|()| 0);
Unexecuted instantiation: <tokio::io::blocking::Blocking<std::io::stdio::Stderr> as tokio::io::async_write::AsyncWrite>::poll_flush::{closure#0}::{closure#0}
Unexecuted instantiation: <tokio::io::blocking::Blocking<std::io::stdio::Stdout> as tokio::io::async_write::AsyncWrite>::poll_flush::{closure#0}::{closure#0}
158
0
                            (res, buf, inner)
159
0
                        }));
Unexecuted instantiation: <tokio::io::blocking::Blocking<std::io::stdio::Stderr> as tokio::io::async_write::AsyncWrite>::poll_flush::{closure#0}
Unexecuted instantiation: <tokio::io::blocking::Blocking<std::io::stdio::Stdout> as tokio::io::async_write::AsyncWrite>::poll_flush::{closure#0}
160
0
161
0
                        self.need_flush = false;
162
0
                    } else {
163
0
                        return Poll::Ready(Ok(()));
164
                    }
165
                }
166
0
                State::Busy(ref mut rx) => {
167
0
                    let (res, buf, inner) = ready!(Pin::new(rx).poll(cx))?;
168
0
                    self.state = State::Idle(Some(buf));
169
0
                    self.inner = Some(inner);
170
0
171
0
                    // If error, return
172
0
                    res?;
173
                }
174
            }
175
        }
176
0
    }
Unexecuted instantiation: <tokio::io::blocking::Blocking<std::io::stdio::Stderr> as tokio::io::async_write::AsyncWrite>::poll_flush
Unexecuted instantiation: <tokio::io::blocking::Blocking<std::io::stdio::Stdout> as tokio::io::async_write::AsyncWrite>::poll_flush
177
178
0
    fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
179
0
        Poll::Ready(Ok(()))
180
0
    }
Unexecuted instantiation: <tokio::io::blocking::Blocking<std::io::stdio::Stderr> as tokio::io::async_write::AsyncWrite>::poll_shutdown
Unexecuted instantiation: <tokio::io::blocking::Blocking<std::io::stdio::Stdout> as tokio::io::async_write::AsyncWrite>::poll_shutdown
181
}
182
183
/// Repeats operations that are interrupted.
184
macro_rules! uninterruptibly {
185
    ($e:expr) => {{
186
        loop {
187
            match $e {
188
                Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {}
189
                res => break res,
190
            }
191
        }
192
    }};
193
}
194
195
impl Buf {
196
0
    pub(crate) fn with_capacity(n: usize) -> Buf {
197
0
        Buf {
198
0
            buf: Vec::with_capacity(n),
199
0
            pos: 0,
200
0
        }
201
0
    }
202
203
0
    pub(crate) fn is_empty(&self) -> bool {
204
0
        self.len() == 0
205
0
    }
206
207
0
    pub(crate) fn len(&self) -> usize {
208
0
        self.buf.len() - self.pos
209
0
    }
210
211
0
    pub(crate) fn copy_to(&mut self, dst: &mut ReadBuf<'_>) -> usize {
212
0
        let n = cmp::min(self.len(), dst.remaining());
213
0
        dst.put_slice(&self.bytes()[..n]);
214
0
        self.pos += n;
215
0
216
0
        if self.pos == self.buf.len() {
217
0
            self.buf.truncate(0);
218
0
            self.pos = 0;
219
0
        }
220
221
0
        n
222
0
    }
223
224
0
    pub(crate) fn copy_from(&mut self, src: &[u8], max_buf_size: usize) -> usize {
225
0
        assert!(self.is_empty());
226
227
0
        let n = cmp::min(src.len(), max_buf_size);
228
0
229
0
        self.buf.extend_from_slice(&src[..n]);
230
0
        n
231
0
    }
232
233
0
    pub(crate) fn bytes(&self) -> &[u8] {
234
0
        &self.buf[self.pos..]
235
0
    }
236
237
    /// # Safety
238
    ///
239
    /// `rd` must not read from the buffer `read` is borrowing and must correctly
240
    /// report the length of the data written into the buffer.
241
0
    pub(crate) unsafe fn read_from<T: Read>(
242
0
        &mut self,
243
0
        rd: &mut T,
244
0
        max_buf_size: usize,
245
0
    ) -> io::Result<usize> {
246
0
        assert!(self.is_empty());
247
0
        self.buf.reserve(max_buf_size);
248
0
249
0
        let buf = &mut self.buf.spare_capacity_mut()[..max_buf_size];
250
0
        // SAFETY: The memory may be uninitialized, but `rd.read` will only write to the buffer.
251
0
        let buf = unsafe { &mut *(buf as *mut [MaybeUninit<u8>] as *mut [u8]) };
252
0
        let res = uninterruptibly!(rd.read(buf));
253
254
0
        if let Ok(n) = res {
255
            // SAFETY: the caller promises that `rd.read` initializes
256
            // a section of `buf` and correctly reports that length.
257
            // The `self.is_empty()` assertion verifies that `n`
258
            // equals the length of the `buf` capacity that was written
259
            // to (and that `buf` isn't being shrunk).
260
0
            unsafe { self.buf.set_len(n) }
261
0
        } else {
262
0
            self.buf.clear();
263
0
        }
264
265
0
        assert_eq!(self.pos, 0);
266
267
0
        res
268
0
    }
Unexecuted instantiation: <tokio::io::blocking::Buf>::read_from::<std::io::stdio::Stdin>
Unexecuted instantiation: <tokio::io::blocking::Buf>::read_from::<&std::fs::File>
269
270
0
    pub(crate) fn write_to<T: Write>(&mut self, wr: &mut T) -> io::Result<()> {
271
0
        assert_eq!(self.pos, 0);
272
273
        // `write_all` already ignores interrupts
274
0
        let res = wr.write_all(&self.buf);
275
0
        self.buf.clear();
276
0
        res
277
0
    }
Unexecuted instantiation: <tokio::io::blocking::Buf>::write_to::<std::io::stdio::Stderr>
Unexecuted instantiation: <tokio::io::blocking::Buf>::write_to::<std::io::stdio::Stdout>
Unexecuted instantiation: <tokio::io::blocking::Buf>::write_to::<&std::fs::File>
278
}
279
280
cfg_fs! {
281
    impl Buf {
282
0
        pub(crate) fn discard_read(&mut self) -> i64 {
283
0
            let ret = -(self.bytes().len() as i64);
284
0
            self.pos = 0;
285
0
            self.buf.truncate(0);
286
0
            ret
287
0
        }
288
289
0
        pub(crate) fn copy_from_bufs(&mut self, bufs: &[io::IoSlice<'_>], max_buf_size: usize) -> usize {
290
0
            assert!(self.is_empty());
291
292
0
            let mut rem = max_buf_size;
293
0
            for buf in bufs {
294
0
                if rem == 0 {
295
0
                    break
296
0
                }
297
0
298
0
                let len = buf.len().min(rem);
299
0
                self.buf.extend_from_slice(&buf[..len]);
300
0
                rem -= len;
301
            }
302
303
0
            max_buf_size - rem
304
0
        }
305
    }
306
}