Coverage Report

Created: 2024-12-17 06:15

/rust/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.42.0/src/io/blocking.rs
Line
Count
Source (jump to first uncovered line)
1
use crate::io::sys;
2
use crate::io::{AsyncRead, AsyncWrite, ReadBuf};
3
4
use std::cmp;
5
use std::future::Future;
6
use std::io;
7
use std::io::prelude::*;
8
use std::pin::Pin;
9
use std::task::{ready, Context, Poll};
10
11
/// `T` should not implement _both_ Read and Write.
12
#[derive(Debug)]
13
pub(crate) struct Blocking<T> {
14
    inner: Option<T>,
15
    state: State<T>,
16
    /// `true` if the lower IO layer needs flushing.
17
    need_flush: bool,
18
}
19
20
#[derive(Debug)]
21
pub(crate) struct Buf {
22
    buf: Vec<u8>,
23
    pos: usize,
24
}
25
26
pub(crate) const DEFAULT_MAX_BUF_SIZE: usize = 2 * 1024 * 1024;
27
28
#[derive(Debug)]
29
enum State<T> {
30
    Idle(Option<Buf>),
31
    Busy(sys::Blocking<(io::Result<usize>, Buf, T)>),
32
}
33
34
cfg_io_blocking! {
35
    impl<T> Blocking<T> {
36
        #[cfg_attr(feature = "fs", allow(dead_code))]
37
0
        pub(crate) fn new(inner: T) -> Blocking<T> {
38
0
            Blocking {
39
0
                inner: Some(inner),
40
0
                state: State::Idle(Some(Buf::with_capacity(0))),
41
0
                need_flush: false,
42
0
            }
43
0
        }
Unexecuted instantiation: <tokio::io::blocking::Blocking<std::io::stdio::Stdin>>::new
Unexecuted instantiation: <tokio::io::blocking::Blocking<std::io::stdio::Stderr>>::new
Unexecuted instantiation: <tokio::io::blocking::Blocking<std::io::stdio::Stdout>>::new
Unexecuted instantiation: <tokio::io::blocking::Blocking<std::io::stdio::Stdin>>::new
Unexecuted instantiation: <tokio::io::blocking::Blocking<std::io::stdio::Stderr>>::new
Unexecuted instantiation: <tokio::io::blocking::Blocking<std::io::stdio::Stdout>>::new
Unexecuted instantiation: <tokio::io::blocking::Blocking<std::io::stdio::Stdin>>::new
Unexecuted instantiation: <tokio::io::blocking::Blocking<std::io::stdio::Stderr>>::new
Unexecuted instantiation: <tokio::io::blocking::Blocking<std::io::stdio::Stdout>>::new
44
    }
45
}
46
47
impl<T> AsyncRead for Blocking<T>
48
where
49
    T: Read + Unpin + Send + 'static,
