Coverage Report

Created: 2025-11-16 06:34

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/rust/registry/src/index.crates.io-1949cf8c6b5b557f/async-task-4.3.0/src/task.rs
Line
Count
Source
1
use core::fmt;
2
use core::future::Future;
3
use core::marker::{PhantomData, Unpin};
4
use core::mem;
5
use core::pin::Pin;
6
use core::ptr::NonNull;
7
use core::sync::atomic::Ordering;
8
use core::task::{Context, Poll};
9
10
use crate::header::Header;
11
use crate::state::*;
12
13
/// A spawned task.
14
///
15
/// A [`Task`] can be awaited to retrieve the output of its future.
16
///
17
/// Dropping a [`Task`] cancels it, which means its future won't be polled again. To drop the
18
/// [`Task`] handle without canceling it, use [`detach()`][`Task::detach()`] instead. To cancel a
19
/// task gracefully and wait until it is fully destroyed, use the [`cancel()`][Task::cancel()]
20
/// method.
21
///
22
/// Note that canceling a task actually wakes it and reschedules it one last time. Then, the executor
23
/// can destroy the task by simply dropping its [`Runnable`][`super::Runnable`] or by invoking
24
/// [`run()`][`super::Runnable::run()`].
25
///
26
/// # Examples
27
///
28
/// ```
29
/// use smol::{future, Executor};
30
/// use std::thread;
31
///
32
/// let ex = Executor::new();
33
///
34
/// // Spawn a future onto the executor.
35
/// let task = ex.spawn(async {
36
///     println!("Hello from a task!");
37
///     1 + 2
38
/// });
39
///
40
/// // Run an executor thread.
41
/// thread::spawn(move || future::block_on(ex.run(future::pending::<()>())));
42
///
43
/// // Wait for the task's output.
44
/// assert_eq!(future::block_on(task), 3);
45
/// ```
46
#[must_use = "tasks get canceled when dropped, use `.detach()` to run them in the background"]
47
pub struct Task<T> {
48
    /// A raw task pointer.
49
    pub(crate) ptr: NonNull<()>,
50
51
    /// A marker capturing generic type `T`.
52
    pub(crate) _marker: PhantomData<T>,
53
}
54
55
unsafe impl<T: Send> Send for Task<T> {}
56
unsafe impl<T> Sync for Task<T> {}
57
58
impl<T> Unpin for Task<T> {}
59
60
#[cfg(feature = "std")]
61
impl<T> std::panic::UnwindSafe for Task<T> {}
62
#[cfg(feature = "std")]
63
impl<T> std::panic::RefUnwindSafe for Task<T> {}
64
65
impl<T> Task<T> {
66
    /// Detaches the task to let it keep running in the background.
67
    ///
68
    /// # Examples
69
    ///
70
    /// ```
71
    /// use smol::{Executor, Timer};
72
    /// use std::time::Duration;
73
    ///
74
    /// let ex = Executor::new();
75
    ///
76
    /// // Spawn a daemon future.
77
    /// ex.spawn(async {
78
    ///     loop {
79
    ///         println!("I'm a daemon task looping forever.");
80
    ///         Timer::after(Duration::from_secs(1)).await;
81
    ///     }
82
    /// })
83
    /// .detach();
84
    /// ```
85
0
    pub fn detach(self) {
86
0
        let mut this = self;
87
0
        let _out = this.set_detached();
88
0
        mem::forget(this);
89
0
    }
90
91
    /// Cancels the task and waits for it to stop running.
92
    ///
93
    /// Returns the task's output if it was completed just before it got canceled, or [`None`] if
94
    /// it didn't complete.
95
    ///
96
    /// While it's possible to simply drop the [`Task`] to cancel it, this is a cleaner way of
97
    /// canceling because it also waits for the task to stop running.
98
    ///
99
    /// # Examples
100
    ///
101
    /// ```
102
    /// # if cfg!(miri) { return; } // Miri does not support epoll
103
    /// use smol::{future, Executor, Timer};
104
    /// use std::thread;
105
    /// use std::time::Duration;
106
    ///
107
    /// let ex = Executor::new();
108
    ///
109
    /// // Spawn a daemon future.
110
    /// let task = ex.spawn(async {
111
    ///     loop {
112
    ///         println!("Even though I'm in an infinite loop, you can still cancel me!");
113
    ///         Timer::after(Duration::from_secs(1)).await;
114
    ///     }
115
    /// });
116
    ///
117
    /// // Run an executor thread.
118
    /// thread::spawn(move || future::block_on(ex.run(future::pending::<()>())));
119
    ///
120
    /// future::block_on(async {
121
    ///     Timer::after(Duration::from_secs(3)).await;
122
    ///     task.cancel().await;
123
    /// });
124
    /// ```
125
0
    pub async fn cancel(self) -> Option<T> {
Unexecuted instantiation: <async_task::task::Task<()>>::cancel
Unexecuted instantiation: <async_task::task::Task<_>>::cancel
126
0
        let mut this = self;
127
0
        this.set_canceled();
128
0
        this.fallible().await
129
0
    }
Unexecuted instantiation: <async_task::task::Task<()>>::cancel::{closure#0}
Unexecuted instantiation: <async_task::task::Task<_>>::cancel::{closure#0}
130
131
    /// Converts this task into a [`FallibleTask`].
132
    ///
133
    /// Like [`Task`], a fallible task will poll the task's output until it is
134
    /// completed or cancelled due to its [`Runnable`][`super::Runnable`] being
135
    /// dropped without being run. Resolves to the task's output when completed,
136
    /// or [`None`] if it didn't complete.
137
    ///
138
    /// # Examples
139
    ///
140
    /// ```
141
    /// use smol::{future, Executor};
142
    /// use std::thread;
143
    ///
144
    /// let ex = Executor::new();
145
    ///
146
    /// // Spawn a future onto the executor.
147
    /// let task = ex.spawn(async {
148
    ///     println!("Hello from a task!");
149
    ///     1 + 2
150
    /// })
151
    /// .fallible();
152
    ///
153
    /// // Run an executor thread.
154
    /// thread::spawn(move || future::block_on(ex.run(future::pending::<()>())));
155
    ///
156
    /// // Wait for the task's output.
157
    /// assert_eq!(future::block_on(task), Some(3));
158
    /// ```
159
    ///
160
    /// ```
161
    /// use smol::future;
162
    ///
163
    /// // Schedule function which drops the runnable without running it.
164
    /// let schedule = move |runnable| drop(runnable);
165
    ///
166
    /// // Create a task with the future and the schedule function.
167
    /// let (runnable, task) = async_task::spawn(async {
168
    ///     println!("Hello from a task!");
169
    ///     1 + 2
170
    /// }, schedule);
171
    /// runnable.schedule();
172
    ///
173
    /// // Wait for the task's output.
174
    /// assert_eq!(future::block_on(task.fallible()), None);
175
    /// ```
176
0
    pub fn fallible(self) -> FallibleTask<T> {
177
0
        FallibleTask { task: self }
178
0
    }
Unexecuted instantiation: <async_task::task::Task<()>>::fallible
Unexecuted instantiation: <async_task::task::Task<_>>::fallible
179
180
    /// Puts the task in canceled state.
181
0
    fn set_canceled(&mut self) {
182
0
        let ptr = self.ptr.as_ptr();
183
0
        let header = ptr as *const Header;
184
185
        unsafe {
186
0
            let mut state = (*header).state.load(Ordering::Acquire);
187
188
            loop {
189
                // If the task has been completed or closed, it can't be canceled.
190
0
                if state & (COMPLETED | CLOSED) != 0 {
191
0
                    break;
192
0
                }
193
194
                // If the task is not scheduled nor running, we'll need to schedule it.
195
0
                let new = if state & (SCHEDULED | RUNNING) == 0 {
196
0
                    (state | SCHEDULED | CLOSED) + REFERENCE
197
                } else {
198
0
                    state | CLOSED
199
                };
200
201
                // Mark the task as closed.
202
0
                match (*header).state.compare_exchange_weak(
203
0
                    state,
204
0
                    new,
205
0
                    Ordering::AcqRel,
206
0
                    Ordering::Acquire,
207
0
                ) {
208
                    Ok(_) => {
209
                        // If the task is not scheduled nor running, schedule it one more time so
210
                        // that its future gets dropped by the executor.
211
0
                        if state & (SCHEDULED | RUNNING) == 0 {
212
0
                            ((*header).vtable.schedule)(ptr);
213
0
                        }
214
215
                        // Notify the awaiter that the task has been closed.
216
0
                        if state & AWAITER != 0 {
217
0
                            (*header).notify(None);
218
0
                        }
219
220
0
                        break;
221
                    }
222
0
                    Err(s) => state = s,
223
                }
224
            }
225
        }
226
0
    }
Unexecuted instantiation: <async_task::task::Task<()>>::set_canceled
Unexecuted instantiation: <async_task::task::Task<_>>::set_canceled
227
228
    /// Puts the task in detached state.
229
0
    fn set_detached(&mut self) -> Option<T> {
230
0
        let ptr = self.ptr.as_ptr();
231
0
        let header = ptr as *const Header;
232
233
        unsafe {
234
            // A place where the output will be stored in case it needs to be dropped.
235
0
            let mut output = None;
236
237
            // Optimistically assume the `Task` is being detached just after creating the task.
238
            // This is a common case so if the `Task` is detached, the overhead of it is only one
239
            // compare-exchange operation.
240
0
            if let Err(mut state) = (*header).state.compare_exchange_weak(
241
0
                SCHEDULED | TASK | REFERENCE,
242
0
                SCHEDULED | REFERENCE,
243
0
                Ordering::AcqRel,
244
0
                Ordering::Acquire,
245
0
            ) {
246
                loop {
247
                    // If the task has been completed but not yet closed, that means its output
248
                    // must be dropped.
249
0
                    if state & COMPLETED != 0 && state & CLOSED == 0 {
250
                        // Mark the task as closed in order to grab its output.
251
0
                        match (*header).state.compare_exchange_weak(
252
0
                            state,
253
0
                            state | CLOSED,
254
0
                            Ordering::AcqRel,
255
0
                            Ordering::Acquire,
256
0
                        ) {
257
0
                            Ok(_) => {
258
0
                                // Read the output.
259
0
                                output =
260
0
                                    Some((((*header).vtable.get_output)(ptr) as *mut T).read());
261
0
262
0
                                // Update the state variable because we're continuing the loop.
263
0
                                state |= CLOSED;
264
0
                            }
265
0
                            Err(s) => state = s,
266
                        }
267
                    } else {
268
                        // If this is the last reference to the task and it's not closed, then
269
                        // close it and schedule one more time so that its future gets dropped by
270
                        // the executor.
271
0
                        let new = if state & (!(REFERENCE - 1) | CLOSED) == 0 {
272
0
                            SCHEDULED | CLOSED | REFERENCE
273
                        } else {
274
0
                            state & !TASK
275
                        };
276
277
                        // Unset the `TASK` flag.
278
0
                        match (*header).state.compare_exchange_weak(
279
0
                            state,
280
0
                            new,
281
0
                            Ordering::AcqRel,
282
0
                            Ordering::Acquire,
283
0
                        ) {
284
                            Ok(_) => {
285
                                // If this is the last reference to the task, we need to either
286
                                // schedule dropping its future or destroy it.
287
0
                                if state & !(REFERENCE - 1) == 0 {
288
0
                                    if state & CLOSED == 0 {
289
0
                                        ((*header).vtable.schedule)(ptr);
290
0
                                    } else {
291
0
                                        ((*header).vtable.destroy)(ptr);
292
0
                                    }
293
0
                                }
294
295
0
                                break;
296
                            }
297
0
                            Err(s) => state = s,
298
                        }
299
                    }
300
                }
301
0
            }
302
303
0
            output
304
        }
305
0
    }
Unexecuted instantiation: <async_task::task::Task<()>>::set_detached
Unexecuted instantiation: <async_task::task::Task<_>>::set_detached
306
307
    /// Polls the task to retrieve its output.
308
    ///
309
    /// Returns `Some` if the task has completed or `None` if it was closed.
310
    ///
311
    /// A task becomes closed in the following cases:
312
    ///
313
    /// 1. It gets canceled by `Runnable::drop()`, `Task::drop()`, or `Task::cancel()`.
314
    /// 2. Its output gets awaited by the `Task`.
315
    /// 3. It panics while polling the future.
316
    /// 4. It is completed and the `Task` gets dropped.
317
0
    fn poll_task(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> {
318
0
        let ptr = self.ptr.as_ptr();
319
0
        let header = ptr as *const Header;
320
321
        unsafe {
322
0
            let mut state = (*header).state.load(Ordering::Acquire);
323
324
            loop {
325
                // If the task has been closed, notify the awaiter and return `None`.
326
0
                if state & CLOSED != 0 {
327
                    // If the task is scheduled or running, we need to wait until its future is
328
                    // dropped.
329
0
                    if state & (SCHEDULED | RUNNING) != 0 {
330
                        // Replace the waker with one associated with the current task.
331
0
                        (*header).register(cx.waker());
332
333
                        // Reload the state after registering. It is possible changes occurred just
334
                        // before registration so we need to check for that.
335
0
                        state = (*header).state.load(Ordering::Acquire);
336
337
                        // If the task is still scheduled or running, we need to wait because its
338
                        // future is not dropped yet.
339
0
                        if state & (SCHEDULED | RUNNING) != 0 {
340
0
                            return Poll::Pending;
341
0
                        }
342
0
                    }
343
344
                    // Even though the awaiter is most likely the current task, it could also be
345
                    // another task.
346
0
                    (*header).notify(Some(cx.waker()));
347
0
                    return Poll::Ready(None);
348
0
                }
349
350
                // If the task is not completed, register the current task.
351
0
                if state & COMPLETED == 0 {
352
                    // Replace the waker with one associated with the current task.
353
0
                    (*header).register(cx.waker());
354
355
                    // Reload the state after registering. It is possible that the task became
356
                    // completed or closed just before registration so we need to check for that.
357
0
                    state = (*header).state.load(Ordering::Acquire);
358
359
                    // If the task has been closed, restart.
360
0
                    if state & CLOSED != 0 {
361
0
                        continue;
362
0
                    }
363
364
                    // If the task is still not completed, we're blocked on it.
365
0
                    if state & COMPLETED == 0 {
366
0
                        return Poll::Pending;
367
0
                    }
368
0
                }
369
370
                // Since the task is now completed, mark it as closed in order to grab its output.
371
0
                match (*header).state.compare_exchange(
372
0
                    state,
373
0
                    state | CLOSED,
374
0
                    Ordering::AcqRel,
375
0
                    Ordering::Acquire,
376
0
                ) {
377
                    Ok(_) => {
378
                        // Notify the awaiter. Even though the awaiter is most likely the current
379
                        // task, it could also be another task.
380
0
                        if state & AWAITER != 0 {
381
0
                            (*header).notify(Some(cx.waker()));
382
0
                        }
383
384
                        // Take the output from the task.
385
0
                        let output = ((*header).vtable.get_output)(ptr) as *mut T;
386
0
                        return Poll::Ready(Some(output.read()));
387
                    }
388
0
                    Err(s) => state = s,
389
                }
390
            }
391
        }
392
0
    }
Unexecuted instantiation: <async_task::task::Task<()>>::poll_task
Unexecuted instantiation: <async_task::task::Task<_>>::poll_task
393
394
0
    fn header(&self) -> &Header {
395
0
        let ptr = self.ptr.as_ptr();
396
0
        let header = ptr as *const Header;
397
0
        unsafe { &*header }
398
0
    }
399
400
    /// Returns `true` if the current task is finished.
401
    ///
402
    /// Note that in a multithreaded environment, this task can finish immediately after calling this function.
403
0
    pub fn is_finished(&self) -> bool {
404
0
        let ptr = self.ptr.as_ptr();
405
0
        let header = ptr as *const Header;
406
407
        unsafe {
408
0
            let state = (*header).state.load(Ordering::Acquire);
409
0
            state & (CLOSED | COMPLETED) != 0
410
        }
411
0
    }
412
}
413
414
impl<T> Drop for Task<T> {
415
0
    fn drop(&mut self) {
416
0
        self.set_canceled();
417
0
        self.set_detached();
418
0
    }
Unexecuted instantiation: <async_task::task::Task<()> as core::ops::drop::Drop>::drop
Unexecuted instantiation: <async_task::task::Task<_> as core::ops::drop::Drop>::drop
419
}
420
421
impl<T> Future for Task<T> {
422
    type Output = T;
423
424
0
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
425
0
        match self.poll_task(cx) {
426
0
            Poll::Ready(t) => Poll::Ready(t.expect("task has failed")),
427
0
            Poll::Pending => Poll::Pending,
428
        }
429
0
    }
430
}
431
432
impl<T> fmt::Debug for Task<T> {
433
0
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
434
0
        f.debug_struct("Task")
435
0
            .field("header", self.header())
436
0
            .finish()
437
0
    }
438
}
439
440
/// A spawned task with a fallible response.
441
///
442
/// This type behaves like [`Task`], however it produces an `Option<T>` when
443
/// polled and will return `None` if the executor dropped its
444
/// [`Runnable`][`super::Runnable`] without being run.
445
///
446
/// This can be useful to avoid the panic produced when polling the `Task`
447
/// future if the executor dropped its `Runnable`.
448
#[must_use = "tasks get canceled when dropped, use `.detach()` to run them in the background"]
449
pub struct FallibleTask<T> {
450
    task: Task<T>,
451
}
452
453
impl<T> FallibleTask<T> {
454
    /// Detaches the task to let it keep running in the background.
455
    ///
456
    /// # Examples
457
    ///
458
    /// ```
459
    /// use smol::{Executor, Timer};
460
    /// use std::time::Duration;
461
    ///
462
    /// let ex = Executor::new();
463
    ///
464
    /// // Spawn a daemon future.
465
    /// ex.spawn(async {
466
    ///     loop {
467
    ///         println!("I'm a daemon task looping forever.");
468
    ///         Timer::after(Duration::from_secs(1)).await;
469
    ///     }
470
    /// })
471
    /// .fallible()
472
    /// .detach();
473
    /// ```
474
0
    pub fn detach(self) {
475
0
        self.task.detach()
476
0
    }
477
478
    /// Cancels the task and waits for it to stop running.
479
    ///
480
    /// Returns the task's output if it was completed just before it got canceled, or [`None`] if
481
    /// it didn't complete.
482
    ///
483
    /// While it's possible to simply drop the [`Task`] to cancel it, this is a cleaner way of
484
    /// canceling because it also waits for the task to stop running.
485
    ///
486
    /// # Examples
487
    ///
488
    /// ```
489
    /// # if cfg!(miri) { return; } // Miri does not support epoll
490
    /// use smol::{future, Executor, Timer};
491
    /// use std::thread;
492
    /// use std::time::Duration;
493
    ///
494
    /// let ex = Executor::new();
495
    ///
496
    /// // Spawn a daemon future.
497
    /// let task = ex.spawn(async {
498
    ///     loop {
499
    ///         println!("Even though I'm in an infinite loop, you can still cancel me!");
500
    ///         Timer::after(Duration::from_secs(1)).await;
501
    ///     }
502
    /// })
503
    /// .fallible();
504
    ///
505
    /// // Run an executor thread.
506
    /// thread::spawn(move || future::block_on(ex.run(future::pending::<()>())));
507
    ///
508
    /// future::block_on(async {
509
    ///     Timer::after(Duration::from_secs(3)).await;
510
    ///     task.cancel().await;
511
    /// });
512
    /// ```
513
0
    pub async fn cancel(self) -> Option<T> {
514
0
        self.task.cancel().await
515
0
    }
516
}
517
518
impl<T> Future for FallibleTask<T> {
519
    type Output = Option<T>;
520
521
0
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
522
0
        self.task.poll_task(cx)
523
0
    }
Unexecuted instantiation: <async_task::task::FallibleTask<()> as core::future::future::Future>::poll
Unexecuted instantiation: <async_task::task::FallibleTask<_> as core::future::future::Future>::poll
524
}
525
526
impl<T> fmt::Debug for FallibleTask<T> {
527
0
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
528
0
        f.debug_struct("FallibleTask")
529
0
            .field("header", self.task.header())
530
0
            .finish()
531
0
    }
532
}