/rust/registry/src/index.crates.io-6f17d22bba15001f/async-executor-1.13.1/src/lib.rs
Line | Count | Source (jump to first uncovered line) |
1 | | //! Async executors. |
2 | | //! |
3 | | //! This crate provides two reference executors that trade performance for |
4 | | //! functionality. They should be considered reference executors that are "good |
5 | | //! enough" for most use cases. For more specialized use cases, consider writing |
6 | | //! your own executor on top of [`async-task`]. |
7 | | //! |
8 | | //! [`async-task`]: https://crates.io/crates/async-task |
9 | | //! |
10 | | //! # Examples |
11 | | //! |
12 | | //! ``` |
13 | | //! use async_executor::Executor; |
14 | | //! use futures_lite::future; |
15 | | //! |
16 | | //! // Create a new executor. |
17 | | //! let ex = Executor::new(); |
18 | | //! |
19 | | //! // Spawn a task. |
20 | | //! let task = ex.spawn(async { |
21 | | //! println!("Hello world"); |
22 | | //! }); |
23 | | //! |
24 | | //! // Run the executor until the task completes. |
25 | | //! future::block_on(ex.run(task)); |
26 | | //! ``` |
27 | | |
28 | | #![warn( |
29 | | missing_docs, |
30 | | missing_debug_implementations, |
31 | | rust_2018_idioms, |
32 | | clippy::undocumented_unsafe_blocks |
33 | | )] |
34 | | #![doc( |
35 | | html_favicon_url = "https://raw.githubusercontent.com/smol-rs/smol/master/assets/images/logo_fullsize_transparent.png" |
36 | | )] |
37 | | #![doc( |
38 | | html_logo_url = "https://raw.githubusercontent.com/smol-rs/smol/master/assets/images/logo_fullsize_transparent.png" |
39 | | )] |
40 | | #![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))] |
41 | | |
42 | | use std::fmt; |
43 | | use std::marker::PhantomData; |
44 | | use std::panic::{RefUnwindSafe, UnwindSafe}; |
45 | | use std::rc::Rc; |
46 | | use std::sync::atomic::{AtomicBool, AtomicPtr, Ordering}; |
47 | | use std::sync::{Arc, Mutex, RwLock, TryLockError}; |
48 | | use std::task::{Poll, Waker}; |
49 | | |
50 | | use async_task::{Builder, Runnable}; |
51 | | use concurrent_queue::ConcurrentQueue; |
52 | | use futures_lite::{future, prelude::*}; |
53 | | use slab::Slab; |
54 | | |
55 | | #[cfg(feature = "static")] |
56 | | mod static_executors; |
57 | | |
58 | | #[doc(no_inline)] |
59 | | pub use async_task::{FallibleTask, Task}; |
60 | | #[cfg(feature = "static")] |
61 | | #[cfg_attr(docsrs, doc(cfg(any(feature = "static"))))] |
62 | | pub use static_executors::*; |
63 | | |
64 | | /// An async executor. |
65 | | /// |
66 | | /// # Examples |
67 | | /// |
68 | | /// A multi-threaded executor: |
69 | | /// |
70 | | /// ``` |
71 | | /// use async_channel::unbounded; |
72 | | /// use async_executor::Executor; |
73 | | /// use easy_parallel::Parallel; |
74 | | /// use futures_lite::future; |
75 | | /// |
76 | | /// let ex = Executor::new(); |
77 | | /// let (signal, shutdown) = unbounded::<()>(); |
78 | | /// |
79 | | /// Parallel::new() |
80 | | /// // Run four executor threads. |
81 | | /// .each(0..4, |_| future::block_on(ex.run(shutdown.recv()))) |
82 | | /// // Run the main future on the current thread. |
83 | | /// .finish(|| future::block_on(async { |
84 | | /// println!("Hello world!"); |
85 | | /// drop(signal); |
86 | | /// })); |
87 | | /// ``` |
88 | | pub struct Executor<'a> { |
89 | | /// The executor state. |
90 | | state: AtomicPtr<State>, |
91 | | |
92 | | /// Makes the `'a` lifetime invariant. |
93 | | _marker: PhantomData<std::cell::UnsafeCell<&'a ()>>, |
94 | | } |
95 | | |
96 | | // SAFETY: Executor stores no thread-local state that can be accessed from another thread. |
97 | | unsafe impl Send for Executor<'_> {} |
98 | | // SAFETY: Executor internally synchronizes all of its operations. |
99 | | unsafe impl Sync for Executor<'_> {} |
100 | | |
101 | | impl UnwindSafe for Executor<'_> {} |
102 | | impl RefUnwindSafe for Executor<'_> {} |
103 | | |
104 | | impl fmt::Debug for Executor<'_> { |
105 | 0 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
106 | 0 | debug_executor(self, "Executor", f) |
107 | 0 | } |
108 | | } |
109 | | |
110 | | impl<'a> Executor<'a> { |
111 | | /// Creates a new executor. |
112 | | /// |
113 | | /// # Examples |
114 | | /// |
115 | | /// ``` |
116 | | /// use async_executor::Executor; |
117 | | /// |
118 | | /// let ex = Executor::new(); |
119 | | /// ``` |
120 | 0 | pub const fn new() -> Executor<'a> { |
121 | 0 | Executor { |
122 | 0 | state: AtomicPtr::new(std::ptr::null_mut()), |
123 | 0 | _marker: PhantomData, |
124 | 0 | } |
125 | 0 | } |
126 | | |
127 | | /// Returns `true` if there are no unfinished tasks. |
128 | | /// |
129 | | /// # Examples |
130 | | /// |
131 | | /// ``` |
132 | | /// use async_executor::Executor; |
133 | | /// |
134 | | /// let ex = Executor::new(); |
135 | | /// assert!(ex.is_empty()); |
136 | | /// |
137 | | /// let task = ex.spawn(async { |
138 | | /// println!("Hello world"); |
139 | | /// }); |
140 | | /// assert!(!ex.is_empty()); |
141 | | /// |
142 | | /// assert!(ex.try_tick()); |
143 | | /// assert!(ex.is_empty()); |
144 | | /// ``` |
145 | 0 | pub fn is_empty(&self) -> bool { |
146 | 0 | self.state().active.lock().unwrap().is_empty() |
147 | 0 | } |
148 | | |
149 | | /// Spawns a task onto the executor. |
150 | | /// |
151 | | /// # Examples |
152 | | /// |
153 | | /// ``` |
154 | | /// use async_executor::Executor; |
155 | | /// |
156 | | /// let ex = Executor::new(); |
157 | | /// |
158 | | /// let task = ex.spawn(async { |
159 | | /// println!("Hello world"); |
160 | | /// }); |
161 | | /// ``` |
162 | 0 | pub fn spawn<T: Send + 'a>(&self, future: impl Future<Output = T> + Send + 'a) -> Task<T> { |
163 | 0 | let mut active = self.state().active.lock().unwrap(); |
164 | 0 |
|
165 | 0 | // SAFETY: `T` and the future are `Send`. |
166 | 0 | unsafe { self.spawn_inner(future, &mut active) } |
167 | 0 | } Unexecuted instantiation: <async_executor::Executor>::spawn::<core::result::Result<surrealdb_core::sql::value::value::Value, surrealdb_core::err::Error>, surrealdb_core::fnc::asynchronous::{closure#0}::cpu_intensive<core::result::Result<surrealdb_core::sql::value::value::Value, surrealdb_core::err::Error>, surrealdb_core::fnc::asynchronous::{closure#0}::{closure#14}>::{closure#0}::{closure#0}> Unexecuted instantiation: <async_executor::Executor>::spawn::<core::result::Result<surrealdb_core::sql::value::value::Value, surrealdb_core::err::Error>, surrealdb_core::fnc::asynchronous::{closure#0}::cpu_intensive<core::result::Result<surrealdb_core::sql::value::value::Value, surrealdb_core::err::Error>, surrealdb_core::fnc::asynchronous::{closure#0}::{closure#15}>::{closure#0}::{closure#0}> Unexecuted instantiation: <async_executor::Executor>::spawn::<core::result::Result<surrealdb_core::sql::value::value::Value, surrealdb_core::err::Error>, surrealdb_core::fnc::asynchronous::{closure#0}::cpu_intensive<core::result::Result<surrealdb_core::sql::value::value::Value, surrealdb_core::err::Error>, surrealdb_core::fnc::asynchronous::{closure#0}::{closure#16}>::{closure#0}::{closure#0}> Unexecuted instantiation: <async_executor::Executor>::spawn::<core::result::Result<surrealdb_core::sql::value::value::Value, surrealdb_core::err::Error>, surrealdb_core::fnc::asynchronous::{closure#0}::cpu_intensive<core::result::Result<surrealdb_core::sql::value::value::Value, surrealdb_core::err::Error>, surrealdb_core::fnc::asynchronous::{closure#0}::{closure#17}>::{closure#0}::{closure#0}> Unexecuted instantiation: <async_executor::Executor>::spawn::<core::result::Result<surrealdb_core::sql::value::value::Value, surrealdb_core::err::Error>, surrealdb_core::fnc::asynchronous::{closure#0}::cpu_intensive<core::result::Result<surrealdb_core::sql::value::value::Value, surrealdb_core::err::Error>, surrealdb_core::fnc::asynchronous::{closure#0}::{closure#18}>::{closure#0}::{closure#0}> 
Unexecuted instantiation: <async_executor::Executor>::spawn::<core::result::Result<surrealdb_core::sql::value::value::Value, surrealdb_core::err::Error>, surrealdb_core::fnc::asynchronous::{closure#0}::cpu_intensive<core::result::Result<surrealdb_core::sql::value::value::Value, surrealdb_core::err::Error>, surrealdb_core::fnc::asynchronous::{closure#0}::{closure#19}>::{closure#0}::{closure#0}> Unexecuted instantiation: <async_executor::Executor>::spawn::<core::result::Result<surrealdb_core::sql::value::value::Value, surrealdb_core::err::Error>, surrealdb_core::fnc::asynchronous::{closure#0}::cpu_intensive<core::result::Result<surrealdb_core::sql::value::value::Value, surrealdb_core::err::Error>, surrealdb_core::fnc::asynchronous::{closure#0}::{closure#20}>::{closure#0}::{closure#0}> Unexecuted instantiation: <async_executor::Executor>::spawn::<core::result::Result<surrealdb_core::sql::value::value::Value, surrealdb_core::err::Error>, surrealdb_core::fnc::asynchronous::{closure#0}::cpu_intensive<core::result::Result<surrealdb_core::sql::value::value::Value, surrealdb_core::err::Error>, surrealdb_core::fnc::asynchronous::{closure#0}::{closure#21}>::{closure#0}::{closure#0}> Unexecuted instantiation: <async_executor::Executor>::spawn::<_, _> |
168 | | |
169 | | /// Spawns many tasks onto the executor. |
170 | | /// |
171 | | /// As opposed to the [`spawn`] method, this locks the executor's inner task lock once and |
172 | | /// spawns all of the tasks in one go. With large amounts of tasks this can improve |
173 | | /// contention. |
174 | | /// |
175 | | /// For very large numbers of tasks the lock is occasionally dropped and re-acquired to |
176 | | /// prevent runner thread starvation. It is assumed that the iterator provided does not |
177 | | /// block; blocking iterators can lock up the internal mutex and therefore the entire |
178 | | /// executor. |
179 | | /// |
180 | | /// ## Example |
181 | | /// |
182 | | /// ``` |
183 | | /// use async_executor::Executor; |
184 | | /// use futures_lite::{stream, prelude::*}; |
185 | | /// use std::future::ready; |
186 | | /// |
187 | | /// # futures_lite::future::block_on(async { |
188 | | /// let mut ex = Executor::new(); |
189 | | /// |
190 | | /// let futures = [ |
191 | | /// ready(1), |
192 | | /// ready(2), |
193 | | /// ready(3) |
194 | | /// ]; |
195 | | /// |
196 | | /// // Spawn all of the futures onto the executor at once. |
197 | | /// let mut tasks = vec![]; |
198 | | /// ex.spawn_many(futures, &mut tasks); |
199 | | /// |
200 | | /// // Await all of them. |
201 | | /// let results = ex.run(async move { |
202 | | /// stream::iter(tasks).then(|x| x).collect::<Vec<_>>().await |
203 | | /// }).await; |
204 | | /// assert_eq!(results, [1, 2, 3]); |
205 | | /// # }); |
206 | | /// ``` |
207 | | /// |
208 | | /// [`spawn`]: Executor::spawn |
209 | 0 | pub fn spawn_many<T: Send + 'a, F: Future<Output = T> + Send + 'a>( |
210 | 0 | &self, |
211 | 0 | futures: impl IntoIterator<Item = F>, |
212 | 0 | handles: &mut impl Extend<Task<F::Output>>, |
213 | 0 | ) { |
214 | 0 | let mut active = Some(self.state().active.lock().unwrap()); |
215 | 0 |
|
216 | 0 | // Convert the futures into tasks. |
217 | 0 | let tasks = futures.into_iter().enumerate().map(move |(i, future)| { |
218 | 0 | // SAFETY: `T` and the future are `Send`. |
219 | 0 | let task = unsafe { self.spawn_inner(future, active.as_mut().unwrap()) }; |
220 | 0 |
|
221 | 0 | // Yield the lock every once in a while to ease contention. |
222 | 0 | if i.wrapping_sub(1) % 500 == 0 { |
223 | 0 | drop(active.take()); |
224 | 0 | active = Some(self.state().active.lock().unwrap()); |
225 | 0 | } |
226 | | |
227 | 0 | task |
228 | 0 | }); |
229 | 0 |
|
230 | 0 | // Push the tasks to the user's collection. |
231 | 0 | handles.extend(tasks); |
232 | 0 | } |
233 | | |
234 | | /// Spawn a future while holding the inner lock. |
235 | | /// |
236 | | /// # Safety |
237 | | /// |
238 | | /// If this is an `Executor`, `F` and `T` must be `Send`. |
239 | 0 | unsafe fn spawn_inner<T: 'a>( |
240 | 0 | &self, |
241 | 0 | future: impl Future<Output = T> + 'a, |
242 | 0 | active: &mut Slab<Waker>, |
243 | 0 | ) -> Task<T> { |
244 | 0 | // Remove the task from the set of active tasks when the future finishes. |
245 | 0 | let entry = active.vacant_entry(); |
246 | 0 | let index = entry.key(); |
247 | 0 | let state = self.state_as_arc(); |
248 | 0 | let future = async move { |
249 | 0 | let _guard = CallOnDrop(move || drop(state.active.lock().unwrap().try_remove(index))); Unexecuted instantiation: <async_executor::Executor>::spawn_inner::<core::result::Result<surrealdb_core::sql::value::value::Value, surrealdb_core::err::Error>, surrealdb_core::fnc::asynchronous::{closure#0}::cpu_intensive<core::result::Result<surrealdb_core::sql::value::value::Value, surrealdb_core::err::Error>, surrealdb_core::fnc::asynchronous::{closure#0}::{closure#14}>::{closure#0}::{closure#0}>::{closure#0}::{closure#0} Unexecuted instantiation: <async_executor::Executor>::spawn_inner::<core::result::Result<surrealdb_core::sql::value::value::Value, surrealdb_core::err::Error>, surrealdb_core::fnc::asynchronous::{closure#0}::cpu_intensive<core::result::Result<surrealdb_core::sql::value::value::Value, surrealdb_core::err::Error>, surrealdb_core::fnc::asynchronous::{closure#0}::{closure#15}>::{closure#0}::{closure#0}>::{closure#0}::{closure#0} Unexecuted instantiation: <async_executor::Executor>::spawn_inner::<core::result::Result<surrealdb_core::sql::value::value::Value, surrealdb_core::err::Error>, surrealdb_core::fnc::asynchronous::{closure#0}::cpu_intensive<core::result::Result<surrealdb_core::sql::value::value::Value, surrealdb_core::err::Error>, surrealdb_core::fnc::asynchronous::{closure#0}::{closure#16}>::{closure#0}::{closure#0}>::{closure#0}::{closure#0} Unexecuted instantiation: <async_executor::Executor>::spawn_inner::<core::result::Result<surrealdb_core::sql::value::value::Value, surrealdb_core::err::Error>, surrealdb_core::fnc::asynchronous::{closure#0}::cpu_intensive<core::result::Result<surrealdb_core::sql::value::value::Value, surrealdb_core::err::Error>, surrealdb_core::fnc::asynchronous::{closure#0}::{closure#17}>::{closure#0}::{closure#0}>::{closure#0}::{closure#0} Unexecuted instantiation: <async_executor::Executor>::spawn_inner::<core::result::Result<surrealdb_core::sql::value::value::Value, surrealdb_core::err::Error>, 
surrealdb_core::fnc::asynchronous::{closure#0}::cpu_intensive<core::result::Result<surrealdb_core::sql::value::value::Value, surrealdb_core::err::Error>, surrealdb_core::fnc::asynchronous::{closure#0}::{closure#18}>::{closure#0}::{closure#0}>::{closure#0}::{closure#0} Unexecuted instantiation: <async_executor::Executor>::spawn_inner::<core::result::Result<surrealdb_core::sql::value::value::Value, surrealdb_core::err::Error>, surrealdb_core::fnc::asynchronous::{closure#0}::cpu_intensive<core::result::Result<surrealdb_core::sql::value::value::Value, surrealdb_core::err::Error>, surrealdb_core::fnc::asynchronous::{closure#0}::{closure#19}>::{closure#0}::{closure#0}>::{closure#0}::{closure#0} Unexecuted instantiation: <async_executor::Executor>::spawn_inner::<core::result::Result<surrealdb_core::sql::value::value::Value, surrealdb_core::err::Error>, surrealdb_core::fnc::asynchronous::{closure#0}::cpu_intensive<core::result::Result<surrealdb_core::sql::value::value::Value, surrealdb_core::err::Error>, surrealdb_core::fnc::asynchronous::{closure#0}::{closure#20}>::{closure#0}::{closure#0}>::{closure#0}::{closure#0} Unexecuted instantiation: <async_executor::Executor>::spawn_inner::<core::result::Result<surrealdb_core::sql::value::value::Value, surrealdb_core::err::Error>, surrealdb_core::fnc::asynchronous::{closure#0}::cpu_intensive<core::result::Result<surrealdb_core::sql::value::value::Value, surrealdb_core::err::Error>, surrealdb_core::fnc::asynchronous::{closure#0}::{closure#21}>::{closure#0}::{closure#0}>::{closure#0}::{closure#0} Unexecuted instantiation: <async_executor::Executor>::spawn_inner::<_, _>::{closure#0}::{closure#0} |
250 | 0 | future.await |
251 | 0 | }; Unexecuted instantiation: <async_executor::Executor>::spawn_inner::<core::result::Result<surrealdb_core::sql::value::value::Value, surrealdb_core::err::Error>, surrealdb_core::fnc::asynchronous::{closure#0}::cpu_intensive<core::result::Result<surrealdb_core::sql::value::value::Value, surrealdb_core::err::Error>, surrealdb_core::fnc::asynchronous::{closure#0}::{closure#14}>::{closure#0}::{closure#0}>::{closure#0} Unexecuted instantiation: <async_executor::Executor>::spawn_inner::<core::result::Result<surrealdb_core::sql::value::value::Value, surrealdb_core::err::Error>, surrealdb_core::fnc::asynchronous::{closure#0}::cpu_intensive<core::result::Result<surrealdb_core::sql::value::value::Value, surrealdb_core::err::Error>, surrealdb_core::fnc::asynchronous::{closure#0}::{closure#15}>::{closure#0}::{closure#0}>::{closure#0} Unexecuted instantiation: <async_executor::Executor>::spawn_inner::<core::result::Result<surrealdb_core::sql::value::value::Value, surrealdb_core::err::Error>, surrealdb_core::fnc::asynchronous::{closure#0}::cpu_intensive<core::result::Result<surrealdb_core::sql::value::value::Value, surrealdb_core::err::Error>, surrealdb_core::fnc::asynchronous::{closure#0}::{closure#16}>::{closure#0}::{closure#0}>::{closure#0} Unexecuted instantiation: <async_executor::Executor>::spawn_inner::<core::result::Result<surrealdb_core::sql::value::value::Value, surrealdb_core::err::Error>, surrealdb_core::fnc::asynchronous::{closure#0}::cpu_intensive<core::result::Result<surrealdb_core::sql::value::value::Value, surrealdb_core::err::Error>, surrealdb_core::fnc::asynchronous::{closure#0}::{closure#17}>::{closure#0}::{closure#0}>::{closure#0} Unexecuted instantiation: <async_executor::Executor>::spawn_inner::<core::result::Result<surrealdb_core::sql::value::value::Value, surrealdb_core::err::Error>, surrealdb_core::fnc::asynchronous::{closure#0}::cpu_intensive<core::result::Result<surrealdb_core::sql::value::value::Value, surrealdb_core::err::Error>, 
surrealdb_core::fnc::asynchronous::{closure#0}::{closure#18}>::{closure#0}::{closure#0}>::{closure#0} Unexecuted instantiation: <async_executor::Executor>::spawn_inner::<core::result::Result<surrealdb_core::sql::value::value::Value, surrealdb_core::err::Error>, surrealdb_core::fnc::asynchronous::{closure#0}::cpu_intensive<core::result::Result<surrealdb_core::sql::value::value::Value, surrealdb_core::err::Error>, surrealdb_core::fnc::asynchronous::{closure#0}::{closure#19}>::{closure#0}::{closure#0}>::{closure#0} Unexecuted instantiation: <async_executor::Executor>::spawn_inner::<core::result::Result<surrealdb_core::sql::value::value::Value, surrealdb_core::err::Error>, surrealdb_core::fnc::asynchronous::{closure#0}::cpu_intensive<core::result::Result<surrealdb_core::sql::value::value::Value, surrealdb_core::err::Error>, surrealdb_core::fnc::asynchronous::{closure#0}::{closure#20}>::{closure#0}::{closure#0}>::{closure#0} Unexecuted instantiation: <async_executor::Executor>::spawn_inner::<core::result::Result<surrealdb_core::sql::value::value::Value, surrealdb_core::err::Error>, surrealdb_core::fnc::asynchronous::{closure#0}::cpu_intensive<core::result::Result<surrealdb_core::sql::value::value::Value, surrealdb_core::err::Error>, surrealdb_core::fnc::asynchronous::{closure#0}::{closure#21}>::{closure#0}::{closure#0}>::{closure#0} Unexecuted instantiation: <async_executor::Executor>::spawn_inner::<_, _>::{closure#0} |
252 | | |
253 | | // Create the task and register it in the set of active tasks. |
254 | | // |
255 | | // SAFETY: |
256 | | // |
257 | | // If `future` is not `Send`, this must be a `LocalExecutor` as per this |
258 | | // function's unsafe precondition. Since `LocalExecutor` is `!Sync`, |
259 | | // `try_tick`, `tick` and `run` can only be called from the origin |
260 | | // thread of the `LocalExecutor`. Similarly, `spawn` can only be called |
261 | | // from the origin thread, ensuring that `future` and the executor share |
262 | | // the same origin thread. The `Runnable` can be scheduled from other |
263 | | // threads, but because of the above `Runnable` can only be called or |
264 | | // dropped on the origin thread. |
265 | | // |
266 | | // `future` is not `'static`, but we make sure that the `Runnable` does |
267 | | // not outlive `'a`. When the executor is dropped, the `active` field is |
268 | | // drained and all of the `Waker`s are woken. Then, the queue inside of |
269 | | // the `Executor` is drained of all of its runnables. This ensures that |
270 | | // runnables are dropped and this precondition is satisfied. |
271 | | // |
272 | | // `self.schedule()` is `Send`, `Sync` and `'static`, as checked below. |
273 | | // Therefore we do not need to worry about what is done with the |
274 | | // `Waker`. |
275 | 0 | let (runnable, task) = Builder::new() |
276 | 0 | .propagate_panic(true) |
277 | 0 | .spawn_unchecked(|()| future, self.schedule()); Unexecuted instantiation: <async_executor::Executor>::spawn_inner::<core::result::Result<surrealdb_core::sql::value::value::Value, surrealdb_core::err::Error>, surrealdb_core::fnc::asynchronous::{closure#0}::cpu_intensive<core::result::Result<surrealdb_core::sql::value::value::Value, surrealdb_core::err::Error>, surrealdb_core::fnc::asynchronous::{closure#0}::{closure#14}>::{closure#0}::{closure#0}>::{closure#1} Unexecuted instantiation: <async_executor::Executor>::spawn_inner::<core::result::Result<surrealdb_core::sql::value::value::Value, surrealdb_core::err::Error>, surrealdb_core::fnc::asynchronous::{closure#0}::cpu_intensive<core::result::Result<surrealdb_core::sql::value::value::Value, surrealdb_core::err::Error>, surrealdb_core::fnc::asynchronous::{closure#0}::{closure#15}>::{closure#0}::{closure#0}>::{closure#1} Unexecuted instantiation: <async_executor::Executor>::spawn_inner::<core::result::Result<surrealdb_core::sql::value::value::Value, surrealdb_core::err::Error>, surrealdb_core::fnc::asynchronous::{closure#0}::cpu_intensive<core::result::Result<surrealdb_core::sql::value::value::Value, surrealdb_core::err::Error>, surrealdb_core::fnc::asynchronous::{closure#0}::{closure#16}>::{closure#0}::{closure#0}>::{closure#1} Unexecuted instantiation: <async_executor::Executor>::spawn_inner::<core::result::Result<surrealdb_core::sql::value::value::Value, surrealdb_core::err::Error>, surrealdb_core::fnc::asynchronous::{closure#0}::cpu_intensive<core::result::Result<surrealdb_core::sql::value::value::Value, surrealdb_core::err::Error>, surrealdb_core::fnc::asynchronous::{closure#0}::{closure#17}>::{closure#0}::{closure#0}>::{closure#1} Unexecuted instantiation: <async_executor::Executor>::spawn_inner::<core::result::Result<surrealdb_core::sql::value::value::Value, surrealdb_core::err::Error>, 
surrealdb_core::fnc::asynchronous::{closure#0}::cpu_intensive<core::result::Result<surrealdb_core::sql::value::value::Value, surrealdb_core::err::Error>, surrealdb_core::fnc::asynchronous::{closure#0}::{closure#18}>::{closure#0}::{closure#0}>::{closure#1} Unexecuted instantiation: <async_executor::Executor>::spawn_inner::<core::result::Result<surrealdb_core::sql::value::value::Value, surrealdb_core::err::Error>, surrealdb_core::fnc::asynchronous::{closure#0}::cpu_intensive<core::result::Result<surrealdb_core::sql::value::value::Value, surrealdb_core::err::Error>, surrealdb_core::fnc::asynchronous::{closure#0}::{closure#19}>::{closure#0}::{closure#0}>::{closure#1} Unexecuted instantiation: <async_executor::Executor>::spawn_inner::<core::result::Result<surrealdb_core::sql::value::value::Value, surrealdb_core::err::Error>, surrealdb_core::fnc::asynchronous::{closure#0}::cpu_intensive<core::result::Result<surrealdb_core::sql::value::value::Value, surrealdb_core::err::Error>, surrealdb_core::fnc::asynchronous::{closure#0}::{closure#20}>::{closure#0}::{closure#0}>::{closure#1} Unexecuted instantiation: <async_executor::Executor>::spawn_inner::<core::result::Result<surrealdb_core::sql::value::value::Value, surrealdb_core::err::Error>, surrealdb_core::fnc::asynchronous::{closure#0}::cpu_intensive<core::result::Result<surrealdb_core::sql::value::value::Value, surrealdb_core::err::Error>, surrealdb_core::fnc::asynchronous::{closure#0}::{closure#21}>::{closure#0}::{closure#0}>::{closure#1} Unexecuted instantiation: <async_executor::Executor>::spawn_inner::<_, _>::{closure#1} |
278 | 0 | entry.insert(runnable.waker()); |
279 | 0 |
|
280 | 0 | runnable.schedule(); |
281 | 0 | task |
282 | 0 | } Unexecuted instantiation: <async_executor::Executor>::spawn_inner::<core::result::Result<surrealdb_core::sql::value::value::Value, surrealdb_core::err::Error>, surrealdb_core::fnc::asynchronous::{closure#0}::cpu_intensive<core::result::Result<surrealdb_core::sql::value::value::Value, surrealdb_core::err::Error>, surrealdb_core::fnc::asynchronous::{closure#0}::{closure#14}>::{closure#0}::{closure#0}> Unexecuted instantiation: <async_executor::Executor>::spawn_inner::<core::result::Result<surrealdb_core::sql::value::value::Value, surrealdb_core::err::Error>, surrealdb_core::fnc::asynchronous::{closure#0}::cpu_intensive<core::result::Result<surrealdb_core::sql::value::value::Value, surrealdb_core::err::Error>, surrealdb_core::fnc::asynchronous::{closure#0}::{closure#15}>::{closure#0}::{closure#0}> Unexecuted instantiation: <async_executor::Executor>::spawn_inner::<core::result::Result<surrealdb_core::sql::value::value::Value, surrealdb_core::err::Error>, surrealdb_core::fnc::asynchronous::{closure#0}::cpu_intensive<core::result::Result<surrealdb_core::sql::value::value::Value, surrealdb_core::err::Error>, surrealdb_core::fnc::asynchronous::{closure#0}::{closure#16}>::{closure#0}::{closure#0}> Unexecuted instantiation: <async_executor::Executor>::spawn_inner::<core::result::Result<surrealdb_core::sql::value::value::Value, surrealdb_core::err::Error>, surrealdb_core::fnc::asynchronous::{closure#0}::cpu_intensive<core::result::Result<surrealdb_core::sql::value::value::Value, surrealdb_core::err::Error>, surrealdb_core::fnc::asynchronous::{closure#0}::{closure#17}>::{closure#0}::{closure#0}> Unexecuted instantiation: <async_executor::Executor>::spawn_inner::<core::result::Result<surrealdb_core::sql::value::value::Value, surrealdb_core::err::Error>, surrealdb_core::fnc::asynchronous::{closure#0}::cpu_intensive<core::result::Result<surrealdb_core::sql::value::value::Value, surrealdb_core::err::Error>, 
surrealdb_core::fnc::asynchronous::{closure#0}::{closure#18}>::{closure#0}::{closure#0}> Unexecuted instantiation: <async_executor::Executor>::spawn_inner::<core::result::Result<surrealdb_core::sql::value::value::Value, surrealdb_core::err::Error>, surrealdb_core::fnc::asynchronous::{closure#0}::cpu_intensive<core::result::Result<surrealdb_core::sql::value::value::Value, surrealdb_core::err::Error>, surrealdb_core::fnc::asynchronous::{closure#0}::{closure#19}>::{closure#0}::{closure#0}> Unexecuted instantiation: <async_executor::Executor>::spawn_inner::<core::result::Result<surrealdb_core::sql::value::value::Value, surrealdb_core::err::Error>, surrealdb_core::fnc::asynchronous::{closure#0}::cpu_intensive<core::result::Result<surrealdb_core::sql::value::value::Value, surrealdb_core::err::Error>, surrealdb_core::fnc::asynchronous::{closure#0}::{closure#20}>::{closure#0}::{closure#0}> Unexecuted instantiation: <async_executor::Executor>::spawn_inner::<core::result::Result<surrealdb_core::sql::value::value::Value, surrealdb_core::err::Error>, surrealdb_core::fnc::asynchronous::{closure#0}::cpu_intensive<core::result::Result<surrealdb_core::sql::value::value::Value, surrealdb_core::err::Error>, surrealdb_core::fnc::asynchronous::{closure#0}::{closure#21}>::{closure#0}::{closure#0}> Unexecuted instantiation: <async_executor::Executor>::spawn_inner::<_, _> |
283 | | |
284 | | /// Attempts to run a task if at least one is scheduled. |
285 | | /// |
286 | | /// Running a scheduled task means simply polling its future once. |
287 | | /// |
288 | | /// # Examples |
289 | | /// |
290 | | /// ``` |
291 | | /// use async_executor::Executor; |
292 | | /// |
293 | | /// let ex = Executor::new(); |
294 | | /// assert!(!ex.try_tick()); // no tasks to run |
295 | | /// |
296 | | /// let task = ex.spawn(async { |
297 | | /// println!("Hello world"); |
298 | | /// }); |
299 | | /// assert!(ex.try_tick()); // a task was found |
300 | | /// ``` |
301 | 0 | pub fn try_tick(&self) -> bool { |
302 | 0 | self.state().try_tick() |
303 | 0 | } |
304 | | |
305 | | /// Runs a single task. |
306 | | /// |
307 | | /// Running a task means simply polling its future once. |
308 | | /// |
309 | | /// If no tasks are scheduled when this method is called, it will wait until one is scheduled. |
310 | | /// |
311 | | /// # Examples |
312 | | /// |
313 | | /// ``` |
314 | | /// use async_executor::Executor; |
315 | | /// use futures_lite::future; |
316 | | /// |
317 | | /// let ex = Executor::new(); |
318 | | /// |
319 | | /// let task = ex.spawn(async { |
320 | | /// println!("Hello world"); |
321 | | /// }); |
322 | | /// future::block_on(ex.tick()); // runs the task |
323 | | /// ``` |
324 | 0 | pub async fn tick(&self) { |
325 | 0 | self.state().tick().await; |
326 | 0 | } |
327 | | |
328 | | /// Runs the executor until the given future completes. |
329 | | /// |
330 | | /// # Examples |
331 | | /// |
332 | | /// ``` |
333 | | /// use async_executor::Executor; |
334 | | /// use futures_lite::future; |
335 | | /// |
336 | | /// let ex = Executor::new(); |
337 | | /// |
338 | | /// let task = ex.spawn(async { 1 + 2 }); |
339 | | /// let res = future::block_on(ex.run(async { task.await * 2 })); |
340 | | /// |
341 | | /// assert_eq!(res, 6); |
342 | | /// ``` |
343 | 0 | pub async fn run<T>(&self, future: impl Future<Output = T>) -> T { Unexecuted instantiation: <async_executor::Executor>::run::<(), futures_util::future::pending::Pending<()>> Unexecuted instantiation: <async_executor::Executor>::run::<_, _> |
344 | 0 | self.state().run(future).await |
345 | 0 | } Unexecuted instantiation: <async_executor::Executor>::run::<(), futures_util::future::pending::Pending<()>>::{closure#0} Unexecuted instantiation: <async_executor::Executor>::run::<_, _>::{closure#0} |
346 | | |
347 | | /// Returns a function that schedules a runnable task when it gets woken up. |
348 | 0 | fn schedule(&self) -> impl Fn(Runnable) + Send + Sync + 'static { |
349 | 0 | let state = self.state_as_arc(); |
350 | | |
351 | | // TODO: If possible, push into the current local queue and notify the ticker. |
352 | 0 | move |runnable| { |
353 | 0 | state.queue.push(runnable).unwrap(); |
354 | 0 | state.notify(); |
355 | 0 | } Unexecuted instantiation: <async_executor::Executor>::schedule::{closure#0} Unexecuted instantiation: <async_executor::Executor>::schedule::{closure#0} |
356 | 0 | } |
357 | | |
358 | | /// Returns a pointer to the inner state. |
359 | | #[inline] |
360 | 0 | fn state_ptr(&self) -> *const State { |
361 | | #[cold] |
362 | 0 | fn alloc_state(atomic_ptr: &AtomicPtr<State>) -> *mut State { |
363 | 0 | let state = Arc::new(State::new()); |
364 | 0 | // TODO: Switch this to use cast_mut once the MSRV can be bumped past 1.65 |
365 | 0 | let ptr = Arc::into_raw(state) as *mut State; |
366 | 0 | if let Err(actual) = atomic_ptr.compare_exchange( |
367 | 0 | std::ptr::null_mut(), |
368 | 0 | ptr, |
369 | 0 | Ordering::AcqRel, |
370 | 0 | Ordering::Acquire, |
371 | 0 | ) { |
372 | | // SAFETY: This was just created from Arc::into_raw. |
373 | 0 | drop(unsafe { Arc::from_raw(ptr) }); |
374 | 0 | actual |
375 | | } else { |
376 | 0 | ptr |
377 | | } |
378 | 0 | } |
379 | | |
380 | 0 | let mut ptr = self.state.load(Ordering::Acquire); |
381 | 0 | if ptr.is_null() { |
382 | 0 | ptr = alloc_state(&self.state); |
383 | 0 | } |
384 | 0 | ptr |
385 | 0 | } Unexecuted instantiation: <async_executor::Executor>::state_ptr Unexecuted instantiation: <async_executor::Executor>::state_ptr |
386 | | |
387 | | /// Returns a reference to the inner state. |
388 | | #[inline] |
389 | 0 | fn state(&self) -> &State { |
390 | 0 | // SAFETY: So long as an Executor lives, it's state pointer will always be valid |
391 | 0 | // when accessed through state_ptr. |
392 | 0 | unsafe { &*self.state_ptr() } |
393 | 0 | } Unexecuted instantiation: <async_executor::Executor>::state Unexecuted instantiation: <async_executor::Executor>::state |
394 | | |
395 | | // Clones the inner state Arc |
396 | | #[inline] |
397 | 0 | fn state_as_arc(&self) -> Arc<State> { |
398 | 0 | // SAFETY: So long as an Executor lives, it's state pointer will always be a valid |
399 | 0 | // Arc when accessed through state_ptr. |
400 | 0 | let arc = unsafe { Arc::from_raw(self.state_ptr()) }; |
401 | 0 | let clone = arc.clone(); |
402 | 0 | std::mem::forget(arc); |
403 | 0 | clone |
404 | 0 | } Unexecuted instantiation: <async_executor::Executor>::state_as_arc Unexecuted instantiation: <async_executor::Executor>::state_as_arc |
405 | | } |
406 | | |
407 | | impl Drop for Executor<'_> { |
408 | 0 | fn drop(&mut self) { |
409 | 0 | let ptr = *self.state.get_mut(); |
410 | 0 | if ptr.is_null() { |
411 | 0 | return; |
412 | 0 | } |
413 | 0 |
|
414 | 0 | // SAFETY: As ptr is not null, it was allocated via Arc::new and converted |
415 | 0 | // via Arc::into_raw in state_ptr. |
416 | 0 | let state = unsafe { Arc::from_raw(ptr) }; |
417 | 0 |
|
418 | 0 | let mut active = state.active.lock().unwrap_or_else(|e| e.into_inner()); |
419 | 0 | for w in active.drain() { |
420 | 0 | w.wake(); |
421 | 0 | } |
422 | 0 | drop(active); |
423 | | |
424 | 0 | while state.queue.pop().is_ok() {} |
425 | 0 | } |
426 | | } |
427 | | |
428 | | impl<'a> Default for Executor<'a> { |
429 | 0 | fn default() -> Executor<'a> { |
430 | 0 | Executor::new() |
431 | 0 | } |
432 | | } |
433 | | |
/// A thread-local executor.
///
/// The executor can only be run on the thread that created it.
///
/// This is a thin wrapper around an [`Executor`] plus a marker field that removes the `Send`
/// and `Sync` auto traits.
///
/// # Examples
///
/// ```
/// use async_executor::LocalExecutor;
/// use futures_lite::future;
///
/// let local_ex = LocalExecutor::new();
///
/// future::block_on(local_ex.run(async {
///     println!("Hello world!");
/// }));
/// ```
pub struct LocalExecutor<'a> {
    /// The inner executor.
    inner: Executor<'a>,

    /// Makes the type `!Send` and `!Sync`.
    ///
    /// `Rc` is neither `Send` nor `Sync`, so holding a `PhantomData` of it opts the whole
    /// struct out of both traits without storing any data.
    _marker: PhantomData<Rc<()>>,
}

// Explicitly declare the local executor as unwind safe, both by value and by reference.
impl UnwindSafe for LocalExecutor<'_> {}
impl RefUnwindSafe for LocalExecutor<'_> {}
460 | | |
461 | | impl fmt::Debug for LocalExecutor<'_> { |
462 | 0 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
463 | 0 | debug_executor(&self.inner, "LocalExecutor", f) |
464 | 0 | } |
465 | | } |
466 | | |
467 | | impl<'a> LocalExecutor<'a> { |
468 | | /// Creates a single-threaded executor. |
469 | | /// |
470 | | /// # Examples |
471 | | /// |
472 | | /// ``` |
473 | | /// use async_executor::LocalExecutor; |
474 | | /// |
475 | | /// let local_ex = LocalExecutor::new(); |
476 | | /// ``` |
477 | 0 | pub const fn new() -> LocalExecutor<'a> { |
478 | 0 | LocalExecutor { |
479 | 0 | inner: Executor::new(), |
480 | 0 | _marker: PhantomData, |
481 | 0 | } |
482 | 0 | } |
483 | | |
484 | | /// Returns `true` if there are no unfinished tasks. |
485 | | /// |
486 | | /// # Examples |
487 | | /// |
488 | | /// ``` |
489 | | /// use async_executor::LocalExecutor; |
490 | | /// |
491 | | /// let local_ex = LocalExecutor::new(); |
492 | | /// assert!(local_ex.is_empty()); |
493 | | /// |
494 | | /// let task = local_ex.spawn(async { |
495 | | /// println!("Hello world"); |
496 | | /// }); |
497 | | /// assert!(!local_ex.is_empty()); |
498 | | /// |
499 | | /// assert!(local_ex.try_tick()); |
500 | | /// assert!(local_ex.is_empty()); |
501 | | /// ``` |
502 | 0 | pub fn is_empty(&self) -> bool { |
503 | 0 | self.inner().is_empty() |
504 | 0 | } |
505 | | |
506 | | /// Spawns a task onto the executor. |
507 | | /// |
508 | | /// # Examples |
509 | | /// |
510 | | /// ``` |
511 | | /// use async_executor::LocalExecutor; |
512 | | /// |
513 | | /// let local_ex = LocalExecutor::new(); |
514 | | /// |
515 | | /// let task = local_ex.spawn(async { |
516 | | /// println!("Hello world"); |
517 | | /// }); |
518 | | /// ``` |
519 | 0 | pub fn spawn<T: 'a>(&self, future: impl Future<Output = T> + 'a) -> Task<T> { |
520 | 0 | let mut active = self.inner().state().active.lock().unwrap(); |
521 | 0 |
|
522 | 0 | // SAFETY: This executor is not thread safe, so the future and its result |
523 | 0 | // cannot be sent to another thread. |
524 | 0 | unsafe { self.inner().spawn_inner(future, &mut active) } |
525 | 0 | } |
526 | | |
527 | | /// Spawns many tasks onto the executor. |
528 | | /// |
529 | | /// As opposed to the [`spawn`] method, this locks the executor's inner task lock once and |
530 | | /// spawns all of the tasks in one go. With large amounts of tasks this can improve |
531 | | /// contention. |
532 | | /// |
533 | | /// It is assumed that the iterator provided does not block; blocking iterators can lock up |
534 | | /// the internal mutex and therefore the entire executor. Unlike [`Executor::spawn`], the |
535 | | /// mutex is not released, as there are no other threads that can poll this executor. |
536 | | /// |
537 | | /// ## Example |
538 | | /// |
539 | | /// ``` |
540 | | /// use async_executor::LocalExecutor; |
541 | | /// use futures_lite::{stream, prelude::*}; |
542 | | /// use std::future::ready; |
543 | | /// |
544 | | /// # futures_lite::future::block_on(async { |
545 | | /// let mut ex = LocalExecutor::new(); |
546 | | /// |
547 | | /// let futures = [ |
548 | | /// ready(1), |
549 | | /// ready(2), |
550 | | /// ready(3) |
551 | | /// ]; |
552 | | /// |
553 | | /// // Spawn all of the futures onto the executor at once. |
554 | | /// let mut tasks = vec![]; |
555 | | /// ex.spawn_many(futures, &mut tasks); |
556 | | /// |
557 | | /// // Await all of them. |
558 | | /// let results = ex.run(async move { |
559 | | /// stream::iter(tasks).then(|x| x).collect::<Vec<_>>().await |
560 | | /// }).await; |
561 | | /// assert_eq!(results, [1, 2, 3]); |
562 | | /// # }); |
563 | | /// ``` |
564 | | /// |
565 | | /// [`spawn`]: LocalExecutor::spawn |
566 | | /// [`Executor::spawn_many`]: Executor::spawn_many |
567 | 0 | pub fn spawn_many<T: 'a, F: Future<Output = T> + 'a>( |
568 | 0 | &self, |
569 | 0 | futures: impl IntoIterator<Item = F>, |
570 | 0 | handles: &mut impl Extend<Task<F::Output>>, |
571 | 0 | ) { |
572 | 0 | let mut active = self.inner().state().active.lock().unwrap(); |
573 | 0 |
|
574 | 0 | // Convert all of the futures to tasks. |
575 | 0 | let tasks = futures.into_iter().map(|future| { |
576 | 0 | // SAFETY: This executor is not thread safe, so the future and its result |
577 | 0 | // cannot be sent to another thread. |
578 | 0 | unsafe { self.inner().spawn_inner(future, &mut active) } |
579 | 0 |
|
580 | 0 | // As only one thread can spawn or poll tasks at a time, there is no need |
581 | 0 | // to release lock contention here. |
582 | 0 | }); |
583 | 0 |
|
584 | 0 | // Push them to the user's collection. |
585 | 0 | handles.extend(tasks); |
586 | 0 | } |
587 | | |
588 | | /// Attempts to run a task if at least one is scheduled. |
589 | | /// |
590 | | /// Running a scheduled task means simply polling its future once. |
591 | | /// |
592 | | /// # Examples |
593 | | /// |
594 | | /// ``` |
595 | | /// use async_executor::LocalExecutor; |
596 | | /// |
597 | | /// let ex = LocalExecutor::new(); |
598 | | /// assert!(!ex.try_tick()); // no tasks to run |
599 | | /// |
600 | | /// let task = ex.spawn(async { |
601 | | /// println!("Hello world"); |
602 | | /// }); |
603 | | /// assert!(ex.try_tick()); // a task was found |
604 | | /// ``` |
605 | 0 | pub fn try_tick(&self) -> bool { |
606 | 0 | self.inner().try_tick() |
607 | 0 | } |
608 | | |
609 | | /// Runs a single task. |
610 | | /// |
611 | | /// Running a task means simply polling its future once. |
612 | | /// |
613 | | /// If no tasks are scheduled when this method is called, it will wait until one is scheduled. |
614 | | /// |
615 | | /// # Examples |
616 | | /// |
617 | | /// ``` |
618 | | /// use async_executor::LocalExecutor; |
619 | | /// use futures_lite::future; |
620 | | /// |
621 | | /// let ex = LocalExecutor::new(); |
622 | | /// |
623 | | /// let task = ex.spawn(async { |
624 | | /// println!("Hello world"); |
625 | | /// }); |
626 | | /// future::block_on(ex.tick()); // runs the task |
627 | | /// ``` |
628 | 0 | pub async fn tick(&self) { |
629 | 0 | self.inner().tick().await |
630 | 0 | } |
631 | | |
632 | | /// Runs the executor until the given future completes. |
633 | | /// |
634 | | /// # Examples |
635 | | /// |
636 | | /// ``` |
637 | | /// use async_executor::LocalExecutor; |
638 | | /// use futures_lite::future; |
639 | | /// |
640 | | /// let local_ex = LocalExecutor::new(); |
641 | | /// |
642 | | /// let task = local_ex.spawn(async { 1 + 2 }); |
643 | | /// let res = future::block_on(local_ex.run(async { task.await * 2 })); |
644 | | /// |
645 | | /// assert_eq!(res, 6); |
646 | | /// ``` |
647 | 0 | pub async fn run<T>(&self, future: impl Future<Output = T>) -> T { |
648 | 0 | self.inner().run(future).await |
649 | 0 | } |
650 | | |
651 | | /// Returns a reference to the inner executor. |
652 | 0 | fn inner(&self) -> &Executor<'a> { |
653 | 0 | &self.inner |
654 | 0 | } |
655 | | } |
656 | | |
657 | | impl<'a> Default for LocalExecutor<'a> { |
658 | 0 | fn default() -> LocalExecutor<'a> { |
659 | 0 | LocalExecutor::new() |
660 | 0 | } |
661 | | } |
662 | | |
/// The state of an executor.
///
/// One instance is shared (behind an `Arc`) between the executor handle, the scheduling
/// closures it creates, and the tickers/runners that borrow it while running.
struct State {
    /// The global queue.
    ///
    /// Scheduling closures push woken runnables here; tickers and runners pop from it.
    queue: ConcurrentQueue<Runnable>,

