Coverage Report

Created: 2026-04-12 06:16

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/rust/registry/src/index.crates.io-1949cf8c6b5b557f/tokio-test-0.4.5/src/task.rs
Line
Count
Source
1
//! Futures task based helpers to easily test futures and manually written futures.
2
//!
3
//! The [`Spawn`] type is used as a mock task harness that allows you to poll futures
4
//! without needing to setup pinning or context. Any future can be polled but if the
5
//! future requires the tokio async context you will need to ensure that you poll the
6
//! [`Spawn`] within a tokio context, this means that as long as you are inside the
7
//! runtime it will work and you can poll it via [`Spawn`].
8
//!
9
//! [`Spawn`] also supports [`Stream`] to call `poll_next` without pinning
10
//! or context.
11
//!
12
//! In addition to circumventing the need for pinning and context, [`Spawn`] also tracks
13
//! the amount of times the future/task was woken. This can be useful to track if some
14
//! leaf future notified the root task correctly.
15
//!
16
//! # Example
17
//!
18
//! ```
19
//! use tokio_test::task;
20
//!
21
//! let fut = async {};
22
//!
23
//! let mut task = task::spawn(fut);
24
//!
25
//! assert!(task.poll().is_ready(), "Task was not ready!");
26
//! ```
27
28
use std::future::Future;
29
use std::mem;
30
use std::ops;
31
use std::pin::Pin;
32
use std::sync::{Arc, Condvar, Mutex};
33
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};
34
35
use tokio_stream::Stream;
36
37
/// Spawn a future into a [`Spawn`] which wraps the future in a mocked executor.
38
///
39
/// This can be used to spawn a [`Future`] or a [`Stream`].
40
///
41
/// For more information, check the module docs.
42
0
pub fn spawn<T>(task: T) -> Spawn<T> {
43
0
    Spawn {
44
0
        task: MockTask::new(),
45
0
        future: Box::pin(task),
46
0
    }
47
0
}
48
49
/// Future spawned on a mock task that can be used to poll the future or stream
50
/// without needing pinning or context types.
51
#[derive(Debug)]
52
#[must_use = "futures do nothing unless you `.await` or poll them"]
53
pub struct Spawn<T> {
54
    task: MockTask,
55
    future: Pin<Box<T>>,
56
}
57
58
#[derive(Debug, Clone)]
59
struct MockTask {
60
    waker: Arc<ThreadWaker>,
61
}
62
63
#[derive(Debug)]
64
struct ThreadWaker {
65
    state: Mutex<usize>,
66
    condvar: Condvar,
67
}
68
69
const IDLE: usize = 0;
70
const WAKE: usize = 1;
71
const SLEEP: usize = 2;
72
73
impl<T> Spawn<T> {
74
    /// Consumes `self` returning the inner value
75
0
    pub fn into_inner(self) -> T
76
0
    where
77
0
        T: Unpin,
78
    {
79
0
        *Pin::into_inner(self.future)
80
0
    }
81
82
    /// Returns `true` if the inner future has received a wake notification
83
    /// since the last call to `enter`.
84
0
    pub fn is_woken(&self) -> bool {
85
0
        self.task.is_woken()
86
0
    }
87
88
    /// Returns the number of references to the task waker
89
    ///
90
    /// The task itself holds a reference. The return value will never be zero.
91
0
    pub fn waker_ref_count(&self) -> usize {
92
0
        self.task.waker_ref_count()
93
0
    }
94
95
    /// Enter the task context
96
0
    pub fn enter<F, R>(&mut self, f: F) -> R
97
0
    where
98
0
        F: FnOnce(&mut Context<'_>, Pin<&mut T>) -> R,
99
    {
100
0
        let fut = self.future.as_mut();
101
0
        self.task.enter(|cx| f(cx, fut))
102
0
    }
103
}
104
105
impl<T: Unpin> ops::Deref for Spawn<T> {
106
    type Target = T;
107
108
0
    fn deref(&self) -> &T {
109
0
        &self.future
110
0
    }
111
}
112
113
impl<T: Unpin> ops::DerefMut for Spawn<T> {
114
0
    fn deref_mut(&mut self) -> &mut T {
115
0
        &mut self.future
116
0
    }
117
}
118
119
impl<T: Future> Spawn<T> {
120
    /// If `T` is a [`Future`] then poll it. This will handle pinning and the context
121
    /// type for the future.
122
0
    pub fn poll(&mut self) -> Poll<T::Output> {
123
0
        let fut = self.future.as_mut();
124
0
        self.task.enter(|cx| fut.poll(cx))
125
0
    }
126
}
127
128
impl<T: Stream> Spawn<T> {
129
    /// If `T` is a [`Stream`] then `poll_next` it. This will handle pinning and the context
130
    /// type for the stream.
131
0
    pub fn poll_next(&mut self) -> Poll<Option<T::Item>> {
132
0
        let stream = self.future.as_mut();
133
0
        self.task.enter(|cx| stream.poll_next(cx))
134
0
    }
135
}
136
137
impl<T: Future> Future for Spawn<T> {
138
    type Output = T::Output;
139
140
0
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
141
0
        self.future.as_mut().poll(cx)
142
0
    }
143
}
144
145
impl<T: Stream> Stream for Spawn<T> {
146
    type Item = T::Item;
147
148
0
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
149
0
        self.future.as_mut().poll_next(cx)
150
0
    }
