Coverage Report

Created: 2026-05-16 06:08

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/rust/registry/src/index.crates.io-1949cf8c6b5b557f/async-task-4.3.0/src/runnable.rs
Line
Count
Source
1
use core::fmt;
2
use core::future::Future;
3
use core::marker::PhantomData;
4
use core::mem;
5
use core::ptr::NonNull;
6
use core::sync::atomic::Ordering;
7
use core::task::Waker;
8
9
use crate::header::Header;
10
use crate::raw::RawTask;
11
use crate::state::*;
12
use crate::Task;
13
14
/// Creates a new task.
15
///
16
/// The returned [`Runnable`] is used to poll the `future`, and the [`Task`] is used to await its
17
/// output.
18
///
19
/// Method [`run()`][`Runnable::run()`] polls the task's future once. Then, the [`Runnable`]
20
/// vanishes and only reappears when its [`Waker`] wakes the task, thus scheduling it to be run
21
/// again.
22
///
23
/// When the task is woken, its [`Runnable`] is passed to the `schedule` function.
24
/// The `schedule` function should not attempt to run the [`Runnable`] nor to drop it. Instead, it
25
/// should push it into a task queue so that it can be processed later.
26
///
27
/// If you need to spawn a future that does not implement [`Send`] or isn't `'static`, consider
28
/// using [`spawn_local()`] or [`spawn_unchecked()`] instead.
29
///
30
/// # Examples
31
///
32
/// ```
33
/// // The future inside the task.
34
/// let future = async {
35
///     println!("Hello, world!");
36
/// };
37
///
38
/// // A function that schedules the task when it gets woken up.
39
/// let (s, r) = flume::unbounded();
40
/// let schedule = move |runnable| s.send(runnable).unwrap();
41
///
42
/// // Create a task with the future and the schedule function.
43
/// let (runnable, task) = async_task::spawn(future, schedule);
44
/// ```
45
0
pub fn spawn<F, S>(future: F, schedule: S) -> (Runnable, Task<F::Output>)
46
0
where
47
0
    F: Future + Send + 'static,
48
0
    F::Output: Send + 'static,
49
0
    S: Fn(Runnable) + Send + Sync + 'static,
50
{
51
0
    unsafe { spawn_unchecked(future, schedule) }
52
0
}
53
54
/// Creates a new thread-local task.
55
///
56
/// This function is same as [`spawn()`], except it does not require [`Send`] on `future`. If the
57
/// [`Runnable`] is used or dropped on another thread, a panic will occur.
58
///
59
/// This function is only available when the `std` feature for this crate is enabled.
60
///
61
/// # Examples
62
///
63
/// ```
64
/// use async_task::Runnable;
65
/// use flume::{Receiver, Sender};
66
/// use std::rc::Rc;
67
///
68
/// thread_local! {
69
///     // A queue that holds scheduled tasks.
70
///     static QUEUE: (Sender<Runnable>, Receiver<Runnable>) = flume::unbounded();
71
/// }
72
///
73
/// // Make a non-Send future.
74
/// let msg: Rc<str> = "Hello, world!".into();
75
/// let future = async move {
76
///     println!("{}", msg);
77
/// };
78
///
79
/// // A function that schedules the task when it gets woken up.
80
/// let s = QUEUE.with(|(s, _)| s.clone());
81
/// let schedule = move |runnable| s.send(runnable).unwrap();
82
///
83
/// // Create a task with the future and the schedule function.
84
/// let (runnable, task) = async_task::spawn_local(future, schedule);
85
/// ```
86
#[cfg(feature = "std")]
87
0
pub fn spawn_local<F, S>(future: F, schedule: S) -> (Runnable, Task<F::Output>)
88
0
where
89
0
    F: Future + 'static,
90
0
    F::Output: 'static,
91
0
    S: Fn(Runnable) + Send + Sync + 'static,