    /// Local queues created by runners.
    ///
    /// A runner registers its queue on creation and removes it again when dropped.
    local_queues: RwLock<Vec<Arc<ConcurrentQueue<Runnable>>>>,

    /// Set to `true` when a sleeping ticker is notified or no tickers are sleeping.
    notified: AtomicBool,

    /// A list of sleeping tickers.
    sleepers: Mutex<Sleepers>,

    /// Currently active tasks.
    ///
    /// Holds one waker per task; they are drained and woken when the executor is dropped.
    active: Mutex<Slab<Waker>>,
}
680 | | |
681 | | impl State { |
682 | | /// Creates state for a new executor. |
683 | 0 | const fn new() -> State { |
684 | 0 | State { |
685 | 0 | queue: ConcurrentQueue::unbounded(), |
686 | 0 | local_queues: RwLock::new(Vec::new()), |
687 | 0 | notified: AtomicBool::new(true), |
688 | 0 | sleepers: Mutex::new(Sleepers { |
689 | 0 | count: 0, |
690 | 0 | wakers: Vec::new(), |
691 | 0 | free_ids: Vec::new(), |
692 | 0 | }), |
693 | 0 | active: Mutex::new(Slab::new()), |
694 | 0 | } |
695 | 0 | } |
696 | | |
697 | | /// Notifies a sleeping ticker. |
698 | | #[inline] |
699 | 0 | fn notify(&self) { |
700 | 0 | if self |
701 | 0 | .notified |
702 | 0 | .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire) |
703 | 0 | .is_ok() |
704 | | { |
705 | 0 | let waker = self.sleepers.lock().unwrap().notify(); |
706 | 0 | if let Some(w) = waker { |
707 | 0 | w.wake(); |
708 | 0 | } |
709 | 0 | } |
710 | 0 | } Unexecuted instantiation: <async_executor::State>::notify Unexecuted instantiation: <async_executor::State>::notify |
711 | | |
712 | 0 | pub(crate) fn try_tick(&self) -> bool { |
713 | 0 | match self.queue.pop() { |
714 | 0 | Err(_) => false, |
715 | 0 | Ok(runnable) => { |
716 | 0 | // Notify another ticker now to pick up where this ticker left off, just in case |
717 | 0 | // running the task takes a long time. |
718 | 0 | self.notify(); |
719 | 0 |
|
720 | 0 | // Run the task. |
721 | 0 | runnable.run(); |
722 | 0 | true |
723 | | } |
724 | | } |
725 | 0 | } |
726 | | |
727 | 0 | pub(crate) async fn tick(&self) { |
728 | 0 | let runnable = Ticker::new(self).runnable().await; |
729 | 0 | runnable.run(); |
730 | 0 | } |
731 | | |
732 | 0 | pub async fn run<T>(&self, future: impl Future<Output = T>) -> T { Unexecuted instantiation: <async_executor::State>::run::<(), futures_util::future::pending::Pending<()>> Unexecuted instantiation: <async_executor::State>::run::<_, _> |
733 | 0 | let mut runner = Runner::new(self); |
734 | 0 | let mut rng = fastrand::Rng::new(); |
735 | 0 |
|
736 | 0 | // A future that runs tasks forever. |
737 | 0 | let run_forever = async { |
738 | | loop { |
739 | 0 | for _ in 0..200 { |
740 | 0 | let runnable = runner.runnable(&mut rng).await; |
741 | 0 | runnable.run(); |
742 | | } |
743 | 0 | future::yield_now().await; Unexecuted instantiation: <async_executor::State>::run::<(), futures_util::future::pending::Pending<()>>::{closure#0}::{closure#0} Unexecuted instantiation: <async_executor::State>::run::<_, _>::{closure#0}::{closure#0} |
744 | | } |
745 | | }; |
746 | | |
747 | | // Run `future` and `run_forever` concurrently until `future` completes. |
748 | 0 | future.or(run_forever).await |
749 | 0 | } Unexecuted instantiation: <async_executor::State>::run::<(), futures_util::future::pending::Pending<()>>::{closure#0} Unexecuted instantiation: <async_executor::State>::run::<_, _>::{closure#0} |
750 | | } |
751 | | |
/// A list of sleeping tickers.
struct Sleepers {
    /// Number of sleeping tickers (both notified and unnotified).
    count: usize,

    /// IDs and wakers of sleeping unnotified tickers.
    ///
    /// A sleeping ticker is notified when its waker is missing from this list.
    wakers: Vec<(usize, Waker)>,

    /// Reclaimed IDs.
    free_ids: Vec<usize>,
}

impl Sleepers {
    /// Inserts a new sleeping ticker and returns the ID assigned to it.
    ///
    /// A reclaimed ID is reused when one is available; otherwise the next ID after the
    /// current sleeper count is handed out.
    fn insert(&mut self, waker: &Waker) -> usize {
        let id = self.free_ids.pop().unwrap_or_else(|| self.count + 1);
        self.count += 1;
        self.wakers.push((id, waker.clone()));
        id
    }

    /// Re-inserts a sleeping ticker's waker if it was notified.
    ///
    /// If the ticker still has an entry, only its stored waker is refreshed.
    ///
    /// Returns `true` if the ticker was notified.
    fn update(&mut self, id: usize, waker: &Waker) -> bool {
        match self.wakers.iter_mut().find(|entry| entry.0 == id) {
            Some(entry) => {
                entry.1.clone_from(waker);
                false
            }
            None => {
                self.wakers.push((id, waker.clone()));
                true
            }
        }
    }

    /// Removes a previously inserted sleeping ticker and reclaims its ID.
    ///
    /// Returns `true` if the ticker was notified.
    fn remove(&mut self, id: usize) -> bool {
        self.count -= 1;
        self.free_ids.push(id);

        // Search from the back, like the push order suggests, and drop the entry if present.
        match self.wakers.iter().rposition(|entry| entry.0 == id) {
            Some(index) => {
                self.wakers.remove(index);
                false
            }
            None => true,
        }
    }

    /// Returns `true` if a sleeping ticker is notified or no tickers are sleeping.
    fn is_notified(&self) -> bool {
        self.count == 0 || self.wakers.len() < self.count
    }

    /// Returns notification waker for a sleeping ticker.
    ///
    /// If a ticker was notified already or there are no tickers, `None` will be returned.
    fn notify(&mut self) -> Option<Waker> {
        if self.wakers.len() != self.count {
            return None;
        }
        self.wakers.pop().map(|(_, waker)| waker)
    }
}
825 | | |
/// Runs tasks one by one.
///
/// A ticker borrows the executor state and tracks whether it is currently registered in the
/// state's list of sleepers.
struct Ticker<'a> {
    /// The executor state.
    state: &'a State,

    /// Set to a non-zero sleeper ID when in sleeping state.
    ///
    /// A value of `0` means the ticker is not registered as a sleeper.
    ///
    /// States a ticker can be in:
    /// 1) Woken.
    /// 2a) Sleeping and unnotified.
    /// 2b) Sleeping and notified.
    sleeping: usize,
}
839 | | |
impl Ticker<'_> {
    /// Creates a ticker.
    ///
    /// The ticker starts in the woken state (`sleeping == 0`).
    fn new(state: &State) -> Ticker<'_> {
        Ticker { state, sleeping: 0 }
    }

    /// Moves the ticker into sleeping and unnotified state.
    ///
    /// The sleepers lock is held for the whole call, so the registration and the update of
    /// the shared `notified` flag happen together.
    ///
    /// Returns `false` if the ticker was already sleeping and unnotified.
    fn sleep(&mut self, waker: &Waker) -> bool {
        let mut sleepers = self.state.sleepers.lock().unwrap();

        match self.sleeping {
            // Move to sleeping state.
            0 => {
                self.sleeping = sleepers.insert(waker);
            }

            // Already sleeping, check if notified.
            id => {
                // `update` returns `false` when the waker entry was still present, i.e. this
                // ticker has not been notified since it went to sleep.
                if !sleepers.update(id, waker) {
                    return false;
                }
            }
        }

        // Publish whether any sleeper is currently notified (or nobody is sleeping).
        self.state
            .notified
            .store(sleepers.is_notified(), Ordering::Release);

        true
    }

    /// Moves the ticker into woken state.
    ///
    /// If the ticker was registered as a sleeper, it is removed from the list and the shared
    /// `notified` flag is recomputed under the sleepers lock.
    fn wake(&mut self) {
        if self.sleeping != 0 {
            let mut sleepers = self.state.sleepers.lock().unwrap();
            sleepers.remove(self.sleeping);

            self.state
                .notified
                .store(sleepers.is_notified(), Ordering::Release);
        }
        self.sleeping = 0;
    }

    /// Waits for the next runnable task to run.
    ///
    /// Only the global queue is searched.
    async fn runnable(&mut self) -> Runnable {
        self.runnable_with(|| self.state.queue.pop().ok()).await
    }

    /// Waits for the next runnable task to run, given a function that searches for a task.
    ///
    /// `search` is called on every poll; after the ticker newly registers (or re-registers)
    /// as a sleeper it is called once more before the poll returns `Pending`.
    async fn runnable_with(&mut self, mut search: impl FnMut() -> Option<Runnable>) -> Runnable {
        future::poll_fn(|cx| {
            loop {
                match search() {
                    None => {
                        // Move to sleeping and unnotified state.
                        if !self.sleep(cx.waker()) {
                            // If already sleeping and unnotified, return.
                            return Poll::Pending;
                        }
                        // Otherwise loop and search again now that the waker is registered.
                    }
                    Some(r) => {
                        // Wake up.
                        self.wake();

                        // Notify another ticker now to pick up where this ticker left off, just in
                        // case running the task takes a long time.
                        self.state.notify();

                        return Poll::Ready(r);
                    }
                }
            }
        })
        .await
    }
}
919 | | |
920 | | impl Drop for Ticker<'_> { |
921 | 0 | fn drop(&mut self) { |
922 | 0 | // If this ticker is in sleeping state, it must be removed from the sleepers list. |
923 | 0 | if self.sleeping != 0 { |
924 | 0 | let mut sleepers = self.state.sleepers.lock().unwrap(); |
925 | 0 | let notified = sleepers.remove(self.sleeping); |
926 | 0 |
|
927 | 0 | self.state |
928 | 0 | .notified |
929 | 0 | .store(sleepers.is_notified(), Ordering::Release); |
930 | 0 |
|
931 | 0 | // If this ticker was notified, then notify another ticker. |
932 | 0 | if notified { |
933 | 0 | drop(sleepers); |
934 | 0 | self.state.notify(); |
935 | 0 | } |
936 | 0 | } |
937 | 0 | } |
938 | | } |
939 | | |
/// A worker in a work-stealing executor.
///
/// This is just a ticker that also has an associated local queue for improved cache locality.
struct Runner<'a> {
    /// The executor state.
    state: &'a State,

    /// Inner ticker.
    ticker: Ticker<'a>,

    /// The local queue.
    ///
    /// Shared through an `Arc` because a clone of it is also stored in the state's
    /// `local_queues` list.
    local: Arc<ConcurrentQueue<Runnable>>,

    /// Bumped every time a runnable task is found.
    ticks: usize,
}
956 | | |
impl Runner<'_> {
    /// Creates a runner and registers it in the executor state.
    ///
    /// The runner gets a local queue bounded at 512 entries; a clone of that queue handle is
    /// pushed onto the state's `local_queues` list.
    fn new(state: &State) -> Runner<'_> {
        let runner = Runner {
            state,
            ticker: Ticker::new(state),
            local: Arc::new(ConcurrentQueue::bounded(512)),
            ticks: 0,
        };
        state
            .local_queues
            .write()
            .unwrap()
            .push(runner.local.clone());
        runner
    }

    /// Waits for the next runnable task to run.
    ///
    /// Sources are searched in this order: the local queue, the global queue, then the local
    /// queues of the other runners starting from a random position chosen with `rng`.
    async fn runnable(&mut self, rng: &mut fastrand::Rng) -> Runnable {
        let runnable = self
            .ticker
            .runnable_with(|| {
                // Try the local queue.
                if let Ok(r) = self.local.pop() {
                    return Some(r);
                }

                // Try stealing from the global queue.
                if let Ok(r) = self.state.queue.pop() {
                    // Also move a batch from the global queue into the local one.
                    steal(&self.state.queue, &self.local);
                    return Some(r);
                }

                // Try stealing from other runners.
                let local_queues = self.state.local_queues.read().unwrap();

                // Pick a random starting point in the iterator list and rotate the list.
                // `n` counts this runner's own queue too, which `new` registered.
                let n = local_queues.len();
                let start = rng.usize(..n);
                let iter = local_queues
                    .iter()
                    .chain(local_queues.iter())
                    .skip(start)
                    .take(n);

                // Remove this runner's local queue.
                let iter = iter.filter(|local| !Arc::ptr_eq(local, &self.local));

                // Try stealing from each local queue in the list.
                for local in iter {
                    steal(local, &self.local);
                    if let Ok(r) = self.local.pop() {
                        return Some(r);
                    }
                }

                None
            })
            .await;

        // Bump the tick counter.
        self.ticks = self.ticks.wrapping_add(1);

        if self.ticks % 64 == 0 {
            // Steal tasks from the global queue to ensure fair task scheduling.
            steal(&self.state.queue, &self.local);
        }

        runnable
    }
}
1028 | | |
1029 | | impl Drop for Runner<'_> { |
1030 | 0 | fn drop(&mut self) { |
1031 | 0 | // Remove the local queue. |
1032 | 0 | self.state |
1033 | 0 | .local_queues |
1034 | 0 | .write() |
1035 | 0 | .unwrap() |
1036 | 0 | .retain(|local| !Arc::ptr_eq(local, &self.local)); |
1037 | | |
1038 | | // Re-schedule remaining tasks in the local queue. |
1039 | 0 | while let Ok(r) = self.local.pop() { |
1040 | 0 | r.schedule(); |
1041 | 0 | } |
1042 | 0 | } |
1043 | | } |
1044 | | |
1045 | | /// Steals some items from one queue into another. |
1046 | 0 | fn steal<T>(src: &ConcurrentQueue<T>, dest: &ConcurrentQueue<T>) { |
1047 | 0 | // Half of `src`'s length rounded up. |
1048 | 0 | let mut count = (src.len() + 1) / 2; |
1049 | 0 |
|
1050 | 0 | if count > 0 { |
1051 | | // Don't steal more than fits into the queue. |
1052 | 0 | if let Some(cap) = dest.capacity() { |
1053 | 0 | count = count.min(cap - dest.len()); |
1054 | 0 | } |
1055 | | |
1056 | | // Steal tasks. |
1057 | 0 | for _ in 0..count { |
1058 | 0 | if let Ok(t) = src.pop() { |
1059 | 0 | assert!(dest.push(t).is_ok()); |
1060 | | } else { |
1061 | 0 | break; |
1062 | | } |
1063 | | } |
1064 | 0 | } |
1065 | 0 | } Unexecuted instantiation: async_executor::steal::<async_task::runnable::Runnable> Unexecuted instantiation: async_executor::steal::<_> |
1066 | | |
1067 | | /// Debug implementation for `Executor` and `LocalExecutor`. |
1068 | 0 | fn debug_executor(executor: &Executor<'_>, name: &str, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
1069 | 0 | // Get a reference to the state. |
1070 | 0 | let ptr = executor.state.load(Ordering::Acquire); |
1071 | 0 | if ptr.is_null() { |
1072 | | // The executor has not been initialized. |
1073 | | struct Uninitialized; |
1074 | | |
1075 | | impl fmt::Debug for Uninitialized { |
1076 | 0 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
1077 | 0 | f.write_str("<uninitialized>") |
1078 | 0 | } |
1079 | | } |
1080 | | |
1081 | 0 | return f.debug_tuple(name).field(&Uninitialized).finish(); |
1082 | 0 | } |
1083 | 0 |
|
1084 | 0 | // SAFETY: If the state pointer is not null, it must have been |
1085 | 0 | // allocated properly by Arc::new and converted via Arc::into_raw |
1086 | 0 | // in state_ptr. |
1087 | 0 | let state = unsafe { &*ptr }; |
1088 | 0 |
|
1089 | 0 | debug_state(state, name, f) |
1090 | 0 | } |
1091 | | |
1092 | | /// Debug implementation for `Executor` and `LocalExecutor`. |
1093 | 0 | fn debug_state(state: &State, name: &str, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
1094 | | /// Debug wrapper for the number of active tasks. |
1095 | | struct ActiveTasks<'a>(&'a Mutex<Slab<Waker>>); |
1096 | | |
1097 | | impl fmt::Debug for ActiveTasks<'_> { |
1098 | 0 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
1099 | 0 | match self.0.try_lock() { |
1100 | 0 | Ok(lock) => fmt::Debug::fmt(&lock.len(), f), |
1101 | 0 | Err(TryLockError::WouldBlock) => f.write_str("<locked>"), |
1102 | 0 | Err(TryLockError::Poisoned(_)) => f.write_str("<poisoned>"), |
1103 | | } |
1104 | 0 | } |
1105 | | } |
1106 | | |
1107 | | /// Debug wrapper for the local runners. |
1108 | | struct LocalRunners<'a>(&'a RwLock<Vec<Arc<ConcurrentQueue<Runnable>>>>); |
1109 | | |
1110 | | impl fmt::Debug for LocalRunners<'_> { |
1111 | 0 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
1112 | 0 | match self.0.try_read() { |
1113 | 0 | Ok(lock) => f |
1114 | 0 | .debug_list() |
1115 | 0 | .entries(lock.iter().map(|queue| queue.len())) |
1116 | 0 | .finish(), |
1117 | 0 | Err(TryLockError::WouldBlock) => f.write_str("<locked>"), |
1118 | 0 | Err(TryLockError::Poisoned(_)) => f.write_str("<poisoned>"), |
1119 | | } |
1120 | 0 | } |
1121 | | } |
1122 | | |
1123 | | /// Debug wrapper for the sleepers. |
1124 | | struct SleepCount<'a>(&'a Mutex<Sleepers>); |
1125 | | |
1126 | | impl fmt::Debug for SleepCount<'_> { |
1127 | 0 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
1128 | 0 | match self.0.try_lock() { |
1129 | 0 | Ok(lock) => fmt::Debug::fmt(&lock.count, f), |
1130 | 0 | Err(TryLockError::WouldBlock) => f.write_str("<locked>"), |
1131 | 0 | Err(TryLockError::Poisoned(_)) => f.write_str("<poisoned>"), |
1132 | | } |
1133 | 0 | } |
1134 | | } |
1135 | | |
1136 | 0 | f.debug_struct(name) |
1137 | 0 | .field("active", &ActiveTasks(&state.active)) |
1138 | 0 | .field("global_tasks", &state.queue.len()) |
1139 | 0 | .field("local_runners", &LocalRunners(&state.local_queues)) |
1140 | 0 | .field("sleepers", &SleepCount(&state.sleepers)) |
1141 | 0 | .finish() |
1142 | 0 | } |
1143 | | |
/// Runs a closure when dropped.
///
/// The closure is invoked exactly once, from the `Drop` implementation.
struct CallOnDrop<F: FnMut()>(F);

