Coverage Report

Created: 2025-02-21 07:11

/rust/registry/src/index.crates.io-6f17d22bba15001f/async-task-4.7.1/src/task.rs
Line
Count
Source (jump to first uncovered line)
1
use core::fmt;
2
use core::future::Future;
3
use core::marker::PhantomData;
4
use core::mem;
5
use core::pin::Pin;
6
use core::ptr::NonNull;
7
use core::sync::atomic::Ordering;
8
use core::task::{Context, Poll};
9
10
use crate::header::Header;
11
use crate::raw::Panic;
12
use crate::runnable::ScheduleInfo;
13
use crate::state::*;
14
15
/// A spawned task.
16
///
17
/// A [`Task`] can be awaited to retrieve the output of its future.
18
///
19
/// Dropping a [`Task`] cancels it, which means its future won't be polled again. To drop the
20
/// [`Task`] handle without canceling it, use [`detach()`][`Task::detach()`] instead. To cancel a
21
/// task gracefully and wait until it is fully destroyed, use the [`cancel()`][Task::cancel()]
22
/// method.
23
///
24
/// Note that canceling a task actually wakes it and reschedules one last time. Then, the executor
25
/// can destroy the task by simply dropping its [`Runnable`][`super::Runnable`] or by invoking
26
/// [`run()`][`super::Runnable::run()`].
27
///
28
/// # Examples
29
///
30
/// ```
31
/// use smol::{future, Executor};
32
/// use std::thread;
33
///
34
/// let ex = Executor::new();
35
///
36
/// // Spawn a future onto the executor.
37
/// let task = ex.spawn(async {
38
///     println!("Hello from a task!");
39
///     1 + 2
40
/// });
41
///
42
/// // Run an executor thread.
43
/// thread::spawn(move || future::block_on(ex.run(future::pending::<()>())));
44
///
45
/// // Wait for the task's output.
46
/// assert_eq!(future::block_on(task), 3);
47
/// ```
48
#[must_use = "tasks get canceled when dropped, use `.detach()` to run them in the background"]
49
pub struct Task<T, M = ()> {
50
    /// A raw task pointer.
51
    pub(crate) ptr: NonNull<()>,
52
53
    /// A marker capturing generic types `T` and `M`.
54
    pub(crate) _marker: PhantomData<(T, M)>,
55
}
56
57
unsafe impl<T: Send, M: Send + Sync> Send for Task<T, M> {}
58
unsafe impl<T, M: Send + Sync> Sync for Task<T, M> {}
59
60
impl<T, M> Unpin for Task<T, M> {}
61
62
#[cfg(feature = "std")]
63
impl<T, M> std::panic::UnwindSafe for Task<T, M> {}
64
#[cfg(feature = "std")]
65
impl<T, M> std::panic::RefUnwindSafe for Task<T, M> {}
66
67
impl<T, M> Task<T, M> {
68
    /// Detaches the task to let it keep running in the background.
69
    ///
70
    /// # Examples
71
    ///
72
    /// ```
73
    /// use smol::{Executor, Timer};
74
    /// use std::time::Duration;
75
    ///
76
    /// let ex = Executor::new();
77
    ///
78
    /// // Spawn a deamon future.
79
    /// ex.spawn(async {
80
    ///     loop {
81
    ///         println!("I'm a daemon task looping forever.");
82
    ///         Timer::after(Duration::from_secs(1)).await;
83
    ///     }
84
    /// })
85
    /// .detach();
86
    /// ```
87
0
    pub fn detach(self) {
88
0
        let mut this = self;
89
0
        let _out = this.set_detached();
90
0
        mem::forget(this);
91
0
    }
92
93
    /// Cancels the task and waits for it to stop running.
94
    ///
95
    /// Returns the task's output if it was completed just before it got canceled, or [`None`] if
96
    /// it didn't complete.
97
    ///
98
    /// While it's possible to simply drop the [`Task`] to cancel it, this is a cleaner way of
99
    /// canceling because it also waits for the task to stop running.
100
    ///
101
    /// # Examples
102
    ///
103
    /// ```
104
    /// # if cfg!(miri) { return; } // Miri does not support epoll
105
    /// use smol::{future, Executor, Timer};
106
    /// use std::thread;
107
    /// use std::time::Duration;
108
    ///
109
    /// let ex = Executor::new();
110
    ///
111
    /// // Spawn a deamon future.
112
    /// let task = ex.spawn(async {
113
    ///     loop {
114
    ///         println!("Even though I'm in an infinite loop, you can still cancel me!");
115
    ///         Timer::after(Duration::from_secs(1)).await;
116
    ///     }
117
    /// });
118
    ///
119
    /// // Run an executor thread.
120
    /// thread::spawn(move || future::block_on(ex.run(future::pending::<()>())));
121
    ///
122
    /// future::block_on(async {
123
    ///     Timer::after(Duration::from_secs(3)).await;
124
    ///     task.cancel().await;
125
    /// });
126
    /// ```
127
0
    pub async fn cancel(self) -> Option<T> {
128
0
        let mut this = self;
129
0
        this.set_canceled();
130
0
        this.fallible().await
131
0
    }
132
133
    /// Converts this task into a [`FallibleTask`].
134
    ///
135
    /// Like [`Task`], a fallible task will poll the task's output until it is
136
    /// completed or cancelled due to its [`Runnable`][`super::Runnable`] being
137
    /// dropped without being run. Resolves to the task's output when completed,
138
    /// or [`None`] if it didn't complete.
139
    ///
140
    /// # Examples
141
    ///
142
    /// ```
143
    /// use smol::{future, Executor};
144
    /// use std::thread;
145
    ///
146
    /// let ex = Executor::new();
147
    ///
148
    /// // Spawn a future onto the executor.
149
    /// let task = ex.spawn(async {
150
    ///     println!("Hello from a task!");
151
    ///     1 + 2
152
    /// })
153
    /// .fallible();
154
    ///
155
    /// // Run an executor thread.
156
    /// thread::spawn(move || future::block_on(ex.run(future::pending::<()>())));
157
    ///
158
    /// // Wait for the task's output.
159
    /// assert_eq!(future::block_on(task), Some(3));
160
    /// ```
161
    ///
162
    /// ```
163
    /// use smol::future;
164
    ///
165
    /// // Schedule function which drops the runnable without running it.
166
    /// let schedule = move |runnable| drop(runnable);
167
    ///
168
    /// // Create a task with the future and the schedule function.
169
    /// let (runnable, task) = async_task::spawn(async {
170
    ///     println!("Hello from a task!");
171
    ///     1 + 2
172
    /// }, schedule);
173
    /// runnable.schedule();
174
    ///
175
    /// // Wait for the task's output.
176
    /// assert_eq!(future::block_on(task.fallible()), None);
177
    /// ```
178
0
    pub fn fallible(self) -> FallibleTask<T, M> {
179
0
        FallibleTask { task: self }
180
0
    }
181
182
    /// Puts the task in canceled state.
183
0
    fn set_canceled(&mut self) {
184
0
        let ptr = self.ptr.as_ptr();
185
0
        let header = ptr as *const Header<M>;
186
0
187
0
        unsafe {
188
0
            let mut state = (*header).state.load(Ordering::Acquire);
189
190
            loop {
191
                // If the task has been completed or closed, it can't be canceled.
192
0
                if state & (COMPLETED | CLOSED) != 0 {
193
0
                    break;
194
0
                }
195
196
                // If the task is not scheduled nor running, we'll need to schedule it.
197
0
                let new = if state & (SCHEDULED | RUNNING) == 0 {
198
0
                    (state | SCHEDULED | CLOSED) + REFERENCE
199
                } else {
200
0
                    state | CLOSED
201
                };
202
203
                // Mark the task as closed.
204
0
                match (*header).state.compare_exchange_weak(
205
0
                    state,
206
0
                    new,
207
0
                    Ordering::AcqRel,
208
0
                    Ordering::Acquire,
209
0
                ) {
210
                    Ok(_) => {
211
                        // If the task is not scheduled nor running, schedule it one more time so
212
                        // that its future gets dropped by the executor.
213
0
                        if state & (SCHEDULED | RUNNING) == 0 {
214
0
                            ((*header).vtable.schedule)(ptr, ScheduleInfo::new(false));
215
0
                        }
216
217
                        // Notify the awaiter that the task has been closed.
218
0
                        if state & AWAITER != 0 {
219
0
                            (*header).notify(None);
220
0
                        }
221
222
0
                        break;
223
                    }
224
0
                    Err(s) => state = s,
225
                }
226
            }
227
        }
228
0
    }
Unexecuted instantiation: <async_task::task::Task<core::result::Result<surrealdb_core::sql::value::value::Value, surrealdb_core::err::Error>>>::set_canceled
Unexecuted instantiation: <async_task::task::Task<_, _>>::set_canceled
229
230
    /// Puts the task in detached state.
231
0
    fn set_detached(&mut self) -> Option<Result<T, Panic>> {
232
0
        let ptr = self.ptr.as_ptr();
233
0
        let header = ptr as *const Header<M>;
234
0
235
0
        unsafe {
236
0
            // A place where the output will be stored in case it needs to be dropped.
237
0
            let mut output = None;
238
239
            // Optimistically assume the `Task` is being detached just after creating the task.
240
            // This is a common case so if the `Task` is datached, the overhead of it is only one
241
            // compare-exchange operation.
242
0
            if let Err(mut state) = (*header).state.compare_exchange_weak(
243
0
                SCHEDULED | TASK | REFERENCE,
244
0
                SCHEDULED | REFERENCE,
245
0
                Ordering::AcqRel,
246
0
                Ordering::Acquire,
247
0
            ) {
248
                loop {
249
                    // If the task has been completed but not yet closed, that means its output
250
                    // must be dropped.
251
0
                    if state & COMPLETED != 0 && state & CLOSED == 0 {
252
                        // Mark the task as closed in order to grab its output.
253
0
                        match (*header).state.compare_exchange_weak(
254
0
                            state,
255
0
                            state | CLOSED,
256
0
                            Ordering::AcqRel,
257
0
                            Ordering::Acquire,
258
0
                        ) {
259
0
                            Ok(_) => {
260
0
                                // Read the output.
261
0
                                output = Some(
262
0
                                    (((*header).vtable.get_output)(ptr) as *mut Result<T, Panic>)
263
0
                                        .read(),
264
0
                                );
265
0
266
0
                                // Update the state variable because we're continuing the loop.
267
0
                                state |= CLOSED;
268
0
                            }
269
0
                            Err(s) => state = s,
270
                        }
271
                    } else {
272
                        // If this is the last reference to the task and it's not closed, then
273
                        // close it and schedule one more time so that its future gets dropped by
274
                        // the executor.
275
0
                        let new = if state & (!(REFERENCE - 1) | CLOSED) == 0 {
276
0
                            SCHEDULED | CLOSED | REFERENCE
277
                        } else {
278
0
                            state & !TASK
279
                        };
280
281
                        // Unset the `TASK` flag.
282
0
                        match (*header).state.compare_exchange_weak(
283
0
                            state,
284
0
                            new,
285
0
                            Ordering::AcqRel,
286
0
                            Ordering::Acquire,
287
0
                        ) {
288
                            Ok(_) => {
289
                                // If this is the last reference to the task, we need to either
290
                                // schedule dropping its future or destroy it.
291
0
                                if state & !(REFERENCE - 1) == 0 {
292
0
                                    if state & CLOSED == 0 {
293
0
                                        ((*header).vtable.schedule)(ptr, ScheduleInfo::new(false));
294
0
                                    } else {
295
0
                                        ((*header).vtable.destroy)(ptr);
296
0
                                    }
297
0
                                }
298
299
0
                                break;
300
                            }
301
0
                            Err(s) => state = s,
302
                        }
303
                    }
304
                }
305
0
            }
306
307
0
            output
308
0
        }
309
0
    }
Unexecuted instantiation: <async_task::task::Task<core::result::Result<surrealdb_core::sql::value::value::Value, surrealdb_core::err::Error>>>::set_detached
Unexecuted instantiation: <async_task::task::Task<_, _>>::set_detached
310
311
    /// Polls the task to retrieve its output.
312
    ///
313
    /// Returns `Some` if the task has completed or `None` if it was closed.
314
    ///
315
    /// A task becomes closed in the following cases:
316
    ///
317
    /// 1. It gets canceled by `Runnable::drop()`, `Task::drop()`, or `Task::cancel()`.
318
    /// 2. Its output gets awaited by the `Task`.
319
    /// 3. It panics while polling the future.
320
    /// 4. It is completed and the `Task` gets dropped.
321
0
    fn poll_task(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> {
322
0
        let ptr = self.ptr.as_ptr();
323
0
        let header = ptr as *const Header<M>;
324
0
325
0
        unsafe {
326
0
            let mut state = (*header).state.load(Ordering::Acquire);
327
328
            loop {
329
                // If the task has been closed, notify the awaiter and return `None`.
330
0
                if state & CLOSED != 0 {
331
                    // If the task is scheduled or running, we need to wait until its future is
332
                    // dropped.
333
0
                    if state & (SCHEDULED | RUNNING) != 0 {
334
                        // Replace the waker with one associated with the current task.
335
0
                        (*header).register(cx.waker());
336
0
337
0
                        // Reload the state after registering. It is possible changes occurred just
338
0
                        // before registration so we need to check for that.
339
0
                        state = (*header).state.load(Ordering::Acquire);
340
0
341
0
                        // If the task is still scheduled or running, we need to wait because its
342
0
                        // future is not dropped yet.
343
0
                        if state & (SCHEDULED | RUNNING) != 0 {
344
0
                            return Poll::Pending;
345
0
                        }
346
0
                    }
347
348
                    // Even though the awaiter is most likely the current task, it could also be
349
                    // another task.
350
0
                    (*header).notify(Some(cx.waker()));
351
0
                    return Poll::Ready(None);
352
0
                }
353
0
354
0
                // If the task is not completed, register the current task.
355
0
                if state & COMPLETED == 0 {
356
                    // Replace the waker with one associated with the current task.
357
0
                    (*header).register(cx.waker());
358
0
359
0
                    // Reload the state after registering. It is possible that the task became
360
0
                    // completed or closed just before registration so we need to check for that.
361
0
                    state = (*header).state.load(Ordering::Acquire);
362
0
363
0
                    // If the task has been closed, restart.
364
0
                    if state & CLOSED != 0 {
365
0
                        continue;
366
0
                    }
367
0
368
0
                    // If the task is still not completed, we're blocked on it.
369
0
                    if state & COMPLETED == 0 {
370
0
                        return Poll::Pending;
371
0
                    }
372
0
                }
373
374
                // Since the task is now completed, mark it as closed in order to grab its output.
375
0
                match (*header).state.compare_exchange(
376
0
                    state,
377
0
                    state | CLOSED,
378
0
                    Ordering::AcqRel,
379
0
                    Ordering::Acquire,
380
0
                ) {
381
                    Ok(_) => {
382
                        // Notify the awaiter. Even though the awaiter is most likely the current
383
                        // task, it could also be another task.
384
0
                        if state & AWAITER != 0 {
385
0
                            (*header).notify(Some(cx.waker()));
386
0
                        }
387
388
                        // Take the output from the task.
389
0
                        let output = ((*header).vtable.get_output)(ptr) as *mut Result<T, Panic>;
390
0
                        let output = output.read();
391
392
                        // Propagate the panic if the task panicked.
393
0
                        let output = match output {
394
0
                            Ok(output) => output,
395
0
                            Err(panic) => {
396
0
                                #[cfg(feature = "std")]
397
0
                                std::panic::resume_unwind(panic);
398
399
                                #[cfg(not(feature = "std"))]
400
                                match panic {}
401
                            }
402
                        };
403
404
0
                        return Poll::Ready(Some(output));
405
                    }
406
0
                    Err(s) => state = s,
407
                }
408
            }
409
        }
410
0
    }
Unexecuted instantiation: <async_task::task::Task<core::result::Result<surrealdb_core::sql::value::value::Value, surrealdb_core::err::Error>>>::poll_task
Unexecuted instantiation: <async_task::task::Task<_, _>>::poll_task
411
412
0
    fn header(&self) -> &Header<M> {
413
0
        let ptr = self.ptr.as_ptr();
414
0
        let header = ptr as *const Header<M>;
415
0
        unsafe { &*header }
416
0
    }
417
418
    /// Returns `true` if the current task is finished.
419
    ///
420
    /// Note that in a multithreaded environment, this task can change finish immediately after calling this function.
421
0
    pub fn is_finished(&self) -> bool {
422
0
        let ptr = self.ptr.as_ptr();
423
0
        let header = ptr as *const Header<M>;
424
0
425
0
        unsafe {
426
0
            let state = (*header).state.load(Ordering::Acquire);
427
0
            state & (CLOSED | COMPLETED) != 0
428
0
        }
429
0
    }
430
431
    /// Get the metadata associated with this task.
432
    ///
433
    /// Tasks can be created with a metadata object associated with them; by default, this
434
    /// is a `()` value. See the [`Builder::metadata()`] method for more information.
435
0
    pub fn metadata(&self) -> &M {
436
0
        &self.header().metadata
437
0
    }
438
}
439
440
impl<T, M> Drop for Task<T, M> {
441
0
    fn drop(&mut self) {
442
0
        self.set_canceled();
443
0
        self.set_detached();
444
0
    }
Unexecuted instantiation: <async_task::task::Task<core::result::Result<surrealdb_core::sql::value::value::Value, surrealdb_core::err::Error>> as core::ops::drop::Drop>::drop
Unexecuted instantiation: <async_task::task::Task<_, _> as core::ops::drop::Drop>::drop
445
}
446
447
impl<T, M> Future for Task<T, M> {
448
    type Output = T;
449
450
0
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
451
0
        match self.poll_task(cx) {
452
0
            Poll::Ready(t) => Poll::Ready(t.expect("Task polled after completion")),
453
0
            Poll::Pending => Poll::Pending,
454
        }
455
0
    }
Unexecuted instantiation: <async_task::task::Task<core::result::Result<surrealdb_core::sql::value::value::Value, surrealdb_core::err::Error>> as core::future::future::Future>::poll
Unexecuted instantiation: <async_task::task::Task<_, _> as core::future::future::Future>::poll
456
}
457
458
impl<T, M: fmt::Debug> fmt::Debug for Task<T, M> {
459
0
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
460
0
        f.debug_struct("Task")
461
0
            .field("header", self.header())
462
0
            .finish()
463
0
    }
