/rust/registry/src/index.crates.io-1949cf8c6b5b557f/async-task-4.3.0/src/task.rs
Line | Count | Source |
1 | | use core::fmt; |
2 | | use core::future::Future; |
3 | | use core::marker::{PhantomData, Unpin}; |
4 | | use core::mem; |
5 | | use core::pin::Pin; |
6 | | use core::ptr::NonNull; |
7 | | use core::sync::atomic::Ordering; |
8 | | use core::task::{Context, Poll}; |
9 | | |
10 | | use crate::header::Header; |
11 | | use crate::state::*; |
12 | | |
13 | | /// A spawned task. |
14 | | /// |
15 | | /// A [`Task`] can be awaited to retrieve the output of its future. |
16 | | /// |
17 | | /// Dropping a [`Task`] cancels it, which means its future won't be polled again. To drop the |
18 | | /// [`Task`] handle without canceling it, use [`detach()`][`Task::detach()`] instead. To cancel a |
19 | | /// task gracefully and wait until it is fully destroyed, use the [`cancel()`][Task::cancel()] |
20 | | /// method. |
21 | | /// |
22 | | /// Note that canceling a task actually wakes it and reschedules it one last time. Then, the executor |
23 | | /// can destroy the task by simply dropping its [`Runnable`][`super::Runnable`] or by invoking |
24 | | /// [`run()`][`super::Runnable::run()`]. |
25 | | /// |
26 | | /// # Examples |
27 | | /// |
28 | | /// ``` |
29 | | /// use smol::{future, Executor}; |
30 | | /// use std::thread; |
31 | | /// |
32 | | /// let ex = Executor::new(); |
33 | | /// |
34 | | /// // Spawn a future onto the executor. |
35 | | /// let task = ex.spawn(async { |
36 | | /// println!("Hello from a task!"); |
37 | | /// 1 + 2 |
38 | | /// }); |
39 | | /// |
40 | | /// // Run an executor thread. |
41 | | /// thread::spawn(move || future::block_on(ex.run(future::pending::<()>()))); |
42 | | /// |
43 | | /// // Wait for the task's output. |
44 | | /// assert_eq!(future::block_on(task), 3); |
45 | | /// ``` |
46 | | #[must_use = "tasks get canceled when dropped, use `.detach()` to run them in the background"] |
47 | | pub struct Task<T> { |
48 | | /// A raw task pointer. |
49 | | pub(crate) ptr: NonNull<()>, |
50 | | |
51 | | /// A marker capturing generic type `T`. |
52 | | pub(crate) _marker: PhantomData<T>, |
53 | | } |
54 | | |
55 | | unsafe impl<T: Send> Send for Task<T> {} |
56 | | unsafe impl<T> Sync for Task<T> {} |
57 | | |
58 | | impl<T> Unpin for Task<T> {} |
59 | | |
60 | | #[cfg(feature = "std")] |
61 | | impl<T> std::panic::UnwindSafe for Task<T> {} |
62 | | #[cfg(feature = "std")] |
63 | | impl<T> std::panic::RefUnwindSafe for Task<T> {} |
64 | | |
65 | | impl<T> Task<T> { |
66 | | /// Detaches the task to let it keep running in the background. |
67 | | /// |
68 | | /// # Examples |
69 | | /// |
70 | | /// ``` |
71 | | /// use smol::{Executor, Timer}; |
72 | | /// use std::time::Duration; |
73 | | /// |
74 | | /// let ex = Executor::new(); |
75 | | /// |
76 | | /// // Spawn a daemon future. |
77 | | /// ex.spawn(async { |
78 | | /// loop { |
79 | | /// println!("I'm a daemon task looping forever."); |
80 | | /// Timer::after(Duration::from_secs(1)).await; |
81 | | /// } |
82 | | /// }) |
83 | | /// .detach(); |
84 | | /// ``` |
85 | 0 | pub fn detach(self) { |
86 | 0 | let mut this = self; |
87 | 0 | let _out = this.set_detached(); |
88 | 0 | mem::forget(this); |
89 | 0 | } |
90 | | |
91 | | /// Cancels the task and waits for it to stop running. |
92 | | /// |
93 | | /// Returns the task's output if it was completed just before it got canceled, or [`None`] if |
94 | | /// it didn't complete. |
95 | | /// |
96 | | /// While it's possible to simply drop the [`Task`] to cancel it, this is a cleaner way of |
97 | | /// canceling because it also waits for the task to stop running. |
98 | | /// |
99 | | /// # Examples |
100 | | /// |
101 | | /// ``` |
102 | | /// # if cfg!(miri) { return; } // Miri does not support epoll |
103 | | /// use smol::{future, Executor, Timer}; |
104 | | /// use std::thread; |
105 | | /// use std::time::Duration; |
106 | | /// |
107 | | /// let ex = Executor::new(); |
108 | | /// |
109 | | /// // Spawn a daemon future. |
110 | | /// let task = ex.spawn(async { |
111 | | /// loop { |
112 | | /// println!("Even though I'm in an infinite loop, you can still cancel me!"); |
113 | | /// Timer::after(Duration::from_secs(1)).await; |
114 | | /// } |
115 | | /// }); |
116 | | /// |
117 | | /// // Run an executor thread. |
118 | | /// thread::spawn(move || future::block_on(ex.run(future::pending::<()>()))); |
119 | | /// |
120 | | /// future::block_on(async { |
121 | | /// Timer::after(Duration::from_secs(3)).await; |
122 | | /// task.cancel().await; |
123 | | /// }); |
124 | | /// ``` |
125 | 0 | pub async fn cancel(self) -> Option<T> {Unexecuted instantiation: <async_task::task::Task<()>>::cancel Unexecuted instantiation: <async_task::task::Task<_>>::cancel |
126 | 0 | let mut this = self; |
127 | 0 | this.set_canceled(); |
128 | 0 | this.fallible().await |
129 | 0 | } Unexecuted instantiation: <async_task::task::Task<()>>::cancel::{closure#0}Unexecuted instantiation: <async_task::task::Task<_>>::cancel::{closure#0} |
130 | | |
131 | | /// Converts this task into a [`FallibleTask`]. |
132 | | /// |
133 | | /// Like [`Task`], a fallible task will poll the task's output until it is |
134 | | /// completed or canceled due to its [`Runnable`][`super::Runnable`] being |
135 | | /// dropped without being run. Resolves to the task's output when completed, |
136 | | /// or [`None`] if it didn't complete. |
137 | | /// |
138 | | /// # Examples |
139 | | /// |
140 | | /// ``` |
141 | | /// use smol::{future, Executor}; |
142 | | /// use std::thread; |
143 | | /// |
144 | | /// let ex = Executor::new(); |
145 | | /// |
146 | | /// // Spawn a future onto the executor. |
147 | | /// let task = ex.spawn(async { |
148 | | /// println!("Hello from a task!"); |
149 | | /// 1 + 2 |
150 | | /// }) |
151 | | /// .fallible(); |
152 | | /// |
153 | | /// // Run an executor thread. |
154 | | /// thread::spawn(move || future::block_on(ex.run(future::pending::<()>()))); |
155 | | /// |
156 | | /// // Wait for the task's output. |
157 | | /// assert_eq!(future::block_on(task), Some(3)); |
158 | | /// ``` |
159 | | /// |
160 | | /// ``` |
161 | | /// use smol::future; |
162 | | /// |
163 | | /// // Schedule function which drops the runnable without running it. |
164 | | /// let schedule = move |runnable| drop(runnable); |
165 | | /// |
166 | | /// // Create a task with the future and the schedule function. |
167 | | /// let (runnable, task) = async_task::spawn(async { |
168 | | /// println!("Hello from a task!"); |
169 | | /// 1 + 2 |
170 | | /// }, schedule); |
171 | | /// runnable.schedule(); |
172 | | /// |
173 | | /// // Wait for the task's output. |
174 | | /// assert_eq!(future::block_on(task.fallible()), None); |
175 | | /// ``` |
176 | 0 | pub fn fallible(self) -> FallibleTask<T> { |
177 | 0 | FallibleTask { task: self } |
178 | 0 | } Unexecuted instantiation: <async_task::task::Task<()>>::fallible Unexecuted instantiation: <async_task::task::Task<_>>::fallible |
179 | | |
180 | | /// Puts the task in canceled state. |
181 | 0 | fn set_canceled(&mut self) { |
182 | 0 | let ptr = self.ptr.as_ptr(); |
183 | 0 | let header = ptr as *const Header; |
184 | | |
185 | | unsafe { |
186 | 0 | let mut state = (*header).state.load(Ordering::Acquire); |
187 | | |
188 | | loop { |
189 | | // If the task has been completed or closed, it can't be canceled. |
190 | 0 | if state & (COMPLETED | CLOSED) != 0 { |
191 | 0 | break; |
192 | 0 | } |
193 | | |
194 | | // If the task is not scheduled nor running, we'll need to schedule it. |
195 | 0 | let new = if state & (SCHEDULED | RUNNING) == 0 { |
196 | 0 | (state | SCHEDULED | CLOSED) + REFERENCE |
197 | | } else { |
198 | 0 | state | CLOSED |
199 | | }; |
200 | | |
201 | | // Mark the task as closed. |
202 | 0 | match (*header).state.compare_exchange_weak( |
203 | 0 | state, |
204 | 0 | new, |
205 | 0 | Ordering::AcqRel, |
206 | 0 | Ordering::Acquire, |
207 | 0 | ) { |
208 | | Ok(_) => { |
209 | | // If the task is not scheduled nor running, schedule it one more time so |
210 | | // that its future gets dropped by the executor. |
211 | 0 | if state & (SCHEDULED | RUNNING) == 0 { |
212 | 0 | ((*header).vtable.schedule)(ptr); |
213 | 0 | } |
214 | | |
215 | | // Notify the awaiter that the task has been closed. |
216 | 0 | if state & AWAITER != 0 { |
217 | 0 | (*header).notify(None); |
218 | 0 | } |
219 | | |
220 | 0 | break; |
221 | | } |
222 | 0 | Err(s) => state = s, |
223 | | } |
224 | | } |
225 | | } |
226 | 0 | } Unexecuted instantiation: <async_task::task::Task<()>>::set_canceled Unexecuted instantiation: <async_task::task::Task<_>>::set_canceled |
227 | | |
228 | | /// Puts the task in detached state. |
229 | 0 | fn set_detached(&mut self) -> Option<T> { |
230 | 0 | let ptr = self.ptr.as_ptr(); |
231 | 0 | let header = ptr as *const Header; |
232 | | |
233 | | unsafe { |
234 | | // A place where the output will be stored in case it needs to be dropped. |
235 | 0 | let mut output = None; |
236 | | |
237 | | // Optimistically assume the `Task` is being detached just after creating the task. |
238 | | // This is a common case so if the `Task` is detached, the overhead of it is only one |
239 | | // compare-exchange operation. |
240 | 0 | if let Err(mut state) = (*header).state.compare_exchange_weak( |
241 | 0 | SCHEDULED | TASK | REFERENCE, |
242 | 0 | SCHEDULED | REFERENCE, |
243 | 0 | Ordering::AcqRel, |
244 | 0 | Ordering::Acquire, |
245 | 0 | ) { |
246 | | loop { |
247 | | // If the task has been completed but not yet closed, that means its output |
248 | | // must be dropped. |
249 | 0 | if state & COMPLETED != 0 && state & CLOSED == 0 { |
250 | | // Mark the task as closed in order to grab its output. |
251 | 0 | match (*header).state.compare_exchange_weak( |
252 | 0 | state, |
253 | 0 | state | CLOSED, |
254 | 0 | Ordering::AcqRel, |
255 | 0 | Ordering::Acquire, |
256 | 0 | ) { |
257 | 0 | Ok(_) => { |
258 | 0 | // Read the output. |
259 | 0 | output = |
260 | 0 | Some((((*header).vtable.get_output)(ptr) as *mut T).read()); |
261 | 0 |
|
262 | 0 | // Update the state variable because we're continuing the loop. |
263 | 0 | state |= CLOSED; |
264 | 0 | } |
265 | 0 | Err(s) => state = s, |
266 | | } |
267 | | } else { |
268 | | // If this is the last reference to the task and it's not closed, then |
269 | | // close it and schedule one more time so that its future gets dropped by |
270 | | // the executor. |
271 | 0 | let new = if state & (!(REFERENCE - 1) | CLOSED) == 0 { |
272 | 0 | SCHEDULED | CLOSED | REFERENCE |
273 | | } else { |
274 | 0 | state & !TASK |
275 | | }; |
276 | | |
277 | | // Unset the `TASK` flag. |
278 | 0 | match (*header).state.compare_exchange_weak( |
279 | 0 | state, |
280 | 0 | new, |
281 | 0 | Ordering::AcqRel, |
282 | 0 | Ordering::Acquire, |
283 | 0 | ) { |
284 | | Ok(_) => { |
285 | | // If this is the last reference to the task, we need to either |
286 | | // schedule dropping its future or destroy it. |
287 | 0 | if state & !(REFERENCE - 1) == 0 { |
288 | 0 | if state & CLOSED == 0 { |
289 | 0 | ((*header).vtable.schedule)(ptr); |
290 | 0 | } else { |
291 | 0 | ((*header).vtable.destroy)(ptr); |
292 | 0 | } |
293 | 0 | } |
294 | | |
295 | 0 | break; |
296 | | } |
297 | 0 | Err(s) => state = s, |
298 | | } |
299 | | } |
300 | | } |
301 | 0 | } |
302 | | |
303 | 0 | output |
304 | | } |
305 | 0 | } Unexecuted instantiation: <async_task::task::Task<()>>::set_detached Unexecuted instantiation: <async_task::task::Task<_>>::set_detached |
306 | | |
307 | | /// Polls the task to retrieve its output. |
308 | | /// |
309 | | /// Returns `Some` if the task has completed or `None` if it was closed. |
310 | | /// |
311 | | /// A task becomes closed in the following cases: |
312 | | /// |
313 | | /// 1. It gets canceled by `Runnable::drop()`, `Task::drop()`, or `Task::cancel()`. |
314 | | /// 2. Its output gets awaited by the `Task`. |
315 | | /// 3. It panics while polling the future. |
316 | | /// 4. It is completed and the `Task` gets dropped. |
317 | 0 | fn poll_task(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> { |
318 | 0 | let ptr = self.ptr.as_ptr(); |
319 | 0 | let header = ptr as *const Header; |
320 | | |
321 | | unsafe { |
322 | 0 | let mut state = (*header).state.load(Ordering::Acquire); |
323 | | |
324 | | loop { |
325 | | // If the task has been closed, notify the awaiter and return `None`. |
326 | 0 | if state & CLOSED != 0 { |
327 | | // If the task is scheduled or running, we need to wait until its future is |
328 | | // dropped. |
329 | 0 | if state & (SCHEDULED | RUNNING) != 0 { |
330 | | // Replace the waker with one associated with the current task. |
331 | 0 | (*header).register(cx.waker()); |
332 | | |
333 | | // Reload the state after registering. It is possible changes occurred just |
334 | | // before registration so we need to check for that. |
335 | 0 | state = (*header).state.load(Ordering::Acquire); |
336 | | |
337 | | // If the task is still scheduled or running, we need to wait because its |
338 | | // future is not dropped yet. |
339 | 0 | if state & (SCHEDULED | RUNNING) != 0 { |
340 | 0 | return Poll::Pending; |
341 | 0 | } |
342 | 0 | } |
343 | | |
344 | | // Even though the awaiter is most likely the current task, it could also be |
345 | | // another task. |
346 | 0 | (*header).notify(Some(cx.waker())); |
347 | 0 | return Poll::Ready(None); |
348 | 0 | } |
349 | | |
350 | | // If the task is not completed, register the current task. |
351 | 0 | if state & COMPLETED == 0 { |
352 | | // Replace the waker with one associated with the current task. |
353 | 0 | (*header).register(cx.waker()); |
354 | | |
355 | | // Reload the state after registering. It is possible that the task became |
356 | | // completed or closed just before registration so we need to check for that. |
357 | 0 | state = (*header).state.load(Ordering::Acquire); |
358 | | |
359 | | // If the task has been closed, restart. |
360 | 0 | if state & CLOSED != 0 { |
361 | 0 | continue; |
362 | 0 | } |
363 | | |
364 | | // If the task is still not completed, we're blocked on it. |
365 | 0 | if state & COMPLETED == 0 { |
366 | 0 | return Poll::Pending; |
367 | 0 | } |
368 | 0 | } |
369 | | |
370 | | // Since the task is now completed, mark it as closed in order to grab its output. |
371 | 0 | match (*header).state.compare_exchange( |
372 | 0 | state, |
373 | 0 | state | CLOSED, |
374 | 0 | Ordering::AcqRel, |
375 | 0 | Ordering::Acquire, |
376 | 0 | ) { |
377 | | Ok(_) => { |
378 | | // Notify the awaiter. Even though the awaiter is most likely the current |
379 | | // task, it could also be another task. |
380 | 0 | if state & AWAITER != 0 { |
381 | 0 | (*header).notify(Some(cx.waker())); |
382 | 0 | } |
383 | | |
384 | | // Take the output from the task. |
385 | 0 | let output = ((*header).vtable.get_output)(ptr) as *mut T; |
386 | 0 | return Poll::Ready(Some(output.read())); |
387 | | } |
388 | 0 | Err(s) => state = s, |
389 | | } |
390 | | } |
391 | | } |
392 | 0 | } Unexecuted instantiation: <async_task::task::Task<()>>::poll_task Unexecuted instantiation: <async_task::task::Task<_>>::poll_task |
393 | | |
394 | 0 | fn header(&self) -> &Header { |
395 | 0 | let ptr = self.ptr.as_ptr(); |
396 | 0 | let header = ptr as *const Header; |
397 | 0 | unsafe { &*header } |
398 | 0 | } |
399 | | |
400 | | /// Returns `true` if the current task is finished. |
401 | | /// |
402 | | /// Note that in a multithreaded environment, this task can finish immediately after calling this function. |
403 | 0 | pub fn is_finished(&self) -> bool { |
404 | 0 | let ptr = self.ptr.as_ptr(); |
405 | 0 | let header = ptr as *const Header; |
406 | | |
407 | | unsafe { |
408 | 0 | let state = (*header).state.load(Ordering::Acquire); |
409 | 0 | state & (CLOSED | COMPLETED) != 0 |
410 | | } |
411 | 0 | } |
412 | | } |
413 | | |
414 | | impl<T> Drop for Task<T> { |
415 | 0 | fn drop(&mut self) { |
416 | 0 | self.set_canceled(); |
417 | 0 | self.set_detached(); |
418 | 0 | } Unexecuted instantiation: <async_task::task::Task<()> as core::ops::drop::Drop>::drop Unexecuted instantiation: <async_task::task::Task<_> as core::ops::drop::Drop>::drop |
419 | | } |
420 | | |
421 | | impl<T> Future for Task<T> { |
422 | | type Output = T; |
423 | | |
424 | 0 | fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { |
425 | 0 | match self.poll_task(cx) { |
426 | 0 | Poll::Ready(t) => Poll::Ready(t.expect("task has failed")), |
427 | 0 | Poll::Pending => Poll::Pending, |
428 | | } |
429 | 0 | } |
430 | | } |
431 | | |
432 | | impl<T> fmt::Debug for Task<T> { |
433 | 0 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
434 | 0 | f.debug_struct("Task") |
435 | 0 | .field("header", self.header()) |
436 | 0 | .finish() |
437 | 0 | } |
438 | | } |
439 | | |
440 | | /// A spawned task with a fallible response. |
441 | | /// |
442 | | /// This type behaves like [`Task`], however it produces an `Option<T>` when |
443 | | /// polled and will return `None` if the executor dropped its |
444 | | /// [`Runnable`][`super::Runnable`] without running it. |
445 | | /// |
446 | | /// This can be useful to avoid the panic produced when polling the `Task` |
447 | | /// future if the executor dropped its `Runnable`. |
448 | | #[must_use = "tasks get canceled when dropped, use `.detach()` to run them in the background"] |
449 | | pub struct FallibleTask<T> { |
450 | | task: Task<T>, |
451 | | } |
452 | | |
453 | | impl<T> FallibleTask<T> { |
454 | | /// Detaches the task to let it keep running in the background. |
455 | | /// |
456 | | /// # Examples |
457 | | /// |
458 | | /// ``` |
459 | | /// use smol::{Executor, Timer}; |
460 | | /// use std::time::Duration; |
461 | | /// |
462 | | /// let ex = Executor::new(); |
463 | | /// |
464 | | /// // Spawn a daemon future. |
465 | | /// ex.spawn(async { |
466 | | /// loop { |
467 | | /// println!("I'm a daemon task looping forever."); |
468 | | /// Timer::after(Duration::from_secs(1)).await; |
469 | | /// } |
470 | | /// }) |
471 | | /// .fallible() |
472 | | /// .detach(); |
473 | | /// ``` |
474 | 0 | pub fn detach(self) { |
475 | 0 | self.task.detach() |
476 | 0 | } |
477 | | |
478 | | /// Cancels the task and waits for it to stop running. |
479 | | /// |
480 | | /// Returns the task's output if it was completed just before it got canceled, or [`None`] if |
481 | | /// it didn't complete. |
482 | | /// |
483 | | /// While it's possible to simply drop the [`Task`] to cancel it, this is a cleaner way of |
484 | | /// canceling because it also waits for the task to stop running. |
485 | | /// |
486 | | /// # Examples |
487 | | /// |
488 | | /// ``` |
489 | | /// # if cfg!(miri) { return; } // Miri does not support epoll |
490 | | /// use smol::{future, Executor, Timer}; |
491 | | /// use std::thread; |
492 | | /// use std::time::Duration; |
493 | | /// |
494 | | /// let ex = Executor::new(); |
495 | | /// |
496 | | /// // Spawn a daemon future. |
497 | | /// let task = ex.spawn(async { |
498 | | /// loop { |
499 | | /// println!("Even though I'm in an infinite loop, you can still cancel me!"); |
500 | | /// Timer::after(Duration::from_secs(1)).await; |
501 | | /// } |
502 | | /// }) |
503 | | /// .fallible(); |
504 | | /// |
505 | | /// // Run an executor thread. |
506 | | /// thread::spawn(move || future::block_on(ex.run(future::pending::<()>()))); |
507 | | /// |
508 | | /// future::block_on(async { |
509 | | /// Timer::after(Duration::from_secs(3)).await; |
510 | | /// task.cancel().await; |
511 | | /// }); |
512 | | /// ``` |
513 | 0 | pub async fn cancel(self) -> Option<T> { |
514 | 0 | self.task.cancel().await |
515 | 0 | } |
516 | | } |
517 | | |
518 | | impl<T> Future for FallibleTask<T> { |
519 | | type Output = Option<T>; |
520 | | |
521 | 0 | fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { |
522 | 0 | self.task.poll_task(cx) |
523 | 0 | } Unexecuted instantiation: <async_task::task::FallibleTask<()> as core::future::future::Future>::poll Unexecuted instantiation: <async_task::task::FallibleTask<_> as core::future::future::Future>::poll |
524 | | } |
525 | | |
526 | | impl<T> fmt::Debug for FallibleTask<T> { |
527 | 0 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
528 | 0 | f.debug_struct("FallibleTask") |
529 | 0 | .field("header", self.task.header()) |
530 | 0 | .finish() |
531 | 0 | } |
532 | | } |