impl<F: FnMut()> Drop for CallOnDrop<F> {
    fn drop(&mut self) {
        let callback = &mut self.0;
        callback();
    }
}
1152 | | |
/// Compile-time checks of the auto traits of `Executor` and the values it hands out.
///
/// The calls below exist only so that the file fails to type-check if `Executor`, the
/// futures returned by `run`/`tick`, or the closure returned by `schedule` stop satisfying
/// the listed bounds.
fn _ensure_send_and_sync() {
    use futures_lite::future::pending;

    // Each helper accepts any value satisfying its bound and does nothing with it.
    fn is_send<T: Send>(_: T) {}
    fn is_sync<T: Sync>(_: T) {}
    fn is_static<T: 'static>(_: T) {}

    // The executor itself must be `Send` and `Sync`.
    is_send::<Executor<'_>>(Executor::new());
    is_sync::<Executor<'_>>(Executor::new());

    // So must the futures it returns and its scheduling closure, which also has to be
    // `'static`.
    let ex = Executor::new();
    is_send(ex.run(pending::<()>()));
    is_sync(ex.run(pending::<()>()));
    is_send(ex.tick());
    is_sync(ex.tick());
    is_send(ex.schedule());
    is_sync(ex.schedule());
    is_static(ex.schedule());

    // The counterpart for `LocalExecutor` is a `compile_fail` doctest: the same checks are
    // expected NOT to compile for the thread-local executor.
    /// ```compile_fail
    /// use async_executor::LocalExecutor;
    /// use futures_lite::future::pending;
    ///
    /// fn is_send<T: Send>(_: T) {}
    /// fn is_sync<T: Sync>(_: T) {}
    ///
    /// is_send::<LocalExecutor<'_>>(LocalExecutor::new());
    /// is_sync::<LocalExecutor<'_>>(LocalExecutor::new());
    ///
    /// let ex = LocalExecutor::new();
    /// is_send(ex.run(pending::<()>()));
    /// is_sync(ex.run(pending::<()>()));
    /// is_send(ex.tick());
    /// is_sync(ex.tick());
    /// ```
    fn _negative_test() {}
}