464
}
465
466
/// A spawned task with a fallible response.
467
///
468
/// This type behaves like [`Task`], however it produces an `Option<T>` when
469
/// polled and will return `None` if the executor dropped its
470
/// [`Runnable`][`super::Runnable`] without being run.
471
///
472
/// This can be useful to avoid the panic produced when polling the `Task`
473
/// future if the executor dropped its `Runnable`.
474
#[must_use = "tasks get canceled when dropped, use `.detach()` to run them in the background"]
475
pub struct FallibleTask<T, M = ()> {
476
    task: Task<T, M>,
477
}
478
479
impl<T, M> FallibleTask<T, M> {
480
    /// Detaches the task to let it keep running in the background.
481
    ///
482
    /// # Examples
483
    ///
484
    /// ```
485
    /// use smol::{Executor, Timer};
486
    /// use std::time::Duration;
487
    ///
488
    /// let ex = Executor::new();
489
    ///
490
    /// // Spawn a deamon future.
491
    /// ex.spawn(async {
492
    ///     loop {
493
    ///         println!("I'm a daemon task looping forever.");
494
    ///         Timer::after(Duration::from_secs(1)).await;
495
    ///     }
496
    /// })
497
    /// .fallible()
498
    /// .detach();
499
    /// ```
500
0
    pub fn detach(self) {
501
0
        self.task.detach()
502
0
    }
503
504
    /// Cancels the task and waits for it to stop running.
505
    ///
506
    /// Returns the task's output if it was completed just before it got canceled, or [`None`] if
507
    /// it didn't complete.
508
    ///
509
    /// While it's possible to simply drop the [`Task`] to cancel it, this is a cleaner way of
510
    /// canceling because it also waits for the task to stop running.
511
    ///
512
    /// # Examples
513
    ///
514
    /// ```
515
    /// # if cfg!(miri) { return; } // Miri does not support epoll
516
    /// use smol::{future, Executor, Timer};
517
    /// use std::thread;
518
    /// use std::time::Duration;
519
    ///
520
    /// let ex = Executor::new();
521
    ///
522
    /// // Spawn a deamon future.
523
    /// let task = ex.spawn(async {
524
    ///     loop {
525
    ///         println!("Even though I'm in an infinite loop, you can still cancel me!");
526
    ///         Timer::after(Duration::from_secs(1)).await;
527
    ///     }
528
    /// })
529
    /// .fallible();
530
    ///
531
    /// // Run an executor thread.
532
    /// thread::spawn(move || future::block_on(ex.run(future::pending::<()>())));
533
    ///
534
    /// future::block_on(async {
535
    ///     Timer::after(Duration::from_secs(3)).await;
536
    ///     task.cancel().await;
537
    /// });
538
    /// ```
539
0
    pub async fn cancel(self) -> Option<T> {
540
0
        self.task.cancel().await
541
0
    }
542
543
    /// Returns `true` if the current task is finished.
544
    ///
545
    /// Note that in a multithreaded environment, this task can change finish immediately after calling this function.
546
0
    pub fn is_finished(&self) -> bool {
547
0
        self.task.is_finished()
548
0
    }
549
}
550
551
impl<T, M> Future for FallibleTask<T, M> {
552
    type Output = Option<T>;
553
554
0
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
555
0
        self.task.poll_task(cx)
556
0
    }
557
}
558
559
impl<T, M: fmt::Debug> fmt::Debug for FallibleTask<T, M> {
560
0
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
561
0
        f.debug_struct("FallibleTask")
562
0
            .field("header", self.task.header())
563
0
            .finish()
564
0
    }
565
}