Coverage Report

Created: 2025-08-26 07:09

/rust/registry/src/index.crates.io-6f17d22bba15001f/tokio-test-0.4.4/src/stream_mock.rs
Line
Count
Source (jump to first uncovered line)
1
#![cfg(not(loom))]
2
3
//! A mock stream implementing [`Stream`].
4
//!
5
//! # Overview
6
//! This module provides a `StreamMock` that can be used to test code that interacts with streams.
7
//! It allows you to mock the behavior of a stream and control the items it yields and the waiting
8
//! intervals between items.
9
//!
10
//! # Usage
11
//! To use the `StreamMock`, you need to create a builder using [`StreamMockBuilder`]. The builder
12
//! allows you to enqueue actions such as returning items or waiting for a certain duration.
13
//!
14
//! # Example
15
//! ```rust
16
//!
17
//! use futures_util::StreamExt;
18
//! use std::time::Duration;
19
//! use tokio_test::stream_mock::StreamMockBuilder;
20
//!
21
//! async fn test_stream_mock_wait() {
22
//!     let mut stream_mock = StreamMockBuilder::new()
23
//!         .next(1)
24
//!         .wait(Duration::from_millis(300))
25
//!         .next(2)
26
//!         .build();
27
//!
28
//!     assert_eq!(stream_mock.next().await, Some(1));
29
//!     let start = std::time::Instant::now();
30
//!     assert_eq!(stream_mock.next().await, Some(2));
31
//!     let elapsed = start.elapsed();
32
//!     assert!(elapsed >= Duration::from_millis(300));
33
//!     assert_eq!(stream_mock.next().await, None);
34
//! }
35
//! ```
36
37
use std::collections::VecDeque;
38
use std::pin::Pin;
39
use std::task::Poll;
40
use std::time::Duration;
41
42
use futures_core::{ready, Stream};
43
use std::future::Future;
44
use tokio::time::{sleep_until, Instant, Sleep};
45
46
/// A single scripted step queued on a stream mock.
#[derive(Debug, Clone)]
enum Action<T: Unpin> {
    /// Yield the contained item from the stream.
    Next(T),
    /// Pause for the given duration before the following step is served.
    Wait(Duration),
}
51
52
/// A builder for [`StreamMock`]
53
#[derive(Debug, Clone)]
54
pub struct StreamMockBuilder<T: Unpin> {
55
    actions: VecDeque<Action<T>>,
56
}
57
58
impl<T: Unpin> StreamMockBuilder<T> {
59
    /// Create a new empty [`StreamMockBuilder`]
60
0
    pub fn new() -> Self {
61
0
        StreamMockBuilder::default()
62
0
    }
63
64
    /// Queue an item to be returned by the stream
65
0
    pub fn next(mut self, value: T) -> Self {
66
0
        self.actions.push_back(Action::Next(value));
67
0
        self
68
0
    }
69
70
    // Queue an item to be consumed by the sink,
71
    // commented out until Sink is implemented.
72
    //
73
    // pub fn consume(mut self, value: T) -> Self {
74
    //    self.actions.push_back(Action::Consume(value));
75
    //    self
76
    // }
77
78
    /// Queue the stream to wait for a duration
79
0
    pub fn wait(mut self, duration: Duration) -> Self {
80
0
        self.actions.push_back(Action::Wait(duration));
81
0
        self
82
0
    }
83
84
    /// Build the [`StreamMock`]
85
0
    pub fn build(self) -> StreamMock<T> {
86
0
        StreamMock {
87
0
            actions: self.actions,
88
0
            sleep: None,
89
0
        }
90
0
    }
91
}
92
93
impl<T: Unpin> Default for StreamMockBuilder<T> {
94
0
    fn default() -> Self {
95
0
        StreamMockBuilder {
96
0
            actions: VecDeque::new(),
97
0
        }
98
0
    }
99
}
100
101
/// A mock stream implementing [`Stream`]
102
///
103
/// See [`StreamMockBuilder`] for more information.
104
#[derive(Debug)]
105
pub struct StreamMock<T: Unpin> {
106
    actions: VecDeque<Action<T>>,
107
    sleep: Option<Pin<Box<Sleep>>>,
108
}
109
110
impl<T: Unpin> StreamMock<T> {
111
0
    fn next_action(&mut self) -> Option<Action<T>> {
112
0
        self.actions.pop_front()
113
0
    }
114
}
115
116
impl<T: Unpin> Stream for StreamMock<T> {
117
    type Item = T;
118
119
0
    fn poll_next(
120
0
        mut self: std::pin::Pin<&mut Self>,
121
0
        cx: &mut std::task::Context<'_>,
122
0
    ) -> std::task::Poll<Option<Self::Item>> {
123
        // Try polling the sleep future first
124
0
        if let Some(ref mut sleep) = self.sleep {
125
0
            ready!(Pin::new(sleep).poll(cx));
126
            // Since we're ready, discard the sleep future
127
0
            self.sleep.take();
128
0
        }
129
130
0
        match self.next_action() {
131
0
            Some(action) => match action {
132
0
                Action::Next(item) => Poll::Ready(Some(item)),
133
0
                Action::Wait(duration) => {
134
0
                    // Set up a sleep future and schedule this future to be polled again for it.
135
0
                    self.sleep = Some(Box::pin(sleep_until(Instant::now() + duration)));
136
0
                    cx.waker().wake_by_ref();
137
0
138
0
                    Poll::Pending
139
                }
140
            },
141
0
            None => Poll::Ready(None),
142
        }
143
0
    }
144
}
145
146
impl<T: Unpin> Drop for StreamMock<T> {
147
0
    fn drop(&mut self) {
148
0
        // Avoid double panicking to make debugging easier.
149
0
        if std::thread::panicking() {
150
0
            return;
151
0
        }
152
0
153
0
        let undropped_count = self
154
0
            .actions
155
0
            .iter()
156
0
            .filter(|action| match action {
157
0
                Action::Next(_) => true,
158
0
                Action::Wait(_) => false,
159
0
            })
160
0
            .count();
161
0
162
0
        assert!(
163
0
            undropped_count == 0,
164
0
            "StreamMock was dropped before all actions were consumed, {} actions were not consumed",
165
            undropped_count
166
        );
167
0
    }
168
}