/rust/registry/src/index.crates.io-1949cf8c6b5b557f/async-task-4.3.0/src/runnable.rs
Line | Count | Source |
1 | | use core::fmt; |
2 | | use core::future::Future; |
3 | | use core::marker::PhantomData; |
4 | | use core::mem; |
5 | | use core::ptr::NonNull; |
6 | | use core::sync::atomic::Ordering; |
7 | | use core::task::Waker; |
8 | | |
9 | | use crate::header::Header; |
10 | | use crate::raw::RawTask; |
11 | | use crate::state::*; |
12 | | use crate::Task; |
13 | | |
14 | | /// Creates a new task. |
15 | | /// |
16 | | /// The returned [`Runnable`] is used to poll the `future`, and the [`Task`] is used to await its |
17 | | /// output. |
18 | | /// |
19 | | /// Method [`run()`][`Runnable::run()`] polls the task's future once. Then, the [`Runnable`] |
20 | | /// vanishes and only reappears when its [`Waker`] wakes the task, thus scheduling it to be run |
21 | | /// again. |
22 | | /// |
23 | | /// When the task is woken, its [`Runnable`] is passed to the `schedule` function. |
24 | | /// The `schedule` function should not attempt to run the [`Runnable`] nor to drop it. Instead, it |
25 | | /// should push it into a task queue so that it can be processed later. |
26 | | /// |
27 | | /// If you need to spawn a future that does not implement [`Send`] or isn't `'static`, consider |
28 | | /// using [`spawn_local()`] or [`spawn_unchecked()`] instead. |
29 | | /// |
30 | | /// # Examples |
31 | | /// |
32 | | /// ``` |
33 | | /// // The future inside the task. |
34 | | /// let future = async { |
35 | | /// println!("Hello, world!"); |
36 | | /// }; |
37 | | /// |
38 | | /// // A function that schedules the task when it gets woken up. |
39 | | /// let (s, r) = flume::unbounded(); |
40 | | /// let schedule = move |runnable| s.send(runnable).unwrap(); |
41 | | /// |
42 | | /// // Create a task with the future and the schedule function. |
43 | | /// let (runnable, task) = async_task::spawn(future, schedule); |
44 | | /// ``` |
45 | 0 | pub fn spawn<F, S>(future: F, schedule: S) -> (Runnable, Task<F::Output>) |
46 | 0 | where |
47 | 0 | F: Future + Send + 'static, |
48 | 0 | F::Output: Send + 'static, |
49 | 0 | S: Fn(Runnable) + Send + Sync + 'static, |
50 | | { |
51 | 0 | unsafe { spawn_unchecked(future, schedule) } |
52 | 0 | } |
53 | | |
/// Spawns a future that must stay on the thread that created it.
///
/// This works like [`spawn()`], but `future` does not have to be [`Send`]. In exchange, the
/// future is wrapped so that polling it or dropping it on any thread other than the spawning one
/// panics. In practice this means a panic occurs if the [`Runnable`] is used or dropped on
/// another thread.
///
/// This function is only available when the `std` feature for this crate is enabled.
///
/// # Examples
///
/// ```
/// use async_task::Runnable;
/// use flume::{Receiver, Sender};
/// use std::rc::Rc;
///
/// thread_local! {
///     // A queue that holds scheduled tasks.
///     static QUEUE: (Sender<Runnable>, Receiver<Runnable>) = flume::unbounded();
/// }
///
/// // Make a non-Send future.
/// let msg: Rc<str> = "Hello, world!".into();
/// let future = async move {
///     println!("{}", msg);
/// };
///
/// // A function that schedules the task when it gets woken up.
/// let s = QUEUE.with(|(s, _)| s.clone());
/// let schedule = move |runnable| s.send(runnable).unwrap();
///
/// // Create a task with the future and the schedule function.
/// let (runnable, task) = async_task::spawn_local(future, schedule);
/// ```
#[cfg(feature = "std")]
pub fn spawn_local<F, S>(future: F, schedule: S) -> (Runnable, Task<F::Output>)
where
    F: Future + 'static,
    F::Output: 'static,
    S: Fn(Runnable) + Send + Sync + 'static,
{
    use std::mem::ManuallyDrop;
    use std::pin::Pin;
    use std::task::{Context, Poll};
    use std::thread::{self, ThreadId};

    /// Returns the id of the calling thread.
    ///
    /// The id is cached in a thread-local; when that thread-local cannot be accessed, the id is
    /// looked up directly instead.
    #[inline]
    fn thread_id() -> ThreadId {
        thread_local! {
            static ID: ThreadId = thread::current().id();
        }
        match ID.try_with(|cached| *cached) {
            Ok(cached) => cached,
            Err(_) => thread::current().id(),
        }
    }

    /// A future wrapper that remembers the thread it was created on and asserts that it is only
    /// polled and dropped there.
    struct Checked<F> {
        /// Id of the thread that built this wrapper.
        id: ThreadId,
        /// The wrapped future; dropped by hand so the thread check always runs first.
        inner: ManuallyDrop<F>,
    }

    impl<F> Drop for Checked<F> {
        fn drop(&mut self) {
            // The check comes before the inner future is touched.
            assert!(
                self.id == thread_id(),
                "local task dropped by a thread that didn't spawn it"
            );
            // SAFETY: `inner` is dropped only here, and `self` is not used afterwards.
            unsafe {
                ManuallyDrop::drop(&mut self.inner);
            }
        }
    }

    impl<F: Future> Future for Checked<F> {
        type Output = F::Output;

        fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
            assert!(
                self.id == thread_id(),
                "local task polled by a thread that didn't spawn it"
            );
            // SAFETY: this only projects the pin from the wrapper onto its `inner` field; the
            // inner future is never moved out of the wrapper.
            let pinned_inner = unsafe { self.map_unchecked_mut(|this| &mut *this.inner) };
            pinned_inner.poll(cx)
        }
    }

    // Wrap the future into one that checks which thread it's on.
    let checked = Checked {
        id: thread_id(),
        inner: ManuallyDrop::new(future),
    };

    // SAFETY: `schedule` is `Send + Sync + 'static` and the future is `'static`. The future may
    // not be `Send`, but `Checked` panics when polled or dropped on a foreign thread.
    unsafe { spawn_unchecked(checked, schedule) }
}
144 | | |
145 | | /// Creates a new task without [`Send`], [`Sync`], and `'static` bounds. |
146 | | /// |
147 | | /// This function is same as [`spawn()`], except it does not require [`Send`], [`Sync`], and |
148 | | /// `'static` on `future` and `schedule`. |
149 | | /// |
150 | | /// # Safety |
151 | | /// |
152 | | /// - If `future` is not [`Send`], its [`Runnable`] must be used and dropped on the original |
153 | | /// thread. |
154 | | /// - If `future` is not `'static`, borrowed variables must outlive its [`Runnable`]. |
155 | | /// - If `schedule` is not [`Send`] and [`Sync`], the task's [`Waker`] must be used and dropped on |
156 | | /// the original thread. |
157 | | /// - If `schedule` is not `'static`, borrowed variables must outlive the task's [`Waker`]. |
158 | | /// |
159 | | /// # Examples |
160 | | /// |
161 | | /// ``` |
162 | | /// // The future inside the task. |
163 | | /// let future = async { |
164 | | /// println!("Hello, world!"); |
165 | | /// }; |
166 | | /// |
167 | | /// // If the task gets woken up, it will be sent into this channel. |
168 | | /// let (s, r) = flume::unbounded(); |
169 | | /// let schedule = move |runnable| s.send(runnable).unwrap(); |
170 | | /// |
171 | | /// // Create a task with the future and the schedule function. |
172 | | /// let (runnable, task) = unsafe { async_task::spawn_unchecked(future, schedule) }; |
173 | | /// ``` |
174 | 0 | pub unsafe fn spawn_unchecked<F, S>(future: F, schedule: S) -> (Runnable, Task<F::Output>) |
175 | 0 | where |
176 | 0 | F: Future, |
177 | 0 | S: Fn(Runnable), |
178 | | { |
179 | | // Allocate large futures on the heap. |
180 | 0 | let ptr = if mem::size_of::<F>() >= 2048 { |
181 | 0 | let future = alloc::boxed::Box::pin(future); |
182 | 0 | RawTask::<_, F::Output, S>::allocate(future, schedule) |
183 | | } else { |
184 | 0 | RawTask::<F, F::Output, S>::allocate(future, schedule) |
185 | | }; |
186 | | |
187 | 0 | let runnable = Runnable { ptr }; |
188 | 0 | let task = Task { |
189 | 0 | ptr, |
190 | 0 | _marker: PhantomData, |
191 | 0 | }; |
192 | 0 | (runnable, task) |
193 | 0 | } Unexecuted instantiation: async_task::runnable::spawn_unchecked::<async_task::runnable::spawn_local::Checked<devices::virtio::vhost_user_backend::wl::run_in_queue::{closure#0}>, <alloc::sync::Arc<cros_async::common_executor::RawExecutor<cros_async::sys::linux::fd_executor::EpollReactor>> as cros_async::executor::ExecutorTrait>::spawn_local<devices::virtio::vhost_user_backend::wl::run_in_queue::{closure#0}>::{closure#0}>Unexecuted instantiation: async_task::runnable::spawn_unchecked::<async_task::runnable::spawn_local::Checked<devices::virtio::vhost_user_backend::wl::run_in_queue::{closure#0}>, <alloc::sync::Arc<cros_async::common_executor::RawExecutor<cros_async::sys::linux::uring_executor::UringReactor>> as cros_async::executor::ExecutorTrait>::spawn_local<devices::virtio::vhost_user_backend::wl::run_in_queue::{closure#0}>::{closure#0}>Unexecuted instantiation: async_task::runnable::spawn_unchecked::<async_task::runnable::spawn_local::Checked<devices::virtio::vhost_user_backend::wl::run_out_queue::{closure#0}>, <alloc::sync::Arc<cros_async::common_executor::RawExecutor<cros_async::sys::linux::fd_executor::EpollReactor>> as cros_async::executor::ExecutorTrait>::spawn_local<devices::virtio::vhost_user_backend::wl::run_out_queue::{closure#0}>::{closure#0}>Unexecuted instantiation: async_task::runnable::spawn_unchecked::<async_task::runnable::spawn_local::Checked<devices::virtio::vhost_user_backend::wl::run_out_queue::{closure#0}>, <alloc::sync::Arc<cros_async::common_executor::RawExecutor<cros_async::sys::linux::uring_executor::UringReactor>> as cros_async::executor::ExecutorTrait>::spawn_local<devices::virtio::vhost_user_backend::wl::run_out_queue::{closure#0}>::{closure#0}>Unexecuted instantiation: async_task::runnable::spawn_unchecked::<_, _> |
194 | | |
/// The handle through which a task is run.
///
/// Each spawned task has exactly one [`Runnable`] handle, and that handle only exists while the
/// task is scheduled for running.
///
/// Calling [`run()`][`Runnable::run()`] polls the task's future once. After that the
/// [`Runnable`] is gone, and it only comes back when its [`Waker`] wakes the task, which
/// schedules it to be run again.
///
/// Dropping a [`Runnable`] cancels the task: its future won't be polled again, and awaiting the
/// [`Task`] afterwards will result in a panic.
///
/// # Examples
///
/// ```
/// use async_task::Runnable;
/// use once_cell::sync::Lazy;
/// use std::{panic, thread};
///
/// // A simple executor.
/// static QUEUE: Lazy<flume::Sender<Runnable>> = Lazy::new(|| {
///     let (sender, receiver) = flume::unbounded::<Runnable>();
///     thread::spawn(|| {
///         for runnable in receiver {
///             let _ignore_panic = panic::catch_unwind(|| runnable.run());
///         }
///     });
///     sender
/// });
///
/// // Create a task with a simple future.
/// let schedule = |runnable| QUEUE.send(runnable).unwrap();
/// let (runnable, task) = async_task::spawn(async { 1 + 2 }, schedule);
///
/// // Schedule the task and await its output.
/// runnable.schedule();
/// assert_eq!(smol::future::block_on(task), 3);
/// ```
pub struct Runnable {
    /// Type-erased pointer to the heap-allocated task.
    pub(crate) ptr: NonNull<()>,
}

// NOTE(review): `NonNull` is not `Send`/`Sync`, so these are asserted by hand. The constructors
// in this file either require `Send` on the future (`spawn`), panic on cross-thread use
// (`spawn_local`), or make it the caller's obligation (`spawn_unchecked`).
unsafe impl Send for Runnable {}
unsafe impl Sync for Runnable {}

#[cfg(feature = "std")]
impl std::panic::RefUnwindSafe for Runnable {}
#[cfg(feature = "std")]
impl std::panic::UnwindSafe for Runnable {}
245 | | |
246 | | impl Runnable { |
247 | | /// Schedules the task. |
248 | | /// |
249 | | /// This is a convenience method that passes the [`Runnable`] to the schedule function. |
250 | | /// |
251 | | /// # Examples |
252 | | /// |
253 | | /// ``` |
254 | | /// // A function that schedules the task when it gets woken up. |
255 | | /// let (s, r) = flume::unbounded(); |
256 | | /// let schedule = move |runnable| s.send(runnable).unwrap(); |
257 | | /// |
258 | | /// // Create a task with a simple future and the schedule function. |
259 | | /// let (runnable, task) = async_task::spawn(async {}, schedule); |
260 | | /// |
261 | | /// // Schedule the task. |
262 | | /// assert_eq!(r.len(), 0); |
263 | | /// runnable.schedule(); |
264 | | /// assert_eq!(r.len(), 1); |
265 | | /// ``` |
266 | 0 | pub fn schedule(self) { |
267 | 0 | let ptr = self.ptr.as_ptr(); |
268 | 0 | let header = ptr as *const Header; |
269 | 0 | mem::forget(self); |
270 | | |
271 | 0 | unsafe { |
272 | 0 | ((*header).vtable.schedule)(ptr); |
273 | 0 | } |
274 | 0 | } |
275 | | |
276 | | /// Runs the task by polling its future. |
277 | | /// |
278 | | /// Returns `true` if the task was woken while running, in which case the [`Runnable`] gets |
279 | | /// rescheduled at the end of this method invocation. Otherwise, returns `false` and the |
280 | | /// [`Runnable`] vanishes until the task is woken. |
281 | | /// The return value is just a hint: `true` usually indicates that the task has yielded, i.e. |
282 | | /// it woke itself and then gave the control back to the executor. |
283 | | /// |
284 | | /// If the [`Task`] handle was dropped or if [`cancel()`][`Task::cancel()`] was called, then |
285 | | /// this method simply destroys the task. |
286 | | /// |
287 | | /// If the polled future panics, this method propagates the panic, and awaiting the [`Task`] |
288 | | /// after that will also result in a panic. |
289 | | /// |
290 | | /// # Examples |
291 | | /// |
292 | | /// ``` |
293 | | /// // A function that schedules the task when it gets woken up. |
294 | | /// let (s, r) = flume::unbounded(); |
295 | | /// let schedule = move |runnable| s.send(runnable).unwrap(); |
296 | | /// |
297 | | /// // Create a task with a simple future and the schedule function. |
298 | | /// let (runnable, task) = async_task::spawn(async { 1 + 2 }, schedule); |
299 | | /// |
300 | | /// // Run the task and check its output. |
301 | | /// runnable.run(); |
302 | | /// assert_eq!(smol::future::block_on(task), 3); |
303 | | /// ``` |
304 | 0 | pub fn run(self) -> bool { |
305 | 0 | let ptr = self.ptr.as_ptr(); |
306 | 0 | let header = ptr as *const Header; |
307 | 0 | mem::forget(self); |
308 | | |
309 | 0 | unsafe { ((*header).vtable.run)(ptr) } |
310 | 0 | } |
311 | | |
312 | | /// Returns a waker associated with this task. |
313 | | /// |
314 | | /// # Examples |
315 | | /// |
316 | | /// ``` |
317 | | /// use smol::future; |
318 | | /// |
319 | | /// // A function that schedules the task when it gets woken up. |
320 | | /// let (s, r) = flume::unbounded(); |
321 | | /// let schedule = move |runnable| s.send(runnable).unwrap(); |
322 | | /// |
323 | | /// // Create a task with a simple future and the schedule function. |
324 | | /// let (runnable, task) = async_task::spawn(future::pending::<()>(), schedule); |
325 | | /// |
326 | | /// // Take a waker and run the task. |
327 | | /// let waker = runnable.waker(); |
328 | | /// runnable.run(); |
329 | | /// |
330 | | /// // Reschedule the task by waking it. |
331 | | /// assert_eq!(r.len(), 0); |
332 | | /// waker.wake(); |
333 | | /// assert_eq!(r.len(), 1); |
334 | | /// ``` |
335 | 0 | pub fn waker(&self) -> Waker { |
336 | 0 | let ptr = self.ptr.as_ptr(); |
337 | 0 | let header = ptr as *const Header; |
338 | | |
339 | | unsafe { |
340 | 0 | let raw_waker = ((*header).vtable.clone_waker)(ptr); |
341 | 0 | Waker::from_raw(raw_waker) |
342 | | } |
343 | 0 | } |
344 | | } |
345 | | |
346 | | impl Drop for Runnable { |
347 | 0 | fn drop(&mut self) { |
348 | 0 | let ptr = self.ptr.as_ptr(); |
349 | 0 | let header = ptr as *const Header; |
350 | | |
351 | | unsafe { |
352 | 0 | let mut state = (*header).state.load(Ordering::Acquire); |
353 | | |
354 | | loop { |
355 | | // If the task has been completed or closed, it can't be canceled. |
356 | 0 | if state & (COMPLETED | CLOSED) != 0 { |
357 | 0 | break; |
358 | 0 | } |
359 | | |
360 | | // Mark the task as closed. |
361 | 0 | match (*header).state.compare_exchange_weak( |
362 | 0 | state, |
363 | 0 | state | CLOSED, |
364 | 0 | Ordering::AcqRel, |
365 | 0 | Ordering::Acquire, |
366 | 0 | ) { |
367 | 0 | Ok(_) => break, |
368 | 0 | Err(s) => state = s, |
369 | | } |
370 | | } |
371 | | |
372 | | // Drop the future. |
373 | 0 | ((*header).vtable.drop_future)(ptr); |
374 | | |
375 | | // Mark the task as unscheduled. |
376 | 0 | let state = (*header).state.fetch_and(!SCHEDULED, Ordering::AcqRel); |
377 | | |
378 | | // Notify the awaiter that the future has been dropped. |
379 | 0 | if state & AWAITER != 0 { |
380 | 0 | (*header).notify(None); |
381 | 0 | } |
382 | | |
383 | | // Drop the task reference. |
384 | 0 | ((*header).vtable.drop_ref)(ptr); |
385 | | } |
386 | 0 | } |
387 | | } |
388 | | |
389 | | impl fmt::Debug for Runnable { |
390 | 0 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
391 | 0 | let ptr = self.ptr.as_ptr(); |
392 | 0 | let header = ptr as *const Header; |
393 | | |
394 | 0 | f.debug_struct("Runnable") |
395 | 0 | .field("header", unsafe { &(*header) }) |
396 | 0 | .finish() |
397 | 0 | } |
398 | | } |