Coverage Report

Created: 2026-02-14 06:45

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/rust/registry/src/index.crates.io-1949cf8c6b5b557f/tokio-test-0.4.5/src/io.rs
Line
Count
Source
1
#![cfg(not(loom))]
2
3
//! A mock type implementing [`AsyncRead`] and [`AsyncWrite`].
4
//!
5
//!
6
//! # Overview
7
//!
8
//! Provides a type that implements [`AsyncRead`] + [`AsyncWrite`] that can be configured
9
//! to handle an arbitrary sequence of read and write operations. This is useful
10
//! for writing unit tests for networking services as using an actual network
11
//! type is fairly non deterministic.
12
//!
13
//! # Usage
14
//!
15
//! Attempting to write data that the mock isn't expecting will result in a
16
//! panic.
17
//!
18
//! [`AsyncRead`]: tokio::io::AsyncRead
19
//! [`AsyncWrite`]: tokio::io::AsyncWrite
20
21
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
22
use tokio::sync::mpsc;
23
use tokio::time::{self, Duration, Instant, Sleep};
24
use tokio_stream::wrappers::UnboundedReceiverStream;
25
26
use futures_core::Stream;
27
use std::collections::VecDeque;
28
use std::fmt;
29
use std::future::Future;
30
use std::pin::Pin;
31
use std::sync::Arc;
32
use std::task::{self, ready, Poll, Waker};
33
use std::{cmp, io};
34
35
/// An I/O object that follows a predefined script.
36
///
37
/// This value is created by `Builder` and implements `AsyncRead` + `AsyncWrite`. It
38
/// follows the scenario described by the builder and panics otherwise.
39
#[derive(Debug)]
40
pub struct Mock {
41
    inner: Inner,
42
}
43
44
/// A handle to send additional actions to the related `Mock`.
45
#[derive(Debug)]
46
pub struct Handle {
47
    tx: mpsc::UnboundedSender<Action>,
48
}
49
50
/// Builds `Mock` instances.
51
#[derive(Debug, Clone, Default)]
52
pub struct Builder {
53
    // Sequence of actions for the Mock to take
54
    actions: VecDeque<Action>,
55
    name: String,
56
}
57
58
#[derive(Debug, Clone)]
59
enum Action {
60
    Read(Vec<u8>),
61
    Write(Vec<u8>),
62
    Wait(Duration),
63
    // Wrapped in Arc so that Builder can be cloned and Send.
64
    // Mock is not cloned as does not need to check Rc for ref counts.
65
    ReadError(Option<Arc<io::Error>>),
66
    WriteError(Option<Arc<io::Error>>),
67
}
68
69
struct Inner {
70
    actions: VecDeque<Action>,
71
    waiting: Option<Instant>,
72
    sleep: Option<Pin<Box<Sleep>>>,
73
    read_wait: Option<Waker>,
74
    rx: UnboundedReceiverStream<Action>,
75
    name: String,
76
}
77
78
impl Builder {
79
    /// Return a new, empty `Builder`.
80
0
    pub fn new() -> Self {
81
0
        Self::default()
82
0
    }
83
84
    /// Sequence a `read` operation.
85
    ///
86
    /// The next operation in the mock's script will be to expect a `read` call
87
    /// and return `buf`.
88
0
    pub fn read(&mut self, buf: &[u8]) -> &mut Self {
89
0
        self.actions.push_back(Action::Read(buf.into()));
90
0
        self
91
0
    }
92
93
    /// Sequence a `read` operation that produces an error.
94
    ///
95
    /// The next operation in the mock's script will be to expect a `read` call
96
    /// and return `error`.
97
0
    pub fn read_error(&mut self, error: io::Error) -> &mut Self {
98
0
        let error = Some(error.into());
99
0
        self.actions.push_back(Action::ReadError(error));
100
0
        self
101
0
    }
102
103
    /// Sequence a `write` operation.
104
    ///
105
    /// The next operation in the mock's script will be to expect a `write`
106
    /// call.
107
0
    pub fn write(&mut self, buf: &[u8]) -> &mut Self {
108
0
        self.actions.push_back(Action::Write(buf.into()));
109
0
        self
110
0
    }
111
112
    /// Sequence a `write` operation that produces an error.
113
    ///
114
    /// The next operation in the mock's script will be to expect a `write`
115
    /// call that provides `error`.
116
0
    pub fn write_error(&mut self, error: io::Error) -> &mut Self {
117
0
        let error = Some(error.into());
118
0
        self.actions.push_back(Action::WriteError(error));
119
0
        self
120
0
    }
121
122
    /// Sequence a wait.
123
    ///
124
    /// The next operation in the mock's script will be to wait without doing so
125
    /// for `duration` amount of time.
126
0
    pub fn wait(&mut self, duration: Duration) -> &mut Self {
127
0
        let duration = cmp::max(duration, Duration::from_millis(1));
128
0
        self.actions.push_back(Action::Wait(duration));
129
0
        self
130
0
    }
131
132
    /// Set name of the mock IO object to include in panic messages and debug output
133
0
    pub fn name(&mut self, name: impl Into<String>) -> &mut Self {
134
0
        self.name = name.into();
135
0
        self
136
0
    }
137
138
    /// Build a `Mock` value according to the defined script.
139
0
    pub fn build(&mut self) -> Mock {
140
0
        let (mock, _) = self.build_with_handle();
141
0
        mock
142
0
    }
143
144
    /// Build a `Mock` value paired with a handle
145
0
    pub fn build_with_handle(&mut self) -> (Mock, Handle) {
146
0
        let (inner, handle) = Inner::new(self.actions.clone(), self.name.clone());
147
148
0
        let mock = Mock { inner };
149
150
0
        (mock, handle)
151
0
    }
152
}
153
154
impl Handle {
155
    /// Sequence a `read` operation.
156
    ///
157
    /// The next operation in the mock's script will be to expect a `read` call
158
    /// and return `buf`.
159
0
    pub fn read(&mut self, buf: &[u8]) -> &mut Self {
160
0
        self.tx.send(Action::Read(buf.into())).unwrap();
161
0
        self
162
0
    }
163
164
    /// Sequence a `read` operation error.
165
    ///
166
    /// The next operation in the mock's script will be to expect a `read` call
167
    /// and return `error`.
168
0
    pub fn read_error(&mut self, error: io::Error) -> &mut Self {
169
0
        let error = Some(error.into());
170
0
        self.tx.send(Action::ReadError(error)).unwrap();
171
0
        self
172
0
    }
173
174
    /// Sequence a `write` operation.
175
    ///
176
    /// The next operation in the mock's script will be to expect a `write`
177
    /// call.
178
0
    pub fn write(&mut self, buf: &[u8]) -> &mut Self {
179
0
        self.tx.send(Action::Write(buf.into())).unwrap();
180
0
        self
181
0
    }
182
183
    /// Sequence a `write` operation error.
184
    ///
185
    /// The next operation in the mock's script will be to expect a `write`
186
    /// call error.
187
0
    pub fn write_error(&mut self, error: io::Error) -> &mut Self {
188
0
        let error = Some(error.into());
189
0
        self.tx.send(Action::WriteError(error)).unwrap();
190
0
        self
191
0
    }
192
}
193
194
impl Inner {
195
0
    fn new(actions: VecDeque<Action>, name: String) -> (Inner, Handle) {
196
0
        let (tx, rx) = mpsc::unbounded_channel();
197
198
0
        let rx = UnboundedReceiverStream::new(rx);
199
200
0
        let inner = Inner {
201
0
            actions,
202
0
            sleep: None,
203
0
            read_wait: None,
204
0
            rx,
205
0
            waiting: None,
206
0
            name,
207
0
        };
208
209
0
        let handle = Handle { tx };
210
211
0
        (inner, handle)
212
0
    }
213
214
0
    fn poll_action(&mut self, cx: &mut task::Context<'_>) -> Poll<Option<Action>> {
215
0
        Pin::new(&mut self.rx).poll_next(cx)
216
0
    }
217
218
0
    fn read(&mut self, dst: &mut ReadBuf<'_>) -> io::Result<()> {
219
0
        match self.action() {
220
0
            Some(&mut Action::Read(ref mut data)) => {
221
                // Figure out how much to copy
222
0
                let n = cmp::min(dst.remaining(), data.len());
223
224
                // Copy the data into the `dst` slice
225
0
                dst.put_slice(&data[..n]);
226
227
                // Drain the data from the source
228
0
                data.drain(..n);
229
230
0
                Ok(())
231
            }
232
0
            Some(&mut Action::ReadError(ref mut err)) => {
233
                // As the
234
0
                let err = err.take().expect("Should have been removed from actions.");
235
0
                let err = Arc::try_unwrap(err).expect("There are no other references.");
236
0
                Err(err)
237
            }
238
            Some(_) => {
239
                // Either waiting or expecting a write
240
0
                Err(io::ErrorKind::WouldBlock.into())
241
            }
242
0
            None => Ok(()),
243
        }
244
0
    }
245
246
0
    fn write(&mut self, mut src: &[u8]) -> io::Result<usize> {
247
0
        let mut ret = 0;
248
249
0
        if self.actions.is_empty() {
250
0
            return Err(io::ErrorKind::BrokenPipe.into());
251
0
        }
252
253
0
        if let Some(&mut Action::Wait(..)) = self.action() {
254
0
            return Err(io::ErrorKind::WouldBlock.into());
255
0
        }
256
257
0
        if let Some(&mut Action::WriteError(ref mut err)) = self.action() {
258
0
            let err = err.take().expect("Should have been removed from actions.");
259
0
            let err = Arc::try_unwrap(err).expect("There are no other references.");
260
0
            return Err(err);
261
0
        }
262
263
0
        for i in 0..self.actions.len() {
264
0
            match self.actions[i] {
265
0
                Action::Write(ref mut expect) => {
266
0
                    let n = cmp::min(src.len(), expect.len());
267
268
0
                    assert_eq!(&src[..n], &expect[..n], "name={} i={}", self.name, i);
269
270
                    // Drop data that was matched
271
0
                    expect.drain(..n);
272
0
                    src = &src[n..];
273
274
0
                    ret += n;
275
276
0
                    if src.is_empty() {
277
0
                        return Ok(ret);
278
0
                    }
279
                }
280
                Action::Wait(..) | Action::WriteError(..) => {
281
0
                    break;
282
                }
283
0
                _ => {}
284
            }
285
286
            // TODO: remove write
287
        }
288
289
0
        Ok(ret)
290
0
    }
291
292
0
    fn remaining_wait(&mut self) -> Option<Duration> {
293
0
        match self.action() {
294
0
            Some(&mut Action::Wait(dur)) => Some(dur),
295
0
            _ => None,
296
        }
297
0
    }
298
299
0
    fn action(&mut self) -> Option<&mut Action> {
300
        loop {
301
0
            if self.actions.is_empty() {
302
0
                return None;
303
0
            }
304
305
0
            match self.actions[0] {
306
0
                Action::Read(ref mut data) => {
307
0
                    if !data.is_empty() {
308
0
                        break;
309
0
                    }
310
                }
311
0
                Action::Write(ref mut data) => {
312
0
                    if !data.is_empty() {
313
0
                        break;
314
0
                    }
315
                }
316
0
                Action::Wait(ref mut dur) => {
317
0
                    if let Some(until) = self.waiting {
318
0
                        let now = Instant::now();
319
320
0
                        if now < until {
321
0
                            break;
322
0
                        } else {
323
0
                            self.waiting = None;
324
0
                        }
325
                    } else {
326
0
                        self.waiting = Some(Instant::now() + *dur);
327
0
                        break;
328
                    }
329
                }
330
0
                Action::ReadError(ref mut error) | Action::WriteError(ref mut error) => {
331
0
                    if error.is_some() {
332
0
                        break;
333
0
                    }
334
                }
335
            }
336
337
0
            let _action = self.actions.pop_front();
338
        }
339
340
0
        self.actions.front_mut()
341
0
    }
342
}
343
344
// ===== impl Inner =====
345
346
impl Mock {
347
0
    fn maybe_wakeup_reader(&mut self) {
348
0
        match self.inner.action() {
349
            Some(&mut Action::Read(_)) | Some(&mut Action::ReadError(_)) | None => {
350
0
                if let Some(waker) = self.inner.read_wait.take() {
351
0
                    waker.wake();
352
0
                }
353
            }
354
0
            _ => {}
355
        }
356
0
    }
357
}
358
359
impl AsyncRead for Mock {
360
0
    fn poll_read(
361
0
        mut self: Pin<&mut Self>,
362
0
        cx: &mut task::Context<'_>,
363
0
        buf: &mut ReadBuf<'_>,
364
0
    ) -> Poll<io::Result<()>> {
365
        loop {
366
0
            if let Some(ref mut sleep) = self.inner.sleep {
367
0
                ready!(Pin::new(sleep).poll(cx));
368
0
            }
369
370
            // If a sleep is set, it has already fired
371
0
            self.inner.sleep = None;
372
373
            // Capture 'filled' to monitor if it changed
374
0
            let filled = buf.filled().len();
375
376
0
            match self.inner.read(buf) {
377
0
                Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
378
0
                    if let Some(rem) = self.inner.remaining_wait() {
379
0
                        let until = Instant::now() + rem;
380
0
                        self.inner.sleep = Some(Box::pin(time::sleep_until(until)));
381
0
                    } else {
382
0
                        self.inner.read_wait = Some(cx.waker().clone());
383
0
                        return Poll::Pending;
384
                    }
385
                }
386
                Ok(()) => {
387
0
                    if buf.filled().len() == filled {
388
0
                        match ready!(self.inner.poll_action(cx)) {
389
0
                            Some(action) => {
390
0
                                self.inner.actions.push_back(action);
391
0
                                continue;
392
                            }
393
                            None => {
394
0
                                return Poll::Ready(Ok(()));
395
                            }
396
                        }
397
                    } else {
398
0
                        return Poll::Ready(Ok(()));
399
                    }
400
                }
401
0
                Err(e) => return Poll::Ready(Err(e)),
402
            }
403
        }
404
0
    }
