Coverage Report

Created: 2025-07-18 06:42

/rust/registry/src/index.crates.io-6f17d22bba15001f/tokio-test-0.4.4/src/io.rs
Line
Count
Source (jump to first uncovered line)
1
#![cfg(not(loom))]
2
3
//! A mock type implementing [`AsyncRead`] and [`AsyncWrite`].
4
//!
5
//!
6
//! # Overview
7
//!
8
//! Provides a type that implements [`AsyncRead`] + [`AsyncWrite`] that can be configured
9
//! to handle an arbitrary sequence of read and write operations. This is useful
10
//! for writing unit tests for networking services as using an actual network
11
//! type is fairly non deterministic.
12
//!
13
//! # Usage
14
//!
15
//! Attempting to write data that the mock isn't expecting will result in a
16
//! panic.
17
//!
18
//! [`AsyncRead`]: tokio::io::AsyncRead
19
//! [`AsyncWrite`]: tokio::io::AsyncWrite
20
21
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
22
use tokio::sync::mpsc;
23
use tokio::time::{self, Duration, Instant, Sleep};
24
use tokio_stream::wrappers::UnboundedReceiverStream;
25
26
use futures_core::{ready, Stream};
27
use std::collections::VecDeque;
28
use std::fmt;
29
use std::future::Future;
30
use std::pin::Pin;
31
use std::sync::Arc;
32
use std::task::{self, Poll, Waker};
33
use std::{cmp, io};
34
35
/// An I/O object that follows a predefined script.
36
///
37
/// This value is created by `Builder` and implements `AsyncRead` + `AsyncWrite`. It
38
/// follows the scenario described by the builder and panics otherwise.
39
#[derive(Debug)]
40
pub struct Mock {
41
    inner: Inner,
42
}
43
44
/// A handle to send additional actions to the related `Mock`.
45
#[derive(Debug)]
46
pub struct Handle {
47
    tx: mpsc::UnboundedSender<Action>,
48
}
49
50
/// Builds `Mock` instances.
51
#[derive(Debug, Clone, Default)]
52
pub struct Builder {
53
    // Sequence of actions for the Mock to take
54
    actions: VecDeque<Action>,
55
}
56
57
/// One step of a mock's script.
#[derive(Debug, Clone)]
enum Action {
    /// Bytes to hand out to the next `read` call(s).
    Read(Vec<u8>),
    /// Bytes the next `write` call(s) are expected to provide.
    Write(Vec<u8>),
    /// A pause before the following action becomes current.
    Wait(Duration),
    // The errors are wrapped in `Arc` so that `Action` (and with it `Builder`)
    // can derive `Clone` even though `io::Error` itself is not `Clone`.
    // The `Option` is emptied once the error has been handed out, which marks
    // the action as consumed.
    /// Error to return from the next `read` call.
    ReadError(Option<Arc<io::Error>>),
    /// Error to return from the next `write` call.
    WriteError(Option<Arc<io::Error>>),
}
67
68
struct Inner {
69
    actions: VecDeque<Action>,
70
    waiting: Option<Instant>,
71
    sleep: Option<Pin<Box<Sleep>>>,
72
    read_wait: Option<Waker>,
73
    rx: UnboundedReceiverStream<Action>,
74
}
75
76
impl Builder {
77
    /// Return a new, empty `Builder`.
78
0
    pub fn new() -> Self {
79
0
        Self::default()
80
0
    }
81
82
    /// Sequence a `read` operation.
83
    ///
84
    /// The next operation in the mock's script will be to expect a `read` call
85
    /// and return `buf`.
86
0
    pub fn read(&mut self, buf: &[u8]) -> &mut Self {
87
0
        self.actions.push_back(Action::Read(buf.into()));
88
0
        self
89
0
    }
90
91
    /// Sequence a `read` operation that produces an error.
92
    ///
93
    /// The next operation in the mock's script will be to expect a `read` call
94
    /// and return `error`.
95
0
    pub fn read_error(&mut self, error: io::Error) -> &mut Self {
96
0
        let error = Some(error.into());
97
0
        self.actions.push_back(Action::ReadError(error));
98
0
        self
99
0
    }
100
101
    /// Sequence a `write` operation.
102
    ///
103
    /// The next operation in the mock's script will be to expect a `write`
104
    /// call.
105
0
    pub fn write(&mut self, buf: &[u8]) -> &mut Self {
106
0
        self.actions.push_back(Action::Write(buf.into()));
107
0
        self
108
0
    }
109
110
    /// Sequence a `write` operation that produces an error.
111
    ///
112
    /// The next operation in the mock's script will be to expect a `write`
113
    /// call that provides `error`.
114
0
    pub fn write_error(&mut self, error: io::Error) -> &mut Self {
115
0
        let error = Some(error.into());
116
0
        self.actions.push_back(Action::WriteError(error));
117
0
        self
118
0
    }
119
120
    /// Sequence a wait.
121
    ///
122
    /// The next operation in the mock's script will be to wait without doing so
123
    /// for `duration` amount of time.
124
0
    pub fn wait(&mut self, duration: Duration) -> &mut Self {
125
0
        let duration = cmp::max(duration, Duration::from_millis(1));
126
0
        self.actions.push_back(Action::Wait(duration));
127
0
        self
128
0
    }
129
130
    /// Build a `Mock` value according to the defined script.
131
0
    pub fn build(&mut self) -> Mock {
132
0
        let (mock, _) = self.build_with_handle();
133
0
        mock
134
0
    }
135
136
    /// Build a `Mock` value paired with a handle
137
0
    pub fn build_with_handle(&mut self) -> (Mock, Handle) {
138
0
        let (inner, handle) = Inner::new(self.actions.clone());
139
0
140
0
        let mock = Mock { inner };
141
0
142
0
        (mock, handle)
143
0
    }
144
}
145
146
impl Handle {
147
    /// Sequence a `read` operation.
148
    ///
149
    /// The next operation in the mock's script will be to expect a `read` call
150
    /// and return `buf`.
151
0
    pub fn read(&mut self, buf: &[u8]) -> &mut Self {
152
0
        self.tx.send(Action::Read(buf.into())).unwrap();
153
0
        self
154
0
    }
155
156
    /// Sequence a `read` operation error.
157
    ///
158
    /// The next operation in the mock's script will be to expect a `read` call
159
    /// and return `error`.
160
0
    pub fn read_error(&mut self, error: io::Error) -> &mut Self {
161
0
        let error = Some(error.into());
162
0
        self.tx.send(Action::ReadError(error)).unwrap();
163
0
        self
164
0
    }
165
166
    /// Sequence a `write` operation.
167
    ///
168
    /// The next operation in the mock's script will be to expect a `write`
169
    /// call.
170
0
    pub fn write(&mut self, buf: &[u8]) -> &mut Self {
171
0
        self.tx.send(Action::Write(buf.into())).unwrap();
172
0
        self
173
0
    }
174
175
    /// Sequence a `write` operation error.
176
    ///
177
    /// The next operation in the mock's script will be to expect a `write`
178
    /// call error.
179
0
    pub fn write_error(&mut self, error: io::Error) -> &mut Self {
180
0
        let error = Some(error.into());
181
0
        self.tx.send(Action::WriteError(error)).unwrap();
182
0
        self
183
0
    }
184
}
185
186
impl Inner {
187
0
    fn new(actions: VecDeque<Action>) -> (Inner, Handle) {
188
0
        let (tx, rx) = mpsc::unbounded_channel();
189
0
190
0
        let rx = UnboundedReceiverStream::new(rx);
191
0
192
0
        let inner = Inner {
193
0
            actions,
194
0
            sleep: None,
195
0
            read_wait: None,
196
0
            rx,
197
0
            waiting: None,
198
0
        };
199
0
200
0
        let handle = Handle { tx };
201
0
202
0
        (inner, handle)
203
0
    }
204
205
0
    fn poll_action(&mut self, cx: &mut task::Context<'_>) -> Poll<Option<Action>> {
206
0
        Pin::new(&mut self.rx).poll_next(cx)
207
0
    }
208
209
0
    fn read(&mut self, dst: &mut ReadBuf<'_>) -> io::Result<()> {
210
0
        match self.action() {
211
0
            Some(&mut Action::Read(ref mut data)) => {
212
0
                // Figure out how much to copy
213
0
                let n = cmp::min(dst.remaining(), data.len());
214
0
215
0
                // Copy the data into the `dst` slice
216
0
                dst.put_slice(&data[..n]);
217
0
218
0
                // Drain the data from the source
219
0
                data.drain(..n);
220
0
221
0
                Ok(())
222
            }
223
0
            Some(&mut Action::ReadError(ref mut err)) => {
224
0
                // As the
225
0
                let err = err.take().expect("Should have been removed from actions.");
226
0
                let err = Arc::try_unwrap(err).expect("There are no other references.");
227
0
                Err(err)
228
            }
229
            Some(_) => {
230
                // Either waiting or expecting a write
231
0
                Err(io::ErrorKind::WouldBlock.into())
232
            }
233
0
            None => Ok(()),
234
        }
235
0
    }
236
237
0
    fn write(&mut self, mut src: &[u8]) -> io::Result<usize> {
238
0
        let mut ret = 0;
239
0
240
0
        if self.actions.is_empty() {
241
0
            return Err(io::ErrorKind::BrokenPipe.into());
242
0
        }
243
0
244
0
        if let Some(&mut Action::Wait(..)) = self.action() {
245
0
            return Err(io::ErrorKind::WouldBlock.into());
246
0
        }
247
248
0
        if let Some(&mut Action::WriteError(ref mut err)) = self.action() {
249
0
            let err = err.take().expect("Should have been removed from actions.");
250
0
            let err = Arc::try_unwrap(err).expect("There are no other references.");
251
0
            return Err(err);
252
0
        }
253
254
0
        for i in 0..self.actions.len() {
255
0
            match self.actions[i] {
256
0
                Action::Write(ref mut expect) => {
257
0
                    let n = cmp::min(src.len(), expect.len());
258
0
259
0
                    assert_eq!(&src[..n], &expect[..n]);
260
261
                    // Drop data that was matched
262
0
                    expect.drain(..n);
263
0
                    src = &src[n..];
264
0
265
0
                    ret += n;
266
0
267
0
                    if src.is_empty() {
268
0
                        return Ok(ret);
269
0
                    }
270
                }
271
                Action::Wait(..) | Action::WriteError(..) => {
272
0
                    break;
273
                }
274
0
                _ => {}
275
            }
276
277
            // TODO: remove write
278
        }
279
280
0
        Ok(ret)
281
0
    }
282
283
0
    fn remaining_wait(&mut self) -> Option<Duration> {
284
0
        match self.action() {
285
0
            Some(&mut Action::Wait(dur)) => Some(dur),
286
0
            _ => None,
287
        }
288
0
    }
289
290
0
    fn action(&mut self) -> Option<&mut Action> {
291
        loop {
292
0
            if self.actions.is_empty() {
293
0
                return None;
294
0
            }
295
0
296
0
            match self.actions[0] {
297
0
                Action::Read(ref mut data) => {
298
0
                    if !data.is_empty() {
299
0
                        break;
300
0
                    }
301
                }
302
0
                Action::Write(ref mut data) => {
303
0
                    if !data.is_empty() {
304
0
                        break;
305
0
                    }
306
                }
307
0
                Action::Wait(ref mut dur) => {
308
0
                    if let Some(until) = self.waiting {
309
0
                        let now = Instant::now();
310
0
311
0
                        if now < until {
312
0
                            break;
313
0
                        } else {
314
0
                            self.waiting = None;
315
0
                        }
316
                    } else {
317
0
                        self.waiting = Some(Instant::now() + *dur);
318
0
                        break;
319
                    }
320
                }
321
0
                Action::ReadError(ref mut error) | Action::WriteError(ref mut error) => {
322
0
                    if error.is_some() {
323
0
                        break;
324
0
                    }
325
                }
326
            }
327
328
0
            let _action = self.actions.pop_front();
329
        }
330
331
0
        self.actions.front_mut()
332
0
    }
333
}
334
335
// ===== impl Mock =====
336
337
impl Mock {
338
0
    fn maybe_wakeup_reader(&mut self) {
339
0
        match self.inner.action() {
340
            Some(&mut Action::Read(_)) | Some(&mut Action::ReadError(_)) | None => {
341
0
                if let Some(waker) = self.inner.read_wait.take() {
342
0
                    waker.wake();
343
0
                }
344
            }
345
0
            _ => {}
346
        }
347
0
    }
348
}
349
350
impl AsyncRead for Mock {
351
0
    fn poll_read(
352
0
        mut self: Pin<&mut Self>,
353
0
        cx: &mut task::Context<'_>,
354
0
        buf: &mut ReadBuf<'_>,
355
0
    ) -> Poll<io::Result<()>> {
356
        loop {
357
0
            if let Some(ref mut sleep) = self.inner.sleep {
358
0
                ready!(Pin::new(sleep).poll(cx));
359
0
            }
360
361
            // If a sleep is set, it has already fired
362
0
            self.inner.sleep = None;
363
0
364
0
            // Capture 'filled' to monitor if it changed
365
0
            let filled = buf.filled().len();
366
0
367
0
            match self.inner.read(buf) {
368
0
                Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
369
0
                    if let Some(rem) = self.inner.remaining_wait() {
370
0
                        let until = Instant::now() + rem;
371
0
                        self.inner.sleep = Some(Box::pin(time::sleep_until(until)));
372
0
                    } else {
373
0
                        self.inner.read_wait = Some(cx.waker().clone());
374
0
                        return Poll::Pending;
375
                    }
376
                }
377
                Ok(()) => {
378
0
                    if buf.filled().len() == filled {
379
0
                        match ready!(self.inner.poll_action(cx)) {
380
0
                            Some(action) => {
381
0
                                self.inner.actions.push_back(action);
382
0
                                continue;
383
                            }
384
                            None => {
385
0
                                return Poll::Ready(Ok(()));
386
                            }
387
                        }
388
                    } else {
389
0
                        return Poll::Ready(Ok(()));
390
                    }
391
                }
392
0
                Err(e) => return Poll::Ready(Err(e)),
393
            }
394
        }
395
0
    }