92
{
93
    use std::mem::ManuallyDrop;
94
    use std::pin::Pin;
95
    use std::task::{Context, Poll};
96
    use std::thread::{self, ThreadId};
97
98
    #[inline]
99
0
    fn thread_id() -> ThreadId {
100
        thread_local! {
101
            static ID: ThreadId = thread::current().id();
102
        }
103
0
        ID.try_with(|id| *id)
104
0
            .unwrap_or_else(|_| thread::current().id())
Unexecuted instantiation: async_task::runnable::spawn_local::thread_id::{closure#1}
Unexecuted instantiation: async_task::runnable::spawn_local::thread_id::{closure#1}
105
0
    }
Unexecuted instantiation: async_task::runnable::spawn_local::thread_id
Unexecuted instantiation: async_task::runnable::spawn_local::thread_id
106
107
    struct Checked<F> {
108
        id: ThreadId,
109
        inner: ManuallyDrop<F>,
110
    }
111
112
    impl<F> Drop for Checked<F> {
113
0
        fn drop(&mut self) {
114
0
            assert!(
115
0
                self.id == thread_id(),
116
                "local task dropped by a thread that didn't spawn it"
117
            );
118
0
            unsafe {
119
0
                ManuallyDrop::drop(&mut self.inner);
120
0
            }
121
0
        }
Unexecuted instantiation: <async_task::runnable::spawn_local::Checked<devices::virtio::vhost_user_backend::wl::run_in_queue::{closure#0}> as core::ops::drop::Drop>::drop
Unexecuted instantiation: <async_task::runnable::spawn_local::Checked<devices::virtio::vhost_user_backend::wl::run_out_queue::{closure#0}> as core::ops::drop::Drop>::drop
Unexecuted instantiation: <async_task::runnable::spawn_local::Checked<_> as core::ops::drop::Drop>::drop
122
    }
123
124
    impl<F: Future> Future for Checked<F> {
125
        type Output = F::Output;
126
127
0
        fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
128
0
            assert!(
129
0
                self.id == thread_id(),
130
                "local task polled by a thread that didn't spawn it"
131
            );
132
0
            unsafe { self.map_unchecked_mut(|c| &mut *c.inner).poll(cx) }
Unexecuted instantiation: <async_task::runnable::spawn_local::Checked<devices::virtio::vhost_user_backend::wl::run_in_queue::{closure#0}> as core::future::future::Future>::poll::{closure#0}
Unexecuted instantiation: <async_task::runnable::spawn_local::Checked<devices::virtio::vhost_user_backend::wl::run_out_queue::{closure#0}> as core::future::future::Future>::poll::{closure#0}
Unexecuted instantiation: <async_task::runnable::spawn_local::Checked<_> as core::future::future::Future>::poll::{closure#0}
133
0
        }
Unexecuted instantiation: <async_task::runnable::spawn_local::Checked<devices::virtio::vhost_user_backend::wl::run_in_queue::{closure#0}> as core::future::future::Future>::poll
Unexecuted instantiation: <async_task::runnable::spawn_local::Checked<devices::virtio::vhost_user_backend::wl::run_out_queue::{closure#0}> as core::future::future::Future>::poll
Unexecuted instantiation: <async_task::runnable::spawn_local::Checked<_> as core::future::future::Future>::poll
134
    }
135
136
    // Wrap the future into one that checks which thread it's on.
137
0
    let future = Checked {
138
0
        id: thread_id(),
139
0
        inner: ManuallyDrop::new(future),
140
0
    };
141
142
0
    unsafe { spawn_unchecked(future, schedule) }
143
0
}
Unexecuted instantiation: async_task::runnable::spawn_local::<devices::virtio::vhost_user_backend::wl::run_in_queue::{closure#0}, <alloc::sync::Arc<cros_async::common_executor::RawExecutor<cros_async::sys::linux::fd_executor::EpollReactor>> as cros_async::executor::ExecutorTrait>::spawn_local<devices::virtio::vhost_user_backend::wl::run_in_queue::{closure#0}>::{closure#0}>
Unexecuted instantiation: async_task::runnable::spawn_local::<devices::virtio::vhost_user_backend::wl::run_in_queue::{closure#0}, <alloc::sync::Arc<cros_async::common_executor::RawExecutor<cros_async::sys::linux::uring_executor::UringReactor>> as cros_async::executor::ExecutorTrait>::spawn_local<devices::virtio::vhost_user_backend::wl::run_in_queue::{closure#0}>::{closure#0}>
Unexecuted instantiation: async_task::runnable::spawn_local::<devices::virtio::vhost_user_backend::wl::run_out_queue::{closure#0}, <alloc::sync::Arc<cros_async::common_executor::RawExecutor<cros_async::sys::linux::fd_executor::EpollReactor>> as cros_async::executor::ExecutorTrait>::spawn_local<devices::virtio::vhost_user_backend::wl::run_out_queue::{closure#0}>::{closure#0}>
Unexecuted instantiation: async_task::runnable::spawn_local::<devices::virtio::vhost_user_backend::wl::run_out_queue::{closure#0}, <alloc::sync::Arc<cros_async::common_executor::RawExecutor<cros_async::sys::linux::uring_executor::UringReactor>> as cros_async::executor::ExecutorTrait>::spawn_local<devices::virtio::vhost_user_backend::wl::run_out_queue::{closure#0}>::{closure#0}>
Unexecuted instantiation: async_task::runnable::spawn_local::<_, _>
144
145
/// Creates a new task without [`Send`], [`Sync`], and `'static` bounds.
146
///
147
/// This function is same as [`spawn()`], except it does not require [`Send`], [`Sync`], and
148
/// `'static` on `future` and `schedule`.
149
///
150
/// # Safety
151
///
152
/// - If `future` is not [`Send`], its [`Runnable`] must be used and dropped on the original
153
///   thread.
154
/// - If `future` is not `'static`, borrowed variables must outlive its [`Runnable`].
155
/// - If `schedule` is not [`Send`] and [`Sync`], the task's [`Waker`] must be used and dropped on
156
///   the original thread.
157
/// - If `schedule` is not `'static`, borrowed variables must outlive the task's [`Waker`].
158
///
159
/// # Examples
160
///
161
/// ```
162
/// // The future inside the task.
163
/// let future = async {
164
///     println!("Hello, world!");
165
/// };
166
///
167
/// // If the task gets woken up, it will be sent into this channel.
168
/// let (s, r) = flume::unbounded();
169
/// let schedule = move |runnable| s.send(runnable).unwrap();
170
///
171
/// // Create a task with the future and the schedule function.
172
/// let (runnable, task) = unsafe { async_task::spawn_unchecked(future, schedule) };
173
/// ```
174
0
pub unsafe fn spawn_unchecked<F, S>(future: F, schedule: S) -> (Runnable, Task<F::Output>)
175
0
where
176
0
    F: Future,
177
0
    S: Fn(Runnable),
178
{
179
    // Allocate large futures on the heap.
180
0
    let ptr = if mem::size_of::<F>() >= 2048 {
181
0
        let future = alloc::boxed::Box::pin(future);
182
0
        RawTask::<_, F::Output, S>::allocate(future, schedule)
183
    } else {
184
0
        RawTask::<F, F::Output, S>::allocate(future, schedule)
185
    };
186
187
0
    let runnable = Runnable { ptr };
188
0
    let task = Task {
189
0
        ptr,
190
0
        _marker: PhantomData,
191
0
    };
192
0
    (runnable, task)
193
0
}
Unexecuted instantiation: async_task::runnable::spawn_unchecked::<async_task::runnable::spawn_local::Checked<devices::virtio::vhost_user_backend::wl::run_in_queue::{closure#0}>, <alloc::sync::Arc<cros_async::common_executor::RawExecutor<cros_async::sys::linux::fd_executor::EpollReactor>> as cros_async::executor::ExecutorTrait>::spawn_local<devices::virtio::vhost_user_backend::wl::run_in_queue::{closure#0}>::{closure#0}>
Unexecuted instantiation: async_task::runnable::spawn_unchecked::<async_task::runnable::spawn_local::Checked<devices::virtio::vhost_user_backend::wl::run_in_queue::{closure#0}>, <alloc::sync::Arc<cros_async::common_executor::RawExecutor<cros_async::sys::linux::uring_executor::UringReactor>> as cros_async::executor::ExecutorTrait>::spawn_local<devices::virtio::vhost_user_backend::wl::run_in_queue::{closure#0}>::{closure#0}>
Unexecuted instantiation: async_task::runnable::spawn_unchecked::<async_task::runnable::spawn_local::Checked<devices::virtio::vhost_user_backend::wl::run_out_queue::{closure#0}>, <alloc::sync::Arc<cros_async::common_executor::RawExecutor<cros_async::sys::linux::fd_executor::EpollReactor>> as cros_async::executor::ExecutorTrait>::spawn_local<devices::virtio::vhost_user_backend::wl::run_out_queue::{closure#0}>::{closure#0}>
Unexecuted instantiation: async_task::runnable::spawn_unchecked::<async_task::runnable::spawn_local::Checked<devices::virtio::vhost_user_backend::wl::run_out_queue::{closure#0}>, <alloc::sync::Arc<cros_async::common_executor::RawExecutor<cros_async::sys::linux::uring_executor::UringReactor>> as cros_async::executor::ExecutorTrait>::spawn_local<devices::virtio::vhost_user_backend::wl::run_out_queue::{closure#0}>::{closure#0}>
Unexecuted instantiation: async_task::runnable::spawn_unchecked::<_, _>
194
195
/// A handle to a runnable task.
196
///
197
/// Every spawned task has a single [`Runnable`] handle, which only exists when the task is
198
/// scheduled for running.
199
///
200
/// Method [`run()`][`Runnable::run()`] polls the task's future once. Then, the [`Runnable`]
201
/// vanishes and only reappears when its [`Waker`] wakes the task, thus scheduling it to be run
202
/// again.
203
///
204
/// Dropping a [`Runnable`] cancels the task, which means its future won't be polled again, and
205
/// awaiting the [`Task`] after that will result in a panic.
206
///
207
/// # Examples
208
///
209
/// ```
210
/// use async_task::Runnable;
211
/// use once_cell::sync::Lazy;
212
/// use std::{panic, thread};
213
///
214
/// // A simple executor.
215
/// static QUEUE: Lazy<flume::Sender<Runnable>> = Lazy::new(|| {
216
///     let (sender, receiver) = flume::unbounded::<Runnable>();
217
///     thread::spawn(|| {
218
///         for runnable in receiver {
219
///             let _ignore_panic = panic::catch_unwind(|| runnable.run());
220
///         }
221
///     });
222
///     sender
223
/// });
224
///
225
/// // Create a task with a simple future.
226
/// let schedule = |runnable| QUEUE.send(runnable).unwrap();
227
/// let (runnable, task) = async_task::spawn(async { 1 + 2 }, schedule);
228
///
229
/// // Schedule the task and await its output.
230
/// runnable.schedule();
231
/// assert_eq!(smol::future::block_on(task), 3);
232
/// ```
233
pub struct Runnable {
234
    /// A pointer to the heap-allocated task.
235
    pub(crate) ptr: NonNull<()>,
236
}
237
238
unsafe impl Send for Runnable {}
239
unsafe impl Sync for Runnable {}
240
241
#[cfg(feature = "std")]
242
impl std::panic::UnwindSafe for Runnable {}
243
#[cfg(feature = "std")]
244
impl std::panic::RefUnwindSafe for Runnable {}
245
246
impl Runnable {
247
    /// Schedules the task.
248
    ///
249
    /// This is a convenience method that passes the [`Runnable`] to the schedule function.
250
    ///
251
    /// # Examples
252
    ///
253
    /// ```
254
    /// // A function that schedules the task when it gets woken up.
255
    /// let (s, r) = flume::unbounded();
256
    /// let schedule = move |runnable| s.send(runnable).unwrap();
257
    ///
258
    /// // Create a task with a simple future and the schedule function.
259
    /// let (runnable, task) = async_task::spawn(async {}, schedule);
260
    ///
261
    /// // Schedule the task.
262
    /// assert_eq!(r.len(), 0);
263
    /// runnable.schedule();
264
    /// assert_eq!(r.len(), 1);
265
    /// ```
266
0
    pub fn schedule(self) {
267
0
        let ptr = self.ptr.as_ptr();
268
0
        let header = ptr as *const Header;
269
0
        mem::forget(self);
270
271
0
        unsafe {
272
0
            ((*header).vtable.schedule)(ptr);
273
0
        }
274
0
    }
275
276
    /// Runs the task by polling its future.
277
    ///
278
    /// Returns `true` if the task was woken while running, in which case the [`Runnable`] gets
279
    /// rescheduled at the end of this method invocation. Otherwise, returns `false` and the
280
    /// [`Runnable`] vanishes until the task is woken.
281
    /// The return value is just a hint: `true` usually indicates that the task has yielded, i.e.
282
    /// it woke itself and then gave the control back to the executor.
283
    ///
284
    /// If the [`Task`] handle was dropped or if [`cancel()`][`Task::cancel()`] was called, then
285
    /// this method simply destroys the task.
286
    ///
287
    /// If the polled future panics, this method propagates the panic, and awaiting the [`Task`]
288
    /// after that will also result in a panic.
289
    ///
290
    /// # Examples
291
    ///
292
    /// ```
293
    /// // A function that schedules the task when it gets woken up.
294
    /// let (s, r) = flume::unbounded();
295
    /// let schedule = move |runnable| s.send(runnable).unwrap();
296
    ///
297
    /// // Create a task with a simple future and the schedule function.
298
    /// let (runnable, task) = async_task::spawn(async { 1 + 2 }, schedule);
299
    ///
300
    /// // Run the task and check its output.
301
    /// runnable.run();
302
    /// assert_eq!(smol::future::block_on(task), 3);
303
    /// ```
304
0
    pub fn run(self) -> bool {
305
0
        let ptr = self.ptr.as_ptr();
306
0
        let header = ptr as *const Header;
307
0
        mem::forget(self);
308
309
0
        unsafe { ((*header).vtable.run)(ptr) }
310
0
    }
311
312
    /// Returns a waker associated with this task.
313
    ///
314
    /// # Examples
315
    ///
316
    /// ```
317
    /// use smol::future;
318
    ///
319
    /// // A function that schedules the task when it gets woken up.
320
    /// let (s, r) = flume::unbounded();
321
    /// let schedule = move |runnable| s.send(runnable).unwrap();
322
    ///
323
    /// // Create a task with a simple future and the schedule function.
324
    /// let (runnable, task) = async_task::spawn(future::pending::<()>(), schedule);
325
    ///
326
    /// // Take a waker and run the task.
327
    /// let waker = runnable.waker();
328
    /// runnable.run();
329
    ///
330
    /// // Reschedule the task by waking it.
331
    /// assert_eq!(r.len(), 0);
332
    /// waker.wake();
333
    /// assert_eq!(r.len(), 1);
334
    /// ```
335
0
    pub fn waker(&self) -> Waker {
336
0
        let ptr = self.ptr.as_ptr();
337
0
        let header = ptr as *const Header;
338
339
        unsafe {
340
0
            let raw_waker = ((*header).vtable.clone_waker)(ptr);
341
0
            Waker::from_raw(raw_waker)
342
        }
343
0
    }
344
}
345
346
impl Drop for Runnable {
347
0
    fn drop(&mut self) {
348
0
        let ptr = self.ptr.as_ptr();
349
0
        let header = ptr as *const Header;
350
351
        unsafe {
352
0
            let mut state = (*header).state.load(Ordering::Acquire);
353
354
            loop {
355
                // If the task has been completed or closed, it can't be canceled.
356
0
                if state & (COMPLETED | CLOSED) != 0 {
357
0
                    break;
358
0
                }
359
360
                // Mark the task as closed.
361
0
                match (*header).state.compare_exchange_weak(
362
0
                    state,
363
0
                    state | CLOSED,
364
0
                    Ordering::AcqRel,
365
0
                    Ordering::Acquire,
366
0
                ) {
367
0
                    Ok(_) => break,
368
0
                    Err(s) => state = s,
369
                }
370
            }
371
372
            // Drop the future.
373
0
            ((*header).vtable.drop_future)(ptr);
374
375
            // Mark the task as unscheduled.
376
0
            let state = (*header).state.fetch_and(!SCHEDULED, Ordering::AcqRel);
377
378
            // Notify the awaiter that the future has been dropped.
379
0
            if state & AWAITER != 0 {
380
0
                (*header).notify(None);
381
0
            }
382
383
            // Drop the task reference.
384
0
            ((*header).vtable.drop_ref)(ptr);
385
        }
386
0
    }
387
}
388
389
impl fmt::Debug for Runnable {
390
0
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
391
0
        let ptr = self.ptr.as_ptr();
392
0
        let header = ptr as *const Header;
393
394
0
        f.debug_struct("Runnable")
395
0
            .field("header", unsafe { &(*header) })
396
0
            .finish()
397
0
    }
398
}