405
}
406
407
impl AsyncWrite for Mock {
408
0
    fn poll_write(
409
0
        mut self: Pin<&mut Self>,
410
0
        cx: &mut task::Context<'_>,
411
0
        buf: &[u8],
412
0
    ) -> Poll<io::Result<usize>> {
413
        loop {
414
0
            if let Some(ref mut sleep) = self.inner.sleep {
415
0
                ready!(Pin::new(sleep).poll(cx));
416
0
            }
417
418
            // If a sleep is set, it has already fired
419
0
            self.inner.sleep = None;
420
421
0
            if self.inner.actions.is_empty() {
422
0
                match self.inner.poll_action(cx) {
423
0
                    Poll::Pending => {
424
0
                        // do not propagate pending
425
0
                    }
426
0
                    Poll::Ready(Some(action)) => {
427
0
                        self.inner.actions.push_back(action);
428
0
                    }
429
                    Poll::Ready(None) => {
430
0
                        panic!("unexpected write {}", self.pmsg());
431
                    }
432
                }
433
0
            }
434
435
0
            match self.inner.write(buf) {
436
0
                Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
437
0
                    if let Some(rem) = self.inner.remaining_wait() {
438
0
                        let until = Instant::now() + rem;
439
0
                        self.inner.sleep = Some(Box::pin(time::sleep_until(until)));
440
0
                    } else {
441
0
                        panic!("unexpected WouldBlock {}", self.pmsg());
442
                    }
443
                }
444
                Ok(0) => {
445
                    // TODO: Is this correct?
446
0
                    if !self.inner.actions.is_empty() {
447
0
                        return Poll::Pending;
448
0
                    }
449
450
                    // TODO: Extract
451
0
                    match ready!(self.inner.poll_action(cx)) {
452
0
                        Some(action) => {
453
0
                            self.inner.actions.push_back(action);
454
0
                            continue;
455
                        }
456
                        None => {
457
0
                            panic!("unexpected write {}", self.pmsg());
458
                        }
459
                    }
460
                }
461
0
                ret => {
462
0
                    self.maybe_wakeup_reader();
463
0
                    return Poll::Ready(ret);
464
                }
465
            }
466
        }
467
0
    }