396
}
397
398
impl AsyncWrite for Mock {
399
0
    fn poll_write(
400
0
        mut self: Pin<&mut Self>,
401
0
        cx: &mut task::Context<'_>,
402
0
        buf: &[u8],
403
0
    ) -> Poll<io::Result<usize>> {
404
        loop {
405
0
            if let Some(ref mut sleep) = self.inner.sleep {
406
0
                ready!(Pin::new(sleep).poll(cx));
407
0
            }
408
409
            // If a sleep is set, it has already fired
410
0
            self.inner.sleep = None;
411
0
412
0
            if self.inner.actions.is_empty() {
413
0
                match self.inner.poll_action(cx) {
414
0
                    Poll::Pending => {
415
0
                        // do not propagate pending
416
0
                    }
417
0
                    Poll::Ready(Some(action)) => {
418
0
                        self.inner.actions.push_back(action);
419
0
                    }
420
                    Poll::Ready(None) => {
421
0
                        panic!("unexpected write");
422
                    }
423
                }
424
0
            }
425
426
0
            match self.inner.write(buf) {
427
0
                Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
428
0
                    if let Some(rem) = self.inner.remaining_wait() {
429
0
                        let until = Instant::now() + rem;
430
0
                        self.inner.sleep = Some(Box::pin(time::sleep_until(until)));
431
0
                    } else {
432
0
                        panic!("unexpected WouldBlock");
433
                    }
434
                }
435
                Ok(0) => {
436
                    // TODO: Is this correct?
437
0
                    if !self.inner.actions.is_empty() {
438
0
                        return Poll::Pending;
439
0
                    }
440
441
                    // TODO: Extract
442
0
                    match ready!(self.inner.poll_action(cx)) {
443
0
                        Some(action) => {
444
0
                            self.inner.actions.push_back(action);
445
0
                            continue;
446
                        }
447
                        None => {
448
0
                            panic!("unexpected write");
449
                        }
450
                    }
451
                }
452
0
                ret => {
453
0
                    self.maybe_wakeup_reader();
454
0
                    return Poll::Ready(ret);
455
                }
456
            }
457
        }
458
0
    }