50
{
51
0
    fn poll_read(
52
0
        mut self: Pin<&mut Self>,
53
0
        cx: &mut Context<'_>,
54
0
        dst: &mut ReadBuf<'_>,
55
0
    ) -> Poll<io::Result<()>> {
56
        loop {
57
0
            match self.state {
58
0
                State::Idle(ref mut buf_cell) => {
59
0
                    let mut buf = buf_cell.take().unwrap();
60
0
61
0
                    if !buf.is_empty() {
62
0
                        buf.copy_to(dst);
63
0
                        *buf_cell = Some(buf);
64
0
                        return Poll::Ready(Ok(()));
65
0
                    }
66
0
67
0
                    buf.ensure_capacity_for(dst, DEFAULT_MAX_BUF_SIZE);
68
0
                    let mut inner = self.inner.take().unwrap();
69
0
70
0
                    self.state = State::Busy(sys::run(move || {
71
0
                        let res = buf.read_from(&mut inner);
72
0
                        (res, buf, inner)
73
0
                    }));
Unexecuted instantiation: <tokio::io::blocking::Blocking<std::io::stdio::Stdin> as tokio::io::async_read::AsyncRead>::poll_read::{closure#0}
Unexecuted instantiation: <tokio::io::blocking::Blocking<std::io::stdio::Stdin> as tokio::io::async_read::AsyncRead>::poll_read::{closure#0}
Unexecuted instantiation: <tokio::io::blocking::Blocking<std::io::stdio::Stdin> as tokio::io::async_read::AsyncRead>::poll_read::{closure#0}
74
0
                }
75
0
                State::Busy(ref mut rx) => {
76
0
                    let (res, mut buf, inner) = ready!(Pin::new(rx).poll(cx))?;
77
0
                    self.inner = Some(inner);
78
0
79
0
                    match res {
80
                        Ok(_) => {
81
0
                            buf.copy_to(dst);
82
0
                            self.state = State::Idle(Some(buf));
83
0
                            return Poll::Ready(Ok(()));
84
                        }
85
0
                        Err(e) => {
86
0
                            assert!(buf.is_empty());
87
88
0
                            self.state = State::Idle(Some(buf));
89
0
                            return Poll::Ready(Err(e));
90
                        }
91
                    }
92
                }
93
            }
94
        }
95
0
    }
Unexecuted instantiation: <tokio::io::blocking::Blocking<std::io::stdio::Stdin> as tokio::io::async_read::AsyncRead>::poll_read
Unexecuted instantiation: <tokio::io::blocking::Blocking<std::io::stdio::Stdin> as tokio::io::async_read::AsyncRead>::poll_read
Unexecuted instantiation: <tokio::io::blocking::Blocking<std::io::stdio::Stdin> as tokio::io::async_read::AsyncRead>::poll_read
96
}
97
98
impl<T> AsyncWrite for Blocking<T>
99
where
100
    T: Write + Unpin + Send + 'static,
101
{
102
0
    fn poll_write(
103
0
        mut self: Pin<&mut Self>,
104
0
        cx: &mut Context<'_>,
105
0
        src: &[u8],
106
0
    ) -> Poll<io::Result<usize>> {
107
        loop {
108
0
            match self.state {
109
0
                State::Idle(ref mut buf_cell) => {
110
0
                    let mut buf = buf_cell.take().unwrap();
111
0
112
0
                    assert!(buf.is_empty());
113
114
0
                    let n = buf.copy_from(src, DEFAULT_MAX_BUF_SIZE);
115
0
                    let mut inner = self.inner.take().unwrap();
116
0
117
0
                    self.state = State::Busy(sys::run(move || {
118
0
                        let n = buf.len();
119
0
                        let res = buf.write_to(&mut inner).map(|()| n);
Unexecuted instantiation: <tokio::io::blocking::Blocking<std::io::stdio::Stderr> as tokio::io::async_write::AsyncWrite>::poll_write::{closure#0}::{closure#0}
Unexecuted instantiation: <tokio::io::blocking::Blocking<std::io::stdio::Stdout> as tokio::io::async_write::AsyncWrite>::poll_write::{closure#0}::{closure#0}
Unexecuted instantiation: <tokio::io::blocking::Blocking<std::io::stdio::Stderr> as tokio::io::async_write::AsyncWrite>::poll_write::{closure#0}::{closure#0}
Unexecuted instantiation: <tokio::io::blocking::Blocking<std::io::stdio::Stdout> as tokio::io::async_write::AsyncWrite>::poll_write::{closure#0}::{closure#0}
Unexecuted instantiation: <tokio::io::blocking::Blocking<std::io::stdio::Stderr> as tokio::io::async_write::AsyncWrite>::poll_write::{closure#0}::{closure#0}
Unexecuted instantiation: <tokio::io::blocking::Blocking<std::io::stdio::Stdout> as tokio::io::async_write::AsyncWrite>::poll_write::{closure#0}::{closure#0}
120
0
121
0
                        (res, buf, inner)
122
0
                    }));
Unexecuted instantiation: <tokio::io::blocking::Blocking<std::io::stdio::Stderr> as tokio::io::async_write::AsyncWrite>::poll_write::{closure#0}
Unexecuted instantiation: <tokio::io::blocking::Blocking<std::io::stdio::Stdout> as tokio::io::async_write::AsyncWrite>::poll_write::{closure#0}
Unexecuted instantiation: <tokio::io::blocking::Blocking<std::io::stdio::Stderr> as tokio::io::async_write::AsyncWrite>::poll_write::{closure#0}
Unexecuted instantiation: <tokio::io::blocking::Blocking<std::io::stdio::Stdout> as tokio::io::async_write::AsyncWrite>::poll_write::{closure#0}
Unexecuted instantiation: <tokio::io::blocking::Blocking<std::io::stdio::Stderr> as tokio::io::async_write::AsyncWrite>::poll_write::{closure#0}
Unexecuted instantiation: <tokio::io::blocking::Blocking<std::io::stdio::Stdout> as tokio::io::async_write::AsyncWrite>::poll_write::{closure#0}
123
0
                    self.need_flush = true;
124
0
125
0
                    return Poll::Ready(Ok(n));
126
                }
127
0
                State::Busy(ref mut rx) => {
128
0
                    let (res, buf, inner) = ready!(Pin::new(rx).poll(cx))?;
129
0
                    self.state = State::Idle(Some(buf));
130
0
                    self.inner = Some(inner);
131
0
132
0
                    // If error, return
133
0
                    res?;
134
                }
135
            }
136
        }
137
0
    }
Unexecuted instantiation: <tokio::io::blocking::Blocking<std::io::stdio::Stderr> as tokio::io::async_write::AsyncWrite>::poll_write
Unexecuted instantiation: <tokio::io::blocking::Blocking<std::io::stdio::Stdout> as tokio::io::async_write::AsyncWrite>::poll_write
Unexecuted instantiation: <tokio::io::blocking::Blocking<std::io::stdio::Stderr> as tokio::io::async_write::AsyncWrite>::poll_write
Unexecuted instantiation: <tokio::io::blocking::Blocking<std::io::stdio::Stdout> as tokio::io::async_write::AsyncWrite>::poll_write
Unexecuted instantiation: <tokio::io::blocking::Blocking<std::io::stdio::Stderr> as tokio::io::async_write::AsyncWrite>::poll_write
Unexecuted instantiation: <tokio::io::blocking::Blocking<std::io::stdio::Stdout> as tokio::io::async_write::AsyncWrite>::poll_write
138
139
0
    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
140
        loop {
141
0
            let need_flush = self.need_flush;
142
0
            match self.state {
143
                // The buffer is not used here
144
0
                State::Idle(ref mut buf_cell) => {
145
0
                    if need_flush {
146
0
                        let buf = buf_cell.take().unwrap();
147
0
                        let mut inner = self.inner.take().unwrap();
148
0
149
0
                        self.state = State::Busy(sys::run(move || {
150
0
                            let res = inner.flush().map(|()| 0);
Unexecuted instantiation: <tokio::io::blocking::Blocking<std::io::stdio::Stderr> as tokio::io::async_write::AsyncWrite>::poll_flush::{closure#0}::{closure#0}
Unexecuted instantiation: <tokio::io::blocking::Blocking<std::io::stdio::Stdout> as tokio::io::async_write::AsyncWrite>::poll_flush::{closure#0}::{closure#0}
Unexecuted instantiation: <tokio::io::blocking::Blocking<std::io::stdio::Stderr> as tokio::io::async_write::AsyncWrite>::poll_flush::{closure#0}::{closure#0}
Unexecuted instantiation: <tokio::io::blocking::Blocking<std::io::stdio::Stdout> as tokio::io::async_write::AsyncWrite>::poll_flush::{closure#0}::{closure#0}
Unexecuted instantiation: <tokio::io::blocking::Blocking<std::io::stdio::Stderr> as tokio::io::async_write::AsyncWrite>::poll_flush::{closure#0}::{closure#0}
Unexecuted instantiation: <tokio::io::blocking::Blocking<std::io::stdio::Stdout> as tokio::io::async_write::AsyncWrite>::poll_flush::{closure#0}::{closure#0}
151
0
                            (res, buf, inner)
152
0
                        }));
Unexecuted instantiation: <tokio::io::blocking::Blocking<std::io::stdio::Stderr> as tokio::io::async_write::AsyncWrite>::poll_flush::{closure#0}
Unexecuted instantiation: <tokio::io::blocking::Blocking<std::io::stdio::Stdout> as tokio::io::async_write::AsyncWrite>::poll_flush::{closure#0}
Unexecuted instantiation: <tokio::io::blocking::Blocking<std::io::stdio::Stderr> as tokio::io::async_write::AsyncWrite>::poll_flush::{closure#0}
Unexecuted instantiation: <tokio::io::blocking::Blocking<std::io::stdio::Stdout> as tokio::io::async_write::AsyncWrite>::poll_flush::{closure#0}
Unexecuted instantiation: <tokio::io::blocking::Blocking<std::io::stdio::Stderr> as tokio::io::async_write::AsyncWrite>::poll_flush::{closure#0}
Unexecuted instantiation: <tokio::io::blocking::Blocking<std::io::stdio::Stdout> as tokio::io::async_write::AsyncWrite>::poll_flush::{closure#0}
153
0
154
0
                        self.need_flush = false;
155
0
                    } else {
156
0
                        return Poll::Ready(Ok(()));
157
                    }
158
                }
159
0
                State::Busy(ref mut rx) => {
160
0
                    let (res, buf, inner) = ready!(Pin::new(rx).poll(cx))?;
161
0
                    self.state = State::Idle(Some(buf));
162
0
                    self.inner = Some(inner);
163
0
164
0
                    // If error, return
165
0
                    res?;
166
                }
167
            }
168
        }
169
0
    }
Unexecuted instantiation: <tokio::io::blocking::Blocking<std::io::stdio::Stderr> as tokio::io::async_write::AsyncWrite>::poll_flush
Unexecuted instantiation: <tokio::io::blocking::Blocking<std::io::stdio::Stdout> as tokio::io::async_write::AsyncWrite>::poll_flush
Unexecuted instantiation: <tokio::io::blocking::Blocking<std::io::stdio::Stderr> as tokio::io::async_write::AsyncWrite>::poll_flush
Unexecuted instantiation: <tokio::io::blocking::Blocking<std::io::stdio::Stdout> as tokio::io::async_write::AsyncWrite>::poll_flush
Unexecuted instantiation: <tokio::io::blocking::Blocking<std::io::stdio::Stderr> as tokio::io::async_write::AsyncWrite>::poll_flush
Unexecuted instantiation: <tokio::io::blocking::Blocking<std::io::stdio::Stdout> as tokio::io::async_write::AsyncWrite>::poll_flush
170
171
0
    fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
172
0
        Poll::Ready(Ok(()))
173
0
    }
Unexecuted instantiation: <tokio::io::blocking::Blocking<std::io::stdio::Stderr> as tokio::io::async_write::AsyncWrite>::poll_shutdown
Unexecuted instantiation: <tokio::io::blocking::Blocking<std::io::stdio::Stdout> as tokio::io::async_write::AsyncWrite>::poll_shutdown
Unexecuted instantiation: <tokio::io::blocking::Blocking<std::io::stdio::Stderr> as tokio::io::async_write::AsyncWrite>::poll_shutdown
Unexecuted instantiation: <tokio::io::blocking::Blocking<std::io::stdio::Stdout> as tokio::io::async_write::AsyncWrite>::poll_shutdown
Unexecuted instantiation: <tokio::io::blocking::Blocking<std::io::stdio::Stderr> as tokio::io::async_write::AsyncWrite>::poll_shutdown
Unexecuted instantiation: <tokio::io::blocking::Blocking<std::io::stdio::Stdout> as tokio::io::async_write::AsyncWrite>::poll_shutdown
174
}
175
176
/// Repeats operations that are interrupted.
177
macro_rules! uninterruptibly {
178
    ($e:expr) => {{
179
        loop {
180
            match $e {
181
                Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {}
182
                res => break res,
183
            }
184
        }
185
    }};
186
}
187
188
impl Buf {
189
0
    pub(crate) fn with_capacity(n: usize) -> Buf {
190
0
        Buf {
191
0
            buf: Vec::with_capacity(n),
192
0
            pos: 0,
193
0
        }
194
0
    }
Unexecuted instantiation: <tokio::io::blocking::Buf>::with_capacity
Unexecuted instantiation: <tokio::io::blocking::Buf>::with_capacity
Unexecuted instantiation: <tokio::io::blocking::Buf>::with_capacity
195
196
0
    pub(crate) fn is_empty(&self) -> bool {
197
0
        self.len() == 0
198
0
    }
Unexecuted instantiation: <tokio::io::blocking::Buf>::is_empty
Unexecuted instantiation: <tokio::io::blocking::Buf>::is_empty
Unexecuted instantiation: <tokio::io::blocking::Buf>::is_empty
199
200
0
    pub(crate) fn len(&self) -> usize {
201
0
        self.buf.len() - self.pos
202
0
    }
Unexecuted instantiation: <tokio::io::blocking::Buf>::len
Unexecuted instantiation: <tokio::io::blocking::Buf>::len
Unexecuted instantiation: <tokio::io::blocking::Buf>::len
203
204
0
    pub(crate) fn copy_to(&mut self, dst: &mut ReadBuf<'_>) -> usize {
205
0
        let n = cmp::min(self.len(), dst.remaining());
206
0
        dst.put_slice(&self.bytes()[..n]);
207
0
        self.pos += n;
208
0
209
0
        if self.pos == self.buf.len() {
210
0
            self.buf.truncate(0);
211
0
            self.pos = 0;
212
0
        }
213
214
0
        n
215
0
    }
Unexecuted instantiation: <tokio::io::blocking::Buf>::copy_to
Unexecuted instantiation: <tokio::io::blocking::Buf>::copy_to
Unexecuted instantiation: <tokio::io::blocking::Buf>::copy_to
216
217
0
    pub(crate) fn copy_from(&mut self, src: &[u8], max_buf_size: usize) -> usize {
218
0
        assert!(self.is_empty());
219
220
0
        let n = cmp::min(src.len(), max_buf_size);
221
0
222
0
        self.buf.extend_from_slice(&src[..n]);
223
0
        n
224
0
    }
Unexecuted instantiation: <tokio::io::blocking::Buf>::copy_from
Unexecuted instantiation: <tokio::io::blocking::Buf>::copy_from
Unexecuted instantiation: <tokio::io::blocking::Buf>::copy_from
225
226
0
    pub(crate) fn bytes(&self) -> &[u8] {
227
0
        &self.buf[self.pos..]
228
0
    }
Unexecuted instantiation: <tokio::io::blocking::Buf>::bytes
Unexecuted instantiation: <tokio::io::blocking::Buf>::bytes
Unexecuted instantiation: <tokio::io::blocking::Buf>::bytes
229
230
0
    pub(crate) fn ensure_capacity_for(&mut self, bytes: &ReadBuf<'_>, max_buf_size: usize) {
231
0
        assert!(self.is_empty());
232
233
0
        let len = cmp::min(bytes.remaining(), max_buf_size);
234
0
235
0
        if self.buf.len() < len {
236
0
            self.buf.reserve(len - self.buf.len());
237
0
        }
238
239
0
        unsafe {
240
0
            self.buf.set_len(len);
241
0
        }
242
0
    }
Unexecuted instantiation: <tokio::io::blocking::Buf>::ensure_capacity_for
Unexecuted instantiation: <tokio::io::blocking::Buf>::ensure_capacity_for
Unexecuted instantiation: <tokio::io::blocking::Buf>::ensure_capacity_for
243
244
0
    pub(crate) fn read_from<T: Read>(&mut self, rd: &mut T) -> io::Result<usize> {
245
0
        let res = uninterruptibly!(rd.read(&mut self.buf));
246
247
0
        if let Ok(n) = res {
248
0
            self.buf.truncate(n);
249
0
        } else {
250
0
            self.buf.clear();
251
0
        }
252
253
0
        assert_eq!(self.pos, 0);
254
255
0
        res
256
0
    }
Unexecuted instantiation: <tokio::io::blocking::Buf>::read_from::<std::io::stdio::Stdin>
Unexecuted instantiation: <tokio::io::blocking::Buf>::read_from::<&std::fs::File>
Unexecuted instantiation: <tokio::io::blocking::Buf>::read_from::<std::io::stdio::Stdin>
Unexecuted instantiation: <tokio::io::blocking::Buf>::read_from::<&std::fs::File>
Unexecuted instantiation: <tokio::io::blocking::Buf>::read_from::<std::io::stdio::Stdin>
Unexecuted instantiation: <tokio::io::blocking::Buf>::read_from::<&std::fs::File>
257
258
0
    pub(crate) fn write_to<T: Write>(&mut self, wr: &mut T) -> io::Result<()> {
259
0
        assert_eq!(self.pos, 0);
260
261
        // `write_all` already ignores interrupts
262
0
        let res = wr.write_all(&self.buf);
263
0
        self.buf.clear();
264
0
        res
265
0
    }
Unexecuted instantiation: <tokio::io::blocking::Buf>::write_to::<std::io::stdio::Stderr>
Unexecuted instantiation: <tokio::io::blocking::Buf>::write_to::<std::io::stdio::Stdout>
Unexecuted instantiation: <tokio::io::blocking::Buf>::write_to::<&std::fs::File>
Unexecuted instantiation: <tokio::io::blocking::Buf>::write_to::<std::io::stdio::Stderr>
Unexecuted instantiation: <tokio::io::blocking::Buf>::write_to::<std::io::stdio::Stdout>
Unexecuted instantiation: <tokio::io::blocking::Buf>::write_to::<&std::fs::File>
Unexecuted instantiation: <tokio::io::blocking::Buf>::write_to::<std::io::stdio::Stderr>
Unexecuted instantiation: <tokio::io::blocking::Buf>::write_to::<std::io::stdio::Stdout>
Unexecuted instantiation: <tokio::io::blocking::Buf>::write_to::<&std::fs::File>
266
}
267
268
cfg_fs! {
269
    impl Buf {
270
0
        pub(crate) fn discard_read(&mut self) -> i64 {
271
0
            let ret = -(self.bytes().len() as i64);
272
0
            self.pos = 0;
273
0
            self.buf.truncate(0);
274
0
            ret
275
0
        }
Unexecuted instantiation: <tokio::io::blocking::Buf>::discard_read
Unexecuted instantiation: <tokio::io::blocking::Buf>::discard_read
Unexecuted instantiation: <tokio::io::blocking::Buf>::discard_read
276
277
0
        pub(crate) fn copy_from_bufs(&mut self, bufs: &[io::IoSlice<'_>], max_buf_size: usize) -> usize {
278
0
            assert!(self.is_empty());
279
280
0
            let mut rem = max_buf_size;
281
0
            for buf in bufs {
282
0
                if rem == 0 {
283
0
                    break
284
0
                }
285
0
286
0
                let len = buf.len().min(rem);
287
0
                self.buf.extend_from_slice(&buf[..len]);
288
0
                rem -= len;
289
            }
290
291
0
            max_buf_size - rem
292
0
        }
Unexecuted instantiation: <tokio::io::blocking::Buf>::copy_from_bufs
Unexecuted instantiation: <tokio::io::blocking::Buf>::copy_from_bufs
Unexecuted instantiation: <tokio::io::blocking::Buf>::copy_from_bufs
293
    }
294
}