468
469
0
    fn poll_flush(self: Pin<&mut Self>, _cx: &mut task::Context<'_>) -> Poll<io::Result<()>> {
470
0
        Poll::Ready(Ok(()))
471
0
    }
472
473
0
    fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut task::Context<'_>) -> Poll<io::Result<()>> {
474
0
        Poll::Ready(Ok(()))
475
0
    }
476
}
477
478
/// Ensures that Mock isn't dropped with data "inside".
479
impl Drop for Mock {
480
0
    fn drop(&mut self) {
481
        // Avoid double panicking, since makes debugging much harder.
482
0
        if std::thread::panicking() {
483
0
            return;
484
0
        }
485
486
0
        self.inner.actions.iter().for_each(|a| match a {
487
0
            Action::Read(data) => assert!(
488
0
                data.is_empty(),
489
0
                "There is still data left to read. {}",
490
0
                self.pmsg()
491
            ),
492
0
            Action::Write(data) => assert!(
493
0
                data.is_empty(),
494
0
                "There is still data left to write. {}",
495
0
                self.pmsg()
496
            ),
497
0
            _ => (),
498
0
        });
499
0
    }
500
}
501
/*
502
/// Returns `true` if called from the context of a futures-rs Task
503
fn is_task_ctx() -> bool {
504
    use std::panic;
505
506
    // Save the existing panic hook
507
    let h = panic::take_hook();
508
509
    // Install a new one that does nothing
510
    panic::set_hook(Box::new(|_| {}));
511
512
    // Attempt to call the fn
513
    let r = panic::catch_unwind(|| task::current()).is_ok();
514
515
    // Re-install the old one
516
    panic::set_hook(h);
517
518
    // Return the result
519
    r
520
}
521
*/
522
523
impl fmt::Debug for Inner {
524
0
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
525
0
        if self.name.is_empty() {
526
0
            write!(f, "Inner {{...}}")
527
        } else {
528
0
            write!(f, "Inner {{name={}, ...}}", self.name)
529
        }
530
0
    }
531
}
532
533
struct PanicMsgSnippet<'a>(&'a Inner);
534
535
impl<'a> fmt::Display for PanicMsgSnippet<'a> {
536
0
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
537
0
        if self.0.name.is_empty() {
538
0
            write!(f, "({} actions remain)", self.0.actions.len())
539
        } else {
540
0
            write!(
541
0
                f,
542
0
                "(name {}, {} actions remain)",
543
                self.0.name,
544
0
                self.0.actions.len()
545
            )
546
        }
547
0
    }
548
}
549
550
impl Mock {
551
0
    fn pmsg(&self) -> PanicMsgSnippet<'_> {
552
0
        PanicMsgSnippet(&self.inner)
553
0
    }
554
}