459
460
0
    fn poll_flush(self: Pin<&mut Self>, _cx: &mut task::Context<'_>) -> Poll<io::Result<()>> {
461
0
        Poll::Ready(Ok(()))
462
0
    }
463
464
0
    fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut task::Context<'_>) -> Poll<io::Result<()>> {
465
0
        Poll::Ready(Ok(()))
466
0
    }
467
}
468
469
/// Ensures that Mock isn't dropped with data "inside".
470
impl Drop for Mock {
471
0
    fn drop(&mut self) {
472
0
        // Avoid double panicking, since makes debugging much harder.
473
0
        if std::thread::panicking() {
474
0
            return;
475
0
        }
476
0
477
0
        self.inner.actions.iter().for_each(|a| match a {
478
0
            Action::Read(data) => assert!(data.is_empty(), "There is still data left to read."),
479
0
            Action::Write(data) => assert!(data.is_empty(), "There is still data left to write."),
480
0
            _ => (),
481
0
        });
482
0
    }
483
}
484
/*
485
/// Returns `true` if called from the context of a futures-rs Task
486
fn is_task_ctx() -> bool {
487
    use std::panic;
488
489
    // Save the existing panic hook
490
    let h = panic::take_hook();
491
492
    // Install a new one that does nothing
493
    panic::set_hook(Box::new(|_| {}));
494
495
    // Attempt to call the fn
496
    let r = panic::catch_unwind(|| task::current()).is_ok();
497
498
    // Re-install the old one
499
    panic::set_hook(h);
500
501
    // Return the result
502
    r
503
}
504
*/
505
506
impl fmt::Debug for Inner {
507
0
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
508
0
        write!(f, "Inner {{...}}")
509
0
    }
510
}