151
152
0
    fn size_hint(&self) -> (usize, Option<usize>) {
153
0
        self.future.size_hint()
154
0
    }
155
}
156
157
impl MockTask {
158
    /// Creates new mock task
159
0
    fn new() -> Self {
160
0
        MockTask {
161
0
            waker: Arc::new(ThreadWaker::new()),
162
0
        }
163
0
    }
164
165
    /// Runs a closure from the context of the task.
166
    ///
167
    /// Any wake notifications resulting from the execution of the closure are
168
    /// tracked.
169
0
    fn enter<F, R>(&mut self, f: F) -> R
170
0
    where
171
0
        F: FnOnce(&mut Context<'_>) -> R,
172
    {
173
0
        self.waker.clear();
174
0
        let waker = self.waker();
175
0
        let mut cx = Context::from_waker(&waker);
176
177
0
        f(&mut cx)
178
0
    }
179
180
    /// Returns `true` if the inner future has received a wake notification
181
    /// since the last call to `enter`.
182
0
    fn is_woken(&self) -> bool {
183
0
        self.waker.is_woken()
184
0
    }
185
186
    /// Returns the number of references to the task waker
187
    ///
188
    /// The task itself holds a reference. The return value will never be zero.
189
0
    fn waker_ref_count(&self) -> usize {
190
0
        Arc::strong_count(&self.waker)
191
0
    }
192
193
0
    fn waker(&self) -> Waker {
194
        unsafe {
195
0
            let raw = to_raw(self.waker.clone());
196
0
            Waker::from_raw(raw)
197
        }
198
0
    }
199
}
200
201
impl Default for MockTask {
202
0
    fn default() -> Self {
203
0
        Self::new()
204
0
    }
205
}
206
207
impl ThreadWaker {
208
0
    fn new() -> Self {
209
0
        ThreadWaker {
210
0
            state: Mutex::new(IDLE),
211
0
            condvar: Condvar::new(),
212
0
        }
213
0
    }
214
215
    /// Clears any previously received wakes, avoiding potential spurious
216
    /// wake notifications. This should only be called immediately before running the
217
    /// task.
218
0
    fn clear(&self) {
219
0
        *self.state.lock().unwrap() = IDLE;
220
0
    }
221
222
0
    fn is_woken(&self) -> bool {
223
0
        match *self.state.lock().unwrap() {
224
0
            IDLE => false,
225
0
            WAKE => true,
226
0
            _ => unreachable!(),
227
        }
228
0
    }
229
230
0
    fn wake(&self) {
231
        // First, try transitioning from IDLE -> NOTIFY, this does not require a lock.
232
0
        let mut state = self.state.lock().unwrap();
233
0
        let prev = *state;
234
235
0
        if prev == WAKE {
236
0
            return;
237
0
        }
238
239
0
        *state = WAKE;
240
241
0
        if prev == IDLE {
242
0
            return;
243
0
        }
244
245
        // The other half is sleeping, so we wake it up.
246
0
        assert_eq!(prev, SLEEP);
247
0
        self.condvar.notify_one();
248
0
    }
249
}
250
251
static VTABLE: RawWakerVTable = RawWakerVTable::new(clone, wake, wake_by_ref, drop_waker);
252
253
0
unsafe fn to_raw(waker: Arc<ThreadWaker>) -> RawWaker {
254
0
    RawWaker::new(Arc::into_raw(waker) as *const (), &VTABLE)
255
0
}
256
257
0
unsafe fn from_raw(raw: *const ()) -> Arc<ThreadWaker> {
258
0
    Arc::from_raw(raw as *const ThreadWaker)
259
0
}
260
261
0
unsafe fn clone(raw: *const ()) -> RawWaker {
262
0
    let waker = from_raw(raw);
263
264
    // Increment the ref count
265
0
    mem::forget(waker.clone());
266
267
0
    to_raw(waker)
268
0
}
269
270
0
unsafe fn wake(raw: *const ()) {
271
0
    let waker = from_raw(raw);
272
0
    waker.wake();
273
0
}
274
275
0
unsafe fn wake_by_ref(raw: *const ()) {
276
0
    let waker = from_raw(raw);
277
0
    waker.wake();
278
279
    // We don't actually own a reference to the unparker
280
0
    mem::forget(waker);
281
0
}
282
283
0
unsafe fn drop_waker(raw: *const ()) {
284
0
    let _ = from_raw(raw);
285
0
}