Coverage Report

Created: 2025-02-21 07:11

/rust/registry/src/index.crates.io-6f17d22bba15001f/async-executor-1.13.1/src/lib.rs
Line
Count
Source (jump to first uncovered line)
1
//! Async executors.
2
//!
3
//! This crate provides two reference executors that trade performance for
4
//! functionality. They should be considered reference executors that are "good
5
//! enough" for most use cases. For more specialized use cases, consider writing
6
//! your own executor on top of [`async-task`].
7
//!
8
//! [`async-task`]: https://crates.io/crates/async-task
9
//!
10
//! # Examples
11
//!
12
//! ```
13
//! use async_executor::Executor;
14
//! use futures_lite::future;
15
//!
16
//! // Create a new executor.
17
//! let ex = Executor::new();
18
//!
19
//! // Spawn a task.
20
//! let task = ex.spawn(async {
21
//!     println!("Hello world");
22
//! });
23
//!
24
//! // Run the executor until the task completes.
25
//! future::block_on(ex.run(task));
26
//! ```
27
28
#![warn(
29
    missing_docs,
30
    missing_debug_implementations,
31
    rust_2018_idioms,
32
    clippy::undocumented_unsafe_blocks
33
)]
34
#![doc(
35
    html_favicon_url = "https://raw.githubusercontent.com/smol-rs/smol/master/assets/images/logo_fullsize_transparent.png"
36
)]
37
#![doc(
38
    html_logo_url = "https://raw.githubusercontent.com/smol-rs/smol/master/assets/images/logo_fullsize_transparent.png"
39
)]
40
#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]
41
42
use std::fmt;
43
use std::marker::PhantomData;
44
use std::panic::{RefUnwindSafe, UnwindSafe};
45
use std::rc::Rc;
46
use std::sync::atomic::{AtomicBool, AtomicPtr, Ordering};
47
use std::sync::{Arc, Mutex, RwLock, TryLockError};
48
use std::task::{Poll, Waker};
49
50
use async_task::{Builder, Runnable};
51
use concurrent_queue::ConcurrentQueue;
52
use futures_lite::{future, prelude::*};
53
use slab::Slab;
54
55
#[cfg(feature = "static")]
56
mod static_executors;
57
58
#[doc(no_inline)]
59
pub use async_task::{FallibleTask, Task};
60
#[cfg(feature = "static")]
61
#[cfg_attr(docsrs, doc(cfg(any(feature = "static"))))]
62
pub use static_executors::*;
63
64
/// An async executor.
65
///
66
/// # Examples
67
///
68
/// A multi-threaded executor:
69
///
70
/// ```
71
/// use async_channel::unbounded;
72
/// use async_executor::Executor;
73
/// use easy_parallel::Parallel;
74
/// use futures_lite::future;
75
///
76
/// let ex = Executor::new();
77
/// let (signal, shutdown) = unbounded::<()>();
78
///
79
/// Parallel::new()
80
///     // Run four executor threads.
81
///     .each(0..4, |_| future::block_on(ex.run(shutdown.recv())))
82
///     // Run the main future on the current thread.
83
///     .finish(|| future::block_on(async {
84
///         println!("Hello world!");
85
///         drop(signal);
86
///     }));
87
/// ```
88
pub struct Executor<'a> {
89
    /// The executor state.
90
    state: AtomicPtr<State>,
91
92
    /// Makes the `'a` lifetime invariant.
93
    _marker: PhantomData<std::cell::UnsafeCell<&'a ()>>,
94
}
95
96
// SAFETY: Executor stores no thread-local state that can be accessed from other threads.
97
unsafe impl Send for Executor<'_> {}
98
// SAFETY: Executor internally synchronizes all of its operations.
99
unsafe impl Sync for Executor<'_> {}
100
101
impl UnwindSafe for Executor<'_> {}
102
impl RefUnwindSafe for Executor<'_> {}
103
104
impl fmt::Debug for Executor<'_> {
105
0
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
106
0
        debug_executor(self, "Executor", f)
107
0
    }
108
}
109
110
impl<'a> Executor<'a> {
111
    /// Creates a new executor.
112
    ///
113
    /// # Examples
114
    ///
115
    /// ```
116
    /// use async_executor::Executor;
117
    ///
118
    /// let ex = Executor::new();
119
    /// ```
120
0
    pub const fn new() -> Executor<'a> {
121
0
        Executor {
122
0
            state: AtomicPtr::new(std::ptr::null_mut()),
123
0
            _marker: PhantomData,
124
0
        }
125
0
    }
126
127
    /// Returns `true` if there are no unfinished tasks.
128
    ///
129
    /// # Examples
130
    ///
131
    /// ```
132
    /// use async_executor::Executor;
133
    ///
134
    /// let ex = Executor::new();
135
    /// assert!(ex.is_empty());
136
    ///
137
    /// let task = ex.spawn(async {
138
    ///     println!("Hello world");
139
    /// });
140
    /// assert!(!ex.is_empty());
141
    ///
142
    /// assert!(ex.try_tick());
143
    /// assert!(ex.is_empty());
144
    /// ```
145
0
    pub fn is_empty(&self) -> bool {
146
0
        self.state().active.lock().unwrap().is_empty()
147
0
    }
148
149
    /// Spawns a task onto the executor.
150
    ///
151
    /// # Examples
152
    ///
153
    /// ```
154
    /// use async_executor::Executor;
155
    ///
156
    /// let ex = Executor::new();
157
    ///
158
    /// let task = ex.spawn(async {
159
    ///     println!("Hello world");
160
    /// });
161
    /// ```
162
0
    pub fn spawn<T: Send + 'a>(&self, future: impl Future<Output = T> + Send + 'a) -> Task<T> {
163
0
        let mut active = self.state().active.lock().unwrap();
164
0
165
0
        // SAFETY: `T` and the future are `Send`.
166
0
        unsafe { self.spawn_inner(future, &mut active) }
167
0
    }
Unexecuted instantiation: <async_executor::Executor>::spawn::<core::result::Result<surrealdb_core::sql::value::value::Value, surrealdb_core::err::Error>, surrealdb_core::fnc::asynchronous::{closure#0}::cpu_intensive<core::result::Result<surrealdb_core::sql::value::value::Value, surrealdb_core::err::Error>, surrealdb_core::fnc::asynchronous::{closure#0}::{closure#14}>::{closure#0}::{closure#0}>
Unexecuted instantiation: <async_executor::Executor>::spawn::<core::result::Result<surrealdb_core::sql::value::value::Value, surrealdb_core::err::Error>, surrealdb_core::fnc::asynchronous::{closure#0}::cpu_intensive<core::result::Result<surrealdb_core::sql::value::value::Value, surrealdb_core::err::Error>, surrealdb_core::fnc::asynchronous::{closure#0}::{closure#15}>::{closure#0}::{closure#0}>
Unexecuted instantiation: <async_executor::Executor>::spawn::<core::result::Result<surrealdb_core::sql::value::value::Value, surrealdb_core::err::Error>, surrealdb_core::fnc::asynchronous::{closure#0}::cpu_intensive<core::result::Result<surrealdb_core::sql::value::value::Value, surrealdb_core::err::Error>, surrealdb_core::fnc::asynchronous::{closure#0}::{closure#16}>::{closure#0}::{closure#0}>
Unexecuted instantiation: <async_executor::Executor>::spawn::<core::result::Result<surrealdb_core::sql::value::value::Value, surrealdb_core::err::Error>, surrealdb_core::fnc::asynchronous::{closure#0}::cpu_intensive<core::result::Result<surrealdb_core::sql::value::value::Value, surrealdb_core::err::Error>, surrealdb_core::fnc::asynchronous::{closure#0}::{closure#17}>::{closure#0}::{closure#0}>
Unexecuted instantiation: <async_executor::Executor>::spawn::<core::result::Result<surrealdb_core::sql::value::value::Value, surrealdb_core::err::Error>, surrealdb_core::fnc::asynchronous::{closure#0}::cpu_intensive<core::result::Result<surrealdb_core::sql::value::value::Value, surrealdb_core::err::Error>, surrealdb_core::fnc::asynchronous::{closure#0}::{closure#18}>::{closure#0}::{closure#0}>
Unexecuted instantiation: <async_executor::Executor>::spawn::<core::result::Result<surrealdb_core::sql::value::value::Value, surrealdb_core::err::Error>, surrealdb_core::fnc::asynchronous::{closure#0}::cpu_intensive<core::result::Result<surrealdb_core::sql::value::value::Value, surrealdb_core::err::Error>, surrealdb_core::fnc::asynchronous::{closure#0}::{closure#19}>::{closure#0}::{closure#0}>
Unexecuted instantiation: <async_executor::Executor>::spawn::<core::result::Result<surrealdb_core::sql::value::value::Value, surrealdb_core::err::Error>, surrealdb_core::fnc::asynchronous::{closure#0}::cpu_intensive<core::result::Result<surrealdb_core::sql::value::value::Value, surrealdb_core::err::Error>, surrealdb_core::fnc::asynchronous::{closure#0}::{closure#20}>::{closure#0}::{closure#0}>
Unexecuted instantiation: <async_executor::Executor>::spawn::<core::result::Result<surrealdb_core::sql::value::value::Value, surrealdb_core::err::Error>, surrealdb_core::fnc::asynchronous::{closure#0}::cpu_intensive<core::result::Result<surrealdb_core::sql::value::value::Value, surrealdb_core::err::Error>, surrealdb_core::fnc::asynchronous::{closure#0}::{closure#21}>::{closure#0}::{closure#0}>
Unexecuted instantiation: <async_executor::Executor>::spawn::<_, _>
168
169
    /// Spawns many tasks onto the executor.
170
    ///
171
    /// As opposed to the [`spawn`] method, this locks the executor's inner task lock once and
172
    /// spawns all of the tasks in one go. With large numbers of tasks this can reduce
173
    /// contention.
174
    ///
175
    /// For very large numbers of tasks the lock is occasionally dropped and re-acquired to
176
    /// prevent runner thread starvation. It is assumed that the iterator provided does not
177
    /// block; blocking iterators can lock up the internal mutex and therefore the entire
178
    /// executor.
179
    ///
180
    /// # Examples
181
    ///
182
    /// ```
183
    /// use async_executor::Executor;
184
    /// use futures_lite::{stream, prelude::*};
185
    /// use std::future::ready;
186
    ///
187
    /// # futures_lite::future::block_on(async {
188
    /// let mut ex = Executor::new();
189
    ///
190
    /// let futures = [
191
    ///     ready(1),
192
    ///     ready(2),
193
    ///     ready(3)
194
    /// ];
195
    ///
196
    /// // Spawn all of the futures onto the executor at once.
197
    /// let mut tasks = vec![];
198
    /// ex.spawn_many(futures, &mut tasks);
199
    ///
200
    /// // Await all of them.
201
    /// let results = ex.run(async move {
202
    ///     stream::iter(tasks).then(|x| x).collect::<Vec<_>>().await
203
    /// }).await;
204
    /// assert_eq!(results, [1, 2, 3]);
205
    /// # });
206
    /// ```
207
    ///
208
    /// [`spawn`]: Executor::spawn
209
0
    pub fn spawn_many<T: Send + 'a, F: Future<Output = T> + Send + 'a>(
210
0
        &self,
211
0
        futures: impl IntoIterator<Item = F>,
212
0
        handles: &mut impl Extend<Task<F::Output>>,
213
0
    ) {
214
0
        let mut active = Some(self.state().active.lock().unwrap());
215
0
216
0
        // Convert the futures into tasks.
217
0
        let tasks = futures.into_iter().enumerate().map(move |(i, future)| {
218
0
            // SAFETY: `T` and the future are `Send`.
219
0
            let task = unsafe { self.spawn_inner(future, active.as_mut().unwrap()) };
220
0
221
0
            // Yield the lock every once in a while to ease contention.
222
0
            if i.wrapping_sub(1) % 500 == 0 {
223
0
                drop(active.take());
224
0
                active = Some(self.state().active.lock().unwrap());
225
0
            }
226
227
0
            task
228
0
        });
229
0
230
0
        // Push the tasks to the user's collection.
231
0
        handles.extend(tasks);
232
0
    }
233
234
    /// Spawn a future while holding the inner lock.
235
    ///
236
    /// # Safety
237
    ///
238
    /// If this is an `Executor`, `future` and `T` must be `Send`.
239
0
    unsafe fn spawn_inner<T: 'a>(
240
0
        &self,
241
0
        future: impl Future<Output = T> + 'a,
242
0
        active: &mut Slab<Waker>,
243
0
    ) -> Task<T> {
244
0
        // Remove the task from the set of active tasks when the future finishes.
245
0
        let entry = active.vacant_entry();
246
0
        let index = entry.key();
247
0
        let state = self.state_as_arc();
248
0
        let future = async move {
249
0
            let _guard = CallOnDrop(move || drop(state.active.lock().unwrap().try_remove(index)));
Unexecuted instantiation: <async_executor::Executor>::spawn_inner::<core::result::Result<surrealdb_core::sql::value::value::Value, surrealdb_core::err::Error>, surrealdb_core::fnc::asynchronous::{closure#0}::cpu_intensive<core::result::Result<surrealdb_core::sql::value::value::Value, surrealdb_core::err::Error>, surrealdb_core::fnc::asynchronous::{closure#0}::{closure#14}>::{closure#0}::{closure#0}>::{closure#0}::{closure#0}
Unexecuted instantiation: <async_executor::Executor>::spawn_inner::<core::result::Result<surrealdb_core::sql::value::value::Value, surrealdb_core::err::Error>, surrealdb_core::fnc::asynchronous::{closure#0}::cpu_intensive<core::result::Result<surrealdb_core::sql::value::value::Value, surrealdb_core::err::Error>, surrealdb_core::fnc::asynchronous::{closure#0}::{closure#15}>::{closure#0}::{closure#0}>::{closure#0}::{closure#0}
Unexecuted instantiation: <async_executor::Executor>::spawn_inner::<core::result::Result<surrealdb_core::sql::value::value::Value, surrealdb_core::err::Error>, surrealdb_core::fnc::asynchronous::{closure#0}::cpu_intensive<core::result::Result<surrealdb_core::sql::value::value::Value, surrealdb_core::err::Error>, surrealdb_core::fnc::asynchronous::{closure#0}::{closure#16}>::{closure#0}::{closure#0}>::{closure#0}::{closure#0}
Unexecuted instantiation: <async_executor::Executor>::spawn_inner::<core::result::Result<surrealdb_core::sql::value::value::Value, surrealdb_core::err::Error>, surrealdb_core::fnc::asynchronous::{closure#0}::cpu_intensive<core::result::Result<surrealdb_core::sql::value::value::Value, surrealdb_core::err::Error>, surrealdb_core::fnc::asynchronous::{closure#0}::{closure#17}>::{closure#0}::{closure#0}>::{closure#0}::{closure#0}
Unexecuted instantiation: <async_executor::Executor>::spawn_inner::<core::result::Result<surrealdb_core::sql::value::value::Value, surrealdb_core::err::Error>, surrealdb_core::fnc::asynchronous::{closure#0}::cpu_intensive<core::result::Result<surrealdb_core::sql::value::value::Value, surrealdb_core::err::Error>, surrealdb_core::fnc::asynchronous::{closure#0}::{closure#18}>::{closure#0}::{closure#0}>::{closure#0}::{closure#0}
Unexecuted instantiation: <async_executor::Executor>::spawn_inner::<core::result::Result<surrealdb_core::sql::value::value::Value, surrealdb_core::err::Error>, surrealdb_core::fnc::asynchronous::{closure#0}::cpu_intensive<core::result::Result<surrealdb_core::sql::value::value::Value, surrealdb_core::err::Error>, surrealdb_core::fnc::asynchronous::{closure#0}::{closure#19}>::{closure#0}::{closure#0}>::{closure#0}::{closure#0}
Unexecuted instantiation: <async_executor::Executor>::spawn_inner::<core::result::Result<surrealdb_core::sql::value::value::Value, surrealdb_core::err::Error>, surrealdb_core::fnc::asynchronous::{closure#0}::cpu_intensive<core::result::Result<surrealdb_core::sql::value::value::Value, surrealdb_core::err::Error>, surrealdb_core::fnc::asynchronous::{closure#0}::{closure#20}>::{closure#0}::{closure#0}>::{closure#0}::{closure#0}
Unexecuted instantiation: <async_executor::Executor>::spawn_inner::<core::result::Result<surrealdb_core::sql::value::value::Value, surrealdb_core::err::Error>, surrealdb_core::fnc::asynchronous::{closure#0}::cpu_intensive<core::result::Result<surrealdb_core::sql::value::value::Value, surrealdb_core::err::Error>, surrealdb_core::fnc::asynchronous::{closure#0}::{closure#21}>::{closure#0}::{closure#0}>::{closure#0}::{closure#0}
Unexecuted instantiation: <async_executor::Executor>::spawn_inner::<_, _>::{closure#0}::{closure#0}
250
0
            future.await
251
0
        };
Unexecuted instantiation: <async_executor::Executor>::spawn_inner::<core::result::Result<surrealdb_core::sql::value::value::Value, surrealdb_core::err::Error>, surrealdb_core::fnc::asynchronous::{closure#0}::cpu_intensive<core::result::Result<surrealdb_core::sql::value::value::Value, surrealdb_core::err::Error>, surrealdb_core::fnc::asynchronous::{closure#0}::{closure#14}>::{closure#0}::{closure#0}>::{closure#0}
Unexecuted instantiation: <async_executor::Executor>::spawn_inner::<core::result::Result<surrealdb_core::sql::value::value::Value, surrealdb_core::err::Error>, surrealdb_core::fnc::asynchronous::{closure#0}::cpu_intensive<core::result::Result<surrealdb_core::sql::value::value::Value, surrealdb_core::err::Error>, surrealdb_core::fnc::asynchronous::{closure#0}::{closure#15}>::{closure#0}::{closure#0}>::{closure#0}
Unexecuted instantiation: <async_executor::Executor>::spawn_inner::<core::result::Result<surrealdb_core::sql::value::value::Value, surrealdb_core::err::Error>, surrealdb_core::fnc::asynchronous::{closure#0}::cpu_intensive<core::result::Result<surrealdb_core::sql::value::value::Value, surrealdb_core::err::Error>, surrealdb_core::fnc::asynchronous::{closure#0}::{closure#16}>::{closure#0}::{closure#0}>::{closure#0}
Unexecuted instantiation: <async_executor::Executor>::spawn_inner::<core::result::Result<surrealdb_core::sql::value::value::Value, surrealdb_core::err::Error>, surrealdb_core::fnc::asynchronous::{closure#0}::cpu_intensive<core::result::Result<surrealdb_core::sql::value::value::Value, surrealdb_core::err::Error>, surrealdb_core::fnc::asynchronous::{closure#0}::{closure#17}>::{closure#0}::{closure#0}>::{closure#0}
Unexecuted instantiation: <async_executor::Executor>::spawn_inner::<core::result::Result<surrealdb_core::sql::value::value::Value, surrealdb_core::err::Error>, surrealdb_core::fnc::asynchronous::{closure#0}::cpu_intensive<core::result::Result<surrealdb_core::sql::value::value::Value, surrealdb_core::err::Error>, surrealdb_core::fnc::asynchronous::{closure#0}::{closure#18}>::{closure#0}::{closure#0}>::{closure#0}
Unexecuted instantiation: <async_executor::Executor>::spawn_inner::<core::result::Result<surrealdb_core::sql::value::value::Value, surrealdb_core::err::Error>, surrealdb_core::fnc::asynchronous::{closure#0}::cpu_intensive<core::result::Result<surrealdb_core::sql::value::value::Value, surrealdb_core::err::Error>, surrealdb_core::fnc::asynchronous::{closure#0}::{closure#19}>::{closure#0}::{closure#0}>::{closure#0}
Unexecuted instantiation: <async_executor::Executor>::spawn_inner::<core::result::Result<surrealdb_core::sql::value::value::Value, surrealdb_core::err::Error>, surrealdb_core::fnc::asynchronous::{closure#0}::cpu_intensive<core::result::Result<surrealdb_core::sql::value::value::Value, surrealdb_core::err::Error>, surrealdb_core::fnc::asynchronous::{closure#0}::{closure#20}>::{closure#0}::{closure#0}>::{closure#0}
Unexecuted instantiation: <async_executor::Executor>::spawn_inner::<core::result::Result<surrealdb_core::sql::value::value::Value, surrealdb_core::err::Error>, surrealdb_core::fnc::asynchronous::{closure#0}::cpu_intensive<core::result::Result<surrealdb_core::sql::value::value::Value, surrealdb_core::err::Error>, surrealdb_core::fnc::asynchronous::{closure#0}::{closure#21}>::{closure#0}::{closure#0}>::{closure#0}
Unexecuted instantiation: <async_executor::Executor>::spawn_inner::<_, _>::{closure#0}
252
253
        // Create the task and register it in the set of active tasks.
254
        //
255
        // SAFETY:
256
        //
257
        // If `future` is not `Send`, this must be a `LocalExecutor` as per this
258
        // function's unsafe precondition. Since `LocalExecutor` is `!Sync`,
259
        // `try_tick`, `tick` and `run` can only be called from the origin
260
        // thread of the `LocalExecutor`. Similarly, `spawn` can only be called
261
        // from the origin thread, ensuring that `future` and the executor share
262
        // the same origin thread. The `Runnable` can be scheduled from other
263
        // threads, but because of the above, the `Runnable` can only be run or
264
        // dropped on the origin thread.
265
        //
266
        // `future` is not `'static`, but we make sure that the `Runnable` does
267
        // not outlive `'a`. When the executor is dropped, the `active` field is
268
        // drained and all of the `Waker`s are woken. Then, the queue inside of
269
        // the `Executor` is drained of all of its runnables. This ensures that
270
        // runnables are dropped and this precondition is satisfied.
271
        //
272
        // `self.schedule()` is `Send`, `Sync` and `'static`, as checked below.
273
        // Therefore we do not need to worry about what is done with the
274
        // `Waker`.
275
0
        let (runnable, task) = Builder::new()
276
0
            .propagate_panic(true)
277
0
            .spawn_unchecked(|()| future, self.schedule());
Unexecuted instantiation: <async_executor::Executor>::spawn_inner::<core::result::Result<surrealdb_core::sql::value::value::Value, surrealdb_core::err::Error>, surrealdb_core::fnc::asynchronous::{closure#0}::cpu_intensive<core::result::Result<surrealdb_core::sql::value::value::Value, surrealdb_core::err::Error>, surrealdb_core::fnc::asynchronous::{closure#0}::{closure#14}>::{closure#0}::{closure#0}>::{closure#1}
Unexecuted instantiation: <async_executor::Executor>::spawn_inner::<core::result::Result<surrealdb_core::sql::value::value::Value, surrealdb_core::err::Error>, surrealdb_core::fnc::asynchronous::{closure#0}::cpu_intensive<core::result::Result<surrealdb_core::sql::value::value::Value, surrealdb_core::err::Error>, surrealdb_core::fnc::asynchronous::{closure#0}::{closure#15}>::{closure#0}::{closure#0}>::{closure#1}
Unexecuted instantiation: <async_executor::Executor>::spawn_inner::<core::result::Result<surrealdb_core::sql::value::value::Value, surrealdb_core::err::Error>, surrealdb_core::fnc::asynchronous::{closure#0}::cpu_intensive<core::result::Result<surrealdb_core::sql::value::value::Value, surrealdb_core::err::Error>, surrealdb_core::fnc::asynchronous::{closure#0}::{closure#16}>::{closure#0}::{closure#0}>::{closure#1}
Unexecuted instantiation: <async_executor::Executor>::spawn_inner::<core::result::Result<surrealdb_core::sql::value::value::Value, surrealdb_core::err::Error>, surrealdb_core::fnc::asynchronous::{closure#0}::cpu_intensive<core::result::Result<surrealdb_core::sql::value::value::Value, surrealdb_core::err::Error>, surrealdb_core::fnc::asynchronous::{closure#0}::{closure#17}>::{closure#0}::{closure#0}>::{closure#1}
Unexecuted instantiation: <async_executor::Executor>::spawn_inner::<core::result::Result<surrealdb_core::sql::value::value::Value, surrealdb_core::err::Error>, surrealdb_core::fnc::asynchronous::{closure#0}::cpu_intensive<core::result::Result<surrealdb_core::sql::value::value::Value, surrealdb_core::err::Error>, surrealdb_core::fnc::asynchronous::{closure#0}::{closure#18}>::{closure#0}::{closure#0}>::{closure#1}
Unexecuted instantiation: <async_executor::Executor>::spawn_inner::<core::result::Result<surrealdb_core::sql::value::value::Value, surrealdb_core::err::Error>, surrealdb_core::fnc::asynchronous::{closure#0}::cpu_intensive<core::result::Result<surrealdb_core::sql::value::value::Value, surrealdb_core::err::Error>, surrealdb_core::fnc::asynchronous::{closure#0}::{closure#19}>::{closure#0}::{closure#0}>::{closure#1}
Unexecuted instantiation: <async_executor::Executor>::spawn_inner::<core::result::Result<surrealdb_core::sql::value::value::Value, surrealdb_core::err::Error>, surrealdb_core::fnc::asynchronous::{closure#0}::cpu_intensive<core::result::Result<surrealdb_core::sql::value::value::Value, surrealdb_core::err::Error>, surrealdb_core::fnc::asynchronous::{closure#0}::{closure#20}>::{closure#0}::{closure#0}>::{closure#1}
Unexecuted instantiation: <async_executor::Executor>::spawn_inner::<core::result::Result<surrealdb_core::sql::value::value::Value, surrealdb_core::err::Error>, surrealdb_core::fnc::asynchronous::{closure#0}::cpu_intensive<core::result::Result<surrealdb_core::sql::value::value::Value, surrealdb_core::err::Error>, surrealdb_core::fnc::asynchronous::{closure#0}::{closure#21}>::{closure#0}::{closure#0}>::{closure#1}
Unexecuted instantiation: <async_executor::Executor>::spawn_inner::<_, _>::{closure#1}
278
0
        entry.insert(runnable.waker());
279
0
280
0
        runnable.schedule();
281
0
        task
282
0
    }
Unexecuted instantiation: <async_executor::Executor>::spawn_inner::<core::result::Result<surrealdb_core::sql::value::value::Value, surrealdb_core::err::Error>, surrealdb_core::fnc::asynchronous::{closure#0}::cpu_intensive<core::result::Result<surrealdb_core::sql::value::value::Value, surrealdb_core::err::Error>, surrealdb_core::fnc::asynchronous::{closure#0}::{closure#14}>::{closure#0}::{closure#0}>
Unexecuted instantiation: <async_executor::Executor>::spawn_inner::<core::result::Result<surrealdb_core::sql::value::value::Value, surrealdb_core::err::Error>, surrealdb_core::fnc::asynchronous::{closure#0}::cpu_intensive<core::result::Result<surrealdb_core::sql::value::value::Value, surrealdb_core::err::Error>, surrealdb_core::fnc::asynchronous::{closure#0}::{closure#15}>::{closure#0}::{closure#0}>
Unexecuted instantiation: <async_executor::Executor>::spawn_inner::<core::result::Result<surrealdb_core::sql::value::value::Value, surrealdb_core::err::Error>, surrealdb_core::fnc::asynchronous::{closure#0}::cpu_intensive<core::result::Result<surrealdb_core::sql::value::value::Value, surrealdb_core::err::Error>, surrealdb_core::fnc::asynchronous::{closure#0}::{closure#16}>::{closure#0}::{closure#0}>
Unexecuted instantiation: <async_executor::Executor>::spawn_inner::<core::result::Result<surrealdb_core::sql::value::value::Value, surrealdb_core::err::Error>, surrealdb_core::fnc::asynchronous::{closure#0}::cpu_intensive<core::result::Result<surrealdb_core::sql::value::value::Value, surrealdb_core::err::Error>, surrealdb_core::fnc::asynchronous::{closure#0}::{closure#17}>::{closure#0}::{closure#0}>
Unexecuted instantiation: <async_executor::Executor>::spawn_inner::<core::result::Result<surrealdb_core::sql::value::value::Value, surrealdb_core::err::Error>, surrealdb_core::fnc::asynchronous::{closure#0}::cpu_intensive<core::result::Result<surrealdb_core::sql::value::value::Value, surrealdb_core::err::Error>, surrealdb_core::fnc::asynchronous::{closure#0}::{closure#18}>::{closure#0}::{closure#0}>
Unexecuted instantiation: <async_executor::Executor>::spawn_inner::<core::result::Result<surrealdb_core::sql::value::value::Value, surrealdb_core::err::Error>, surrealdb_core::fnc::asynchronous::{closure#0}::cpu_intensive<core::result::Result<surrealdb_core::sql::value::value::Value, surrealdb_core::err::Error>, surrealdb_core::fnc::asynchronous::{closure#0}::{closure#19}>::{closure#0}::{closure#0}>
Unexecuted instantiation: <async_executor::Executor>::spawn_inner::<core::result::Result<surrealdb_core::sql::value::value::Value, surrealdb_core::err::Error>, surrealdb_core::fnc::asynchronous::{closure#0}::cpu_intensive<core::result::Result<surrealdb_core::sql::value::value::Value, surrealdb_core::err::Error>, surrealdb_core::fnc::asynchronous::{closure#0}::{closure#20}>::{closure#0}::{closure#0}>
Unexecuted instantiation: <async_executor::Executor>::spawn_inner::<core::result::Result<surrealdb_core::sql::value::value::Value, surrealdb_core::err::Error>, surrealdb_core::fnc::asynchronous::{closure#0}::cpu_intensive<core::result::Result<surrealdb_core::sql::value::value::Value, surrealdb_core::err::Error>, surrealdb_core::fnc::asynchronous::{closure#0}::{closure#21}>::{closure#0}::{closure#0}>
Unexecuted instantiation: <async_executor::Executor>::spawn_inner::<_, _>
283
284
    /// Attempts to run a task if at least one is scheduled.
285
    ///
286
    /// Running a scheduled task means simply polling its future once.
287
    ///
288
    /// # Examples
289
    ///
290
    /// ```
291
    /// use async_executor::Executor;
292
    ///
293
    /// let ex = Executor::new();
294
    /// assert!(!ex.try_tick()); // no tasks to run
295
    ///
296
    /// let task = ex.spawn(async {
297
    ///     println!("Hello world");
298
    /// });
299
    /// assert!(ex.try_tick()); // a task was found
300
    /// ```
301
0
    pub fn try_tick(&self) -> bool {
302
0
        self.state().try_tick()
303
0
    }
304
305
    /// Runs a single task.
306
    ///
307
    /// Running a task means simply polling its future once.
308
    ///
309
    /// If no tasks are scheduled when this method is called, it will wait until one is scheduled.
310
    ///
311
    /// # Examples
312
    ///
313
    /// ```
314
    /// use async_executor::Executor;
315
    /// use futures_lite::future;
316
    ///
317
    /// let ex = Executor::new();
318
    ///
319
    /// let task = ex.spawn(async {
320
    ///     println!("Hello world");
321
    /// });
322
    /// future::block_on(ex.tick()); // runs the task
323
    /// ```
324
0
    pub async fn tick(&self) {
325
0
        self.state().tick().await;
326
0
    }
327
328
    /// Runs the executor until the given future completes.
329
    ///
330
    /// # Examples
331
    ///
332
    /// ```
333
    /// use async_executor::Executor;
334
    /// use futures_lite::future;
335
    ///
336
    /// let ex = Executor::new();
337
    ///
338
    /// let task = ex.spawn(async { 1 + 2 });
339
    /// let res = future::block_on(ex.run(async { task.await * 2 }));
340
    ///
341
    /// assert_eq!(res, 6);
342
    /// ```
343
0
    pub async fn run<T>(&self, future: impl Future<Output = T>) -> T {
Unexecuted instantiation: <async_executor::Executor>::run::<(), futures_util::future::pending::Pending<()>>
Unexecuted instantiation: <async_executor::Executor>::run::<_, _>
344
0
        self.state().run(future).await
345
0
    }
Unexecuted instantiation: <async_executor::Executor>::run::<(), futures_util::future::pending::Pending<()>>::{closure#0}
Unexecuted instantiation: <async_executor::Executor>::run::<_, _>::{closure#0}
346
347
    /// Returns a function that schedules a runnable task when it gets woken up.
348
0
    fn schedule(&self) -> impl Fn(Runnable) + Send + Sync + 'static {
349
0
        let state = self.state_as_arc();
350
351
        // TODO: If possible, push into the current local queue and notify the ticker.
352
0
        move |runnable| {
353
0
            state.queue.push(runnable).unwrap();
354
0
            state.notify();
355
0
        }
Unexecuted instantiation: <async_executor::Executor>::schedule::{closure#0}
Unexecuted instantiation: <async_executor::Executor>::schedule::{closure#0}
356
0
    }
357
358
    /// Returns a pointer to the inner state.
359
    #[inline]
360
0
    fn state_ptr(&self) -> *const State {
361
        #[cold]
362
0
        fn alloc_state(atomic_ptr: &AtomicPtr<State>) -> *mut State {
363
0
            let state = Arc::new(State::new());
364
0
            // TODO: Switch this to use cast_mut once the MSRV can be bumped past 1.65
365
0
            let ptr = Arc::into_raw(state) as *mut State;
366
0
            if let Err(actual) = atomic_ptr.compare_exchange(
367
0
                std::ptr::null_mut(),
368
0
                ptr,
369
0
                Ordering::AcqRel,
370
0
                Ordering::Acquire,
371
0
            ) {
372
                // SAFETY: This was just created from Arc::into_raw.
373
0
                drop(unsafe { Arc::from_raw(ptr) });
374
0
                actual
375
            } else {
376
0
                ptr
377
            }
378
0
        }
379
380
0
        let mut ptr = self.state.load(Ordering::Acquire);
381
0
        if ptr.is_null() {
382
0
            ptr = alloc_state(&self.state);
383
0
        }
384
0
        ptr
385
0
    }
Unexecuted instantiation: <async_executor::Executor>::state_ptr
Unexecuted instantiation: <async_executor::Executor>::state_ptr
386
387
    /// Returns a reference to the inner state.
388
    #[inline]
389
0
    fn state(&self) -> &State {
390
0
        // SAFETY: So long as an Executor lives, its state pointer will always be valid
391
0
        // when accessed through state_ptr.
392
0
        unsafe { &*self.state_ptr() }
393
0
    }
Unexecuted instantiation: <async_executor::Executor>::state
Unexecuted instantiation: <async_executor::Executor>::state
394
395
    /// Clones the inner state `Arc`.
396
    #[inline]
397
0
    fn state_as_arc(&self) -> Arc<State> {
398
0
        // SAFETY: So long as an Executor lives, its state pointer will always be a valid
399
0
        // Arc when accessed through state_ptr.
400
0
        let arc = unsafe { Arc::from_raw(self.state_ptr()) };
401
0
        let clone = arc.clone();
402
0
        std::mem::forget(arc);
403
0
        clone
404
0
    }
Unexecuted instantiation: <async_executor::Executor>::state_as_arc
Unexecuted instantiation: <async_executor::Executor>::state_as_arc
405
}
406
407
impl Drop for Executor<'_> {
408
0
    fn drop(&mut self) {
409
0
        let ptr = *self.state.get_mut();
410
0
        if ptr.is_null() {
411
0
            return;
412
0
        }
413
0
414
0
        // SAFETY: As ptr is not null, it was allocated via Arc::new and converted
415
0
        // via Arc::into_raw in state_ptr.
416
0
        let state = unsafe { Arc::from_raw(ptr) };
417
0
418
0
        let mut active = state.active.lock().unwrap_or_else(|e| e.into_inner());
419
0
        for w in active.drain() {
420
0
            w.wake();
421
0
        }
422
0
        drop(active);
423
424
0
        while state.queue.pop().is_ok() {}
425
0
    }
426
}
427
428
impl<'a> Default for Executor<'a> {
429
0
    fn default() -> Executor<'a> {
430
0
        Executor::new()
431
0
    }
432
}
433
434
/// A thread-local executor.
435
///
436
/// The executor can only be run on the thread that created it.
437
///
438
/// # Examples
439
///
440
/// ```
441
/// use async_executor::LocalExecutor;
442
/// use futures_lite::future;
443
///
444
/// let local_ex = LocalExecutor::new();
445
///
446
/// future::block_on(local_ex.run(async {
447
///     println!("Hello world!");
448
/// }));
449
/// ```
450
pub struct LocalExecutor<'a> {
451
    /// The inner executor.
452
    inner: Executor<'a>,
453
454
    /// Makes the type `!Send` and `!Sync`.
455
    _marker: PhantomData<Rc<()>>,
456
}
457
458
impl UnwindSafe for LocalExecutor<'_> {}
459
impl RefUnwindSafe for LocalExecutor<'_> {}
460
461
impl fmt::Debug for LocalExecutor<'_> {
462
0
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
463
0
        debug_executor(&self.inner, "LocalExecutor", f)
464
0
    }
465
}
466
467
impl<'a> LocalExecutor<'a> {
468
    /// Creates a single-threaded executor.
469
    ///
470
    /// # Examples
471
    ///
472
    /// ```
473
    /// use async_executor::LocalExecutor;
474
    ///
475
    /// let local_ex = LocalExecutor::new();
476
    /// ```
477
0
    pub const fn new() -> LocalExecutor<'a> {
478
0
        LocalExecutor {
479
0
            inner: Executor::new(),
480
0
            _marker: PhantomData,
481
0
        }
482
0
    }
483
484
    /// Returns `true` if there are no unfinished tasks.
485
    ///
486
    /// # Examples
487
    ///
488
    /// ```
489
    /// use async_executor::LocalExecutor;
490
    ///
491
    /// let local_ex = LocalExecutor::new();
492
    /// assert!(local_ex.is_empty());
493
    ///
494
    /// let task = local_ex.spawn(async {
495
    ///     println!("Hello world");
496
    /// });
497
    /// assert!(!local_ex.is_empty());
498
    ///
499
    /// assert!(local_ex.try_tick());
500
    /// assert!(local_ex.is_empty());
501
    /// ```
502
0
    pub fn is_empty(&self) -> bool {
503
0
        self.inner().is_empty()
504
0
    }
505
506
    /// Spawns a task onto the executor.
507
    ///
508
    /// # Examples
509
    ///
510
    /// ```
511
    /// use async_executor::LocalExecutor;
512
    ///
513
    /// let local_ex = LocalExecutor::new();
514
    ///
515
    /// let task = local_ex.spawn(async {
516
    ///     println!("Hello world");
517
    /// });
518
    /// ```
519
0
    pub fn spawn<T: 'a>(&self, future: impl Future<Output = T> + 'a) -> Task<T> {
520
0
        let mut active = self.inner().state().active.lock().unwrap();
521
0
522
0
        // SAFETY: This executor is not thread safe, so the future and its result
523
0
        //         cannot be sent to another thread.
524
0
        unsafe { self.inner().spawn_inner(future, &mut active) }
525
0
    }
526
527
    /// Spawns many tasks onto the executor.
528
    ///
529
    /// As opposed to the [`spawn`] method, this locks the executor's inner task lock once and
530
    /// spawns all of the tasks in one go. With large numbers of tasks this can reduce
531
    /// contention.
532
    ///
533
    /// It is assumed that the iterator provided does not block; blocking iterators can lock up
534
    /// the internal mutex and therefore the entire executor. Unlike [`Executor::spawn_many`], the
535
    /// mutex is not released, as there are no other threads that can poll this executor.
536
    ///
537
    /// ## Example
538
    ///
539
    /// ```
540
    /// use async_executor::LocalExecutor;
541
    /// use futures_lite::{stream, prelude::*};
542
    /// use std::future::ready;
543
    ///
544
    /// # futures_lite::future::block_on(async {
545
    /// let mut ex = LocalExecutor::new();
546
    ///
547
    /// let futures = [
548
    ///     ready(1),
549
    ///     ready(2),
550
    ///     ready(3)
551
    /// ];
552
    ///
553
    /// // Spawn all of the futures onto the executor at once.
554
    /// let mut tasks = vec![];
555
    /// ex.spawn_many(futures, &mut tasks);
556
    ///
557
    /// // Await all of them.
558
    /// let results = ex.run(async move {
559
    ///     stream::iter(tasks).then(|x| x).collect::<Vec<_>>().await
560
    /// }).await;
561
    /// assert_eq!(results, [1, 2, 3]);
562
    /// # });
563
    /// ```
564
    ///
565
    /// [`spawn`]: LocalExecutor::spawn
566
    /// [`Executor::spawn_many`]: Executor::spawn_many
567
0
    pub fn spawn_many<T: 'a, F: Future<Output = T> + 'a>(
568
0
        &self,
569
0
        futures: impl IntoIterator<Item = F>,
570
0
        handles: &mut impl Extend<Task<F::Output>>,
571
0
    ) {
572
0
        let mut active = self.inner().state().active.lock().unwrap();
573
0
574
0
        // Convert all of the futures to tasks.
575
0
        let tasks = futures.into_iter().map(|future| {
576
0
            // SAFETY: This executor is not thread safe, so the future and its result
577
0
            //         cannot be sent to another thread.
578
0
            unsafe { self.inner().spawn_inner(future, &mut active) }
579
0
580
0
            // As only one thread can spawn or poll tasks at a time, there is no need
581
0
            // to release the lock here.
582
0
        });
583
0
584
0
        // Push them to the user's collection.
585
0
        handles.extend(tasks);
586
0
    }
587
588
    /// Attempts to run a task if at least one is scheduled.
589
    ///
590
    /// Running a scheduled task means simply polling its future once.
591
    ///
592
    /// # Examples
593
    ///
594
    /// ```
595
    /// use async_executor::LocalExecutor;
596
    ///
597
    /// let ex = LocalExecutor::new();
598
    /// assert!(!ex.try_tick()); // no tasks to run
599
    ///
600
    /// let task = ex.spawn(async {
601
    ///     println!("Hello world");
602
    /// });
603
    /// assert!(ex.try_tick()); // a task was found
604
    /// ```
605
0
    pub fn try_tick(&self) -> bool {
606
0
        self.inner().try_tick()
607
0
    }
608
609
    /// Runs a single task.
610
    ///
611
    /// Running a task means simply polling its future once.
612
    ///
613
    /// If no tasks are scheduled when this method is called, it will wait until one is scheduled.
614
    ///
615
    /// # Examples
616
    ///
617
    /// ```
618
    /// use async_executor::LocalExecutor;
619
    /// use futures_lite::future;
620
    ///
621
    /// let ex = LocalExecutor::new();
622
    ///
623
    /// let task = ex.spawn(async {
624
    ///     println!("Hello world");
625
    /// });
626
    /// future::block_on(ex.tick()); // runs the task
627
    /// ```
628
0
    pub async fn tick(&self) {
629
0
        self.inner().tick().await
630
0
    }
631
632
    /// Runs the executor until the given future completes.
633
    ///
634
    /// # Examples
635
    ///
636
    /// ```
637
    /// use async_executor::LocalExecutor;
638
    /// use futures_lite::future;
639
    ///
640
    /// let local_ex = LocalExecutor::new();
641
    ///
642
    /// let task = local_ex.spawn(async { 1 + 2 });
643
    /// let res = future::block_on(local_ex.run(async { task.await * 2 }));
644
    ///
645
    /// assert_eq!(res, 6);
646
    /// ```
647
0
    pub async fn run<T>(&self, future: impl Future<Output = T>) -> T {
648
0
        self.inner().run(future).await
649
0
    }
650
651
    /// Returns a reference to the inner executor.
652
0
    fn inner(&self) -> &Executor<'a> {
653
0
        &self.inner
654
0
    }
655
}
656
657
impl<'a> Default for LocalExecutor<'a> {
658
0
    fn default() -> LocalExecutor<'a> {
659
0
        LocalExecutor::new()
660
0
    }
661
}
662
663
/// The state of an executor.
664
struct State {
665
    /// The global queue.
666
    queue: ConcurrentQueue<Runnable>,
667
668
    /// Local queues created by runners.
669
    local_queues: RwLock<Vec<Arc<ConcurrentQueue<Runnable>>>>,
670
671
    /// Set to `true` when a sleeping ticker is notified or no tickers are sleeping.
672
    notified: AtomicBool,
673
674
    /// A list of sleeping tickers.
675
    sleepers: Mutex<Sleepers>,
676
677
    /// Currently active tasks.
678
    active: Mutex<Slab<Waker>>,
679
}
680
681
impl State {
682
    /// Creates state for a new executor.
683
0
    const fn new() -> State {
684
0
        State {
685
0
            queue: ConcurrentQueue::unbounded(),
686
0
            local_queues: RwLock::new(Vec::new()),
687
0
            notified: AtomicBool::new(true),
688
0
            sleepers: Mutex::new(Sleepers {
689
0
                count: 0,
690
0
                wakers: Vec::new(),
691
0
                free_ids: Vec::new(),
692
0
            }),
693
0
            active: Mutex::new(Slab::new()),
694
0
        }
695
0
    }
696
697
    /// Notifies a sleeping ticker.
698
    #[inline]
699
0
    fn notify(&self) {
700
0
        if self
701
0
            .notified
702
0
            .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
703
0
            .is_ok()
704
        {
705
0
            let waker = self.sleepers.lock().unwrap().notify();
706
0
            if let Some(w) = waker {
707
0
                w.wake();
708
0
            }
709
0
        }
710
0
    }
Unexecuted instantiation: <async_executor::State>::notify
Unexecuted instantiation: <async_executor::State>::notify
711
712
0
    pub(crate) fn try_tick(&self) -> bool {
713
0
        match self.queue.pop() {
714
0
            Err(_) => false,
715
0
            Ok(runnable) => {
716
0
                // Notify another ticker now to pick up where this ticker left off, just in case
717
0
                // running the task takes a long time.
718
0
                self.notify();
719
0
720
0
                // Run the task.
721
0
                runnable.run();
722
0
                true
723
            }
724
        }
725
0
    }
726
727
0
    pub(crate) async fn tick(&self) {
728
0
        let runnable = Ticker::new(self).runnable().await;
729
0
        runnable.run();
730
0
    }
731
732
0
    pub async fn run<T>(&self, future: impl Future<Output = T>) -> T {
Unexecuted instantiation: <async_executor::State>::run::<(), futures_util::future::pending::Pending<()>>
Unexecuted instantiation: <async_executor::State>::run::<_, _>
733
0
        let mut runner = Runner::new(self);
734
0
        let mut rng = fastrand::Rng::new();
735
0
736
0
        // A future that runs tasks forever.
737
0
        let run_forever = async {
738
            loop {
739
0
                for _ in 0..200 {
740
0
                    let runnable = runner.runnable(&mut rng).await;
741
0
                    runnable.run();
742
                }
743
0
                future::yield_now().await;
Unexecuted instantiation: <async_executor::State>::run::<(), futures_util::future::pending::Pending<()>>::{closure#0}::{closure#0}
Unexecuted instantiation: <async_executor::State>::run::<_, _>::{closure#0}::{closure#0}
744
            }
745
        };
746
747
        // Run `future` and `run_forever` concurrently until `future` completes.
748
0
        future.or(run_forever).await
749
0
    }
Unexecuted instantiation: <async_executor::State>::run::<(), futures_util::future::pending::Pending<()>>::{closure#0}
Unexecuted instantiation: <async_executor::State>::run::<_, _>::{closure#0}
750
}
751
752
/// A list of sleeping tickers.
753
struct Sleepers {
754
    /// Number of sleeping tickers (both notified and unnotified).
755
    count: usize,
756
757
    /// IDs and wakers of sleeping unnotified tickers.
758
    ///
759
    /// A sleeping ticker is notified when its waker is missing from this list.
760
    wakers: Vec<(usize, Waker)>,
761
762
    /// Reclaimed IDs.
763
    free_ids: Vec<usize>,
764
}
765
766
impl Sleepers {
767
    /// Inserts a new sleeping ticker.
768
0
    fn insert(&mut self, waker: &Waker) -> usize {
769
0
        let id = match self.free_ids.pop() {
770
0
            Some(id) => id,
771
0
            None => self.count + 1,
772
        };
773
0
        self.count += 1;
774
0
        self.wakers.push((id, waker.clone()));
775
0
        id
776
0
    }
777
778
    /// Re-inserts a sleeping ticker's waker if it was notified.
779
    ///
780
    /// Returns `true` if the ticker was notified.
781
0
    fn update(&mut self, id: usize, waker: &Waker) -> bool {
782
0
        for item in &mut self.wakers {
783
0
            if item.0 == id {
784
0
                item.1.clone_from(waker);
785
0
                return false;
786
0
            }
787
        }
788
789
0
        self.wakers.push((id, waker.clone()));
790
0
        true
791
0
    }
792
793
    /// Removes a previously inserted sleeping ticker.
794
    ///
795
    /// Returns `true` if the ticker was notified.
796
0
    fn remove(&mut self, id: usize) -> bool {
797
0
        self.count -= 1;
798
0
        self.free_ids.push(id);
799
800
0
        for i in (0..self.wakers.len()).rev() {
801
0
            if self.wakers[i].0 == id {
802
0
                self.wakers.remove(i);
803
0
                return false;
804
0
            }
805
        }
806
0
        true
807
0
    }
808
809
    /// Returns `true` if a sleeping ticker is notified or no tickers are sleeping.
810
0
    fn is_notified(&self) -> bool {
811
0
        self.count == 0 || self.count > self.wakers.len()
812
0
    }
813
814
    /// Returns notification waker for a sleeping ticker.
815
    ///
816
    /// If a ticker was notified already or there are no tickers, `None` will be returned.
817
0
    fn notify(&mut self) -> Option<Waker> {
818
0
        if self.wakers.len() == self.count {
819
0
            self.wakers.pop().map(|item| item.1)
820
        } else {
821
0
            None
822
        }
823
0
    }
824
}
825
826
/// Runs tasks one by one.
827
struct Ticker<'a> {
828
    /// The executor state.
829
    state: &'a State,
830
831
    /// Set to a non-zero sleeper ID when in sleeping state.
832
    ///
833
    /// States a ticker can be in:
834
    /// 1) Woken.
835
    ///    2a) Sleeping and unnotified.
836
    ///    2b) Sleeping and notified.
837
    sleeping: usize,
838
}
839
840
impl Ticker<'_> {
841
    /// Creates a ticker.
842
0
    fn new(state: &State) -> Ticker<'_> {
843
0
        Ticker { state, sleeping: 0 }
844
0
    }
845
846
    /// Moves the ticker into sleeping and unnotified state.
847
    ///
848
    /// Returns `false` if the ticker was already sleeping and unnotified.
849
0
    fn sleep(&mut self, waker: &Waker) -> bool {
850
0
        let mut sleepers = self.state.sleepers.lock().unwrap();
851
0
852
0
        match self.sleeping {
853
            // Move to sleeping state.
854
0
            0 => {
855
0
                self.sleeping = sleepers.insert(waker);
856
0
            }
857
858
            // Already sleeping, check if notified.
859
0
            id => {
860
0
                if !sleepers.update(id, waker) {
861
0
                    return false;
862
0
                }
863
            }
864
        }
865
866
0
        self.state
867
0
            .notified
868
0
            .store(sleepers.is_notified(), Ordering::Release);
869
0
870
0
        true
871
0
    }
872
873
    /// Moves the ticker into woken state.
874
0
    fn wake(&mut self) {
875
0
        if self.sleeping != 0 {
876
0
            let mut sleepers = self.state.sleepers.lock().unwrap();
877
0
            sleepers.remove(self.sleeping);
878
0
879
0
            self.state
880
0
                .notified
881
0
                .store(sleepers.is_notified(), Ordering::Release);
882
0
        }
883
0
        self.sleeping = 0;
884
0
    }
885
886
    /// Waits for the next runnable task to run.
887
0
    async fn runnable(&mut self) -> Runnable {
888
0
        self.runnable_with(|| self.state.queue.pop().ok()).await
889
0
    }
890
891
    /// Waits for the next runnable task to run, given a function that searches for a task.
892
0
    async fn runnable_with(&mut self, mut search: impl FnMut() -> Option<Runnable>) -> Runnable {
Unexecuted instantiation: <async_executor::Ticker>::runnable_with::<<async_executor::Runner>::runnable::{closure#0}::{closure#0}>
Unexecuted instantiation: <async_executor::Ticker>::runnable_with::<_>
893
0
        future::poll_fn(|cx| {
894
            loop {
895
0
                match search() {
896
                    None => {
897
                        // Move to sleeping and unnotified state.
898
0
                        if !self.sleep(cx.waker()) {
899
                            // If already sleeping and unnotified, return.
900
0
                            return Poll::Pending;
901
0
                        }
902
                    }
903
0
                    Some(r) => {
904
0
                        // Wake up.
905
0
                        self.wake();
906
0
907
0
                        // Notify another ticker now to pick up where this ticker left off, just in
908
0
                        // case running the task takes a long time.
909
0
                        self.state.notify();
910
0
911
0
                        return Poll::Ready(r);
912
                    }
913
                }
914
            }
915
0
        })
Unexecuted instantiation: <async_executor::Ticker>::runnable_with::<<async_executor::Runner>::runnable::{closure#0}::{closure#0}>::{closure#0}::{closure#0}
Unexecuted instantiation: <async_executor::Ticker>::runnable_with::<_>::{closure#0}::{closure#0}
916
0
        .await
917
0
    }
Unexecuted instantiation: <async_executor::Ticker>::runnable_with::<<async_executor::Runner>::runnable::{closure#0}::{closure#0}>::{closure#0}
Unexecuted instantiation: <async_executor::Ticker>::runnable_with::<_>::{closure#0}
918
}
919
920
impl Drop for Ticker<'_> {
921
0
    fn drop(&mut self) {
922
0
        // If this ticker is in sleeping state, it must be removed from the sleepers list.
923
0
        if self.sleeping != 0 {
924
0
            let mut sleepers = self.state.sleepers.lock().unwrap();
925
0
            let notified = sleepers.remove(self.sleeping);
926
0
927
0
            self.state
928
0
                .notified
929
0
                .store(sleepers.is_notified(), Ordering::Release);
930
0
931
0
            // If this ticker was notified, then notify another ticker.
932
0
            if notified {
933
0
                drop(sleepers);
934
0
                self.state.notify();
935
0
            }
936
0
        }
937
0
    }
938
}
939
940
/// A worker in a work-stealing executor.
941
///
942
/// This is just a ticker that also has an associated local queue for improved cache locality.
943
struct Runner<'a> {
944
    /// The executor state.
945
    state: &'a State,
946
947
    /// Inner ticker.
948
    ticker: Ticker<'a>,
949
950
    /// The local queue.
951
    local: Arc<ConcurrentQueue<Runnable>>,
952
953
    /// Bumped every time a runnable task is found.
954
    ticks: usize,
955
}
956
957
impl Runner<'_> {
958
    /// Creates a runner and registers it in the executor state.
959
0
    fn new(state: &State) -> Runner<'_> {
960
0
        let runner = Runner {
961
0
            state,
962
0
            ticker: Ticker::new(state),
963
0
            local: Arc::new(ConcurrentQueue::bounded(512)),
964
0
            ticks: 0,
965
0
        };
966
0
        state
967
0
            .local_queues
968
0
            .write()
969
0
            .unwrap()
970
0
            .push(runner.local.clone());
971
0
        runner
972
0
    }
973
974
    /// Waits for the next runnable task to run.
975
0
    async fn runnable(&mut self, rng: &mut fastrand::Rng) -> Runnable {
976
0
        let runnable = self
977
0
            .ticker
978
0
            .runnable_with(|| {
979
                // Try the local queue.
980
0
                if let Ok(r) = self.local.pop() {
981
0
                    return Some(r);
982
0
                }
983
984
                // Try stealing from the global queue.
985
0
                if let Ok(r) = self.state.queue.pop() {
986
0
                    steal(&self.state.queue, &self.local);
987
0
                    return Some(r);
988
0
                }
989
0
990
0
                // Try stealing from other runners.
991
0
                let local_queues = self.state.local_queues.read().unwrap();
992
0
993
0
                // Pick a random starting point in the iterator list and rotate the list.
994
0
                let n = local_queues.len();
995
0
                let start = rng.usize(..n);
996
0
                let iter = local_queues
997
0
                    .iter()
998
0
                    .chain(local_queues.iter())
999
0
                    .skip(start)
1000
0
                    .take(n);
1001
0
1002
0
                // Remove this runner's local queue.
1003
0
                let iter = iter.filter(|local| !Arc::ptr_eq(local, &self.local));
Unexecuted instantiation: <async_executor::Runner>::runnable::{closure#0}::{closure#0}::{closure#0}
Unexecuted instantiation: <async_executor::Runner>::runnable::{closure#0}::{closure#0}::{closure#0}
1004
1005
                // Try stealing from each local queue in the list.
1006
0
                for local in iter {
1007
0
                    steal(local, &self.local);
1008
0
                    if let Ok(r) = self.local.pop() {
1009
0
                        return Some(r);
1010
0
                    }
1011
                }
1012
1013
0
                None
1014
0
            })
Unexecuted instantiation: <async_executor::Runner>::runnable::{closure#0}::{closure#0}
Unexecuted instantiation: <async_executor::Runner>::runnable::{closure#0}::{closure#0}
1015
0
            .await;
1016
1017
        // Bump the tick counter.
1018
0
        self.ticks = self.ticks.wrapping_add(1);
1019
0
1020
0
        if self.ticks % 64 == 0 {
1021
0
            // Steal tasks from the global queue to ensure fair task scheduling.
1022
0
            steal(&self.state.queue, &self.local);
1023
0
        }
1024
1025
0
        runnable
1026
0
    }
Unexecuted instantiation: <async_executor::Runner>::runnable::{closure#0}
Unexecuted instantiation: <async_executor::Runner>::runnable::{closure#0}
1027
}
1028
1029
impl Drop for Runner<'_> {
1030
0
    fn drop(&mut self) {
1031
0
        // Remove the local queue.
1032
0
        self.state
1033
0
            .local_queues
1034
0
            .write()
1035
0
            .unwrap()
1036
0
            .retain(|local| !Arc::ptr_eq(local, &self.local));
1037
1038
        // Re-schedule remaining tasks in the local queue.
1039
0
        while let Ok(r) = self.local.pop() {
1040
0
            r.schedule();
1041
0
        }
1042
0
    }
1043
}
1044
1045
/// Steals some items from one queue into another.
1046
0
fn steal<T>(src: &ConcurrentQueue<T>, dest: &ConcurrentQueue<T>) {
1047
0
    // Half of `src`'s length rounded up.
1048
0
    let mut count = (src.len() + 1) / 2;
1049
0
1050
0
    if count > 0 {
1051
        // Don't steal more than fits into the queue.
1052
0
        if let Some(cap) = dest.capacity() {
1053
0
            count = count.min(cap - dest.len());
1054
0
        }
1055
1056
        // Steal tasks.
1057
0
        for _ in 0..count {
1058
0
            if let Ok(t) = src.pop() {
1059
0
                assert!(dest.push(t).is_ok());
1060
            } else {
1061
0
                break;
1062
            }
1063
        }
1064
0
    }
1065
0
}
Unexecuted instantiation: async_executor::steal::<async_task::runnable::Runnable>
Unexecuted instantiation: async_executor::steal::<_>
1066
1067
/// Debug implementation for `Executor` and `LocalExecutor`.
1068
0
fn debug_executor(executor: &Executor<'_>, name: &str, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1069
0
    // Get a reference to the state.
1070
0
    let ptr = executor.state.load(Ordering::Acquire);
1071
0
    if ptr.is_null() {
1072
        // The executor has not been initialized.
1073
        struct Uninitialized;
1074
1075
        impl fmt::Debug for Uninitialized {
1076
0
            fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1077
0
                f.write_str("<uninitialized>")
1078
0
            }
1079
        }
1080
1081
0
        return f.debug_tuple(name).field(&Uninitialized).finish();
1082
0
    }
1083
0
1084
0
    // SAFETY: If the state pointer is not null, it must have been
1085
0
    // allocated properly by Arc::new and converted via Arc::into_raw
1086
0
    // in state_ptr.
1087
0
    let state = unsafe { &*ptr };
1088
0
1089
0
    debug_state(state, name, f)
1090
0
}
1091
1092
/// Formats the given executor state under `name`; shared by the `Executor` and `LocalExecutor` debug output.
1093
0
fn debug_state(state: &State, name: &str, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1094
    /// Debug wrapper for the number of active tasks.
1095
    struct ActiveTasks<'a>(&'a Mutex<Slab<Waker>>);
1096
1097
    impl fmt::Debug for ActiveTasks<'_> {
1098
0
        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1099
0
            match self.0.try_lock() {
1100
0
                Ok(lock) => fmt::Debug::fmt(&lock.len(), f),
1101
0
                Err(TryLockError::WouldBlock) => f.write_str("<locked>"),
1102
0
                Err(TryLockError::Poisoned(_)) => f.write_str("<poisoned>"),
1103
            }
1104
0
        }
1105
    }
1106
1107
    /// Debug wrapper for the local runners.
1108
    struct LocalRunners<'a>(&'a RwLock<Vec<Arc<ConcurrentQueue<Runnable>>>>);
1109
1110
    impl fmt::Debug for LocalRunners<'_> {
1111
0
        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1112
0
            match self.0.try_read() {
1113
0
                Ok(lock) => f
1114
0
                    .debug_list()
1115
0
                    .entries(lock.iter().map(|queue| queue.len()))
1116
0
                    .finish(),
1117
0
                Err(TryLockError::WouldBlock) => f.write_str("<locked>"),
1118
0
                Err(TryLockError::Poisoned(_)) => f.write_str("<poisoned>"),
1119
            }
1120
0
        }
1121
    }
1122
1123
    /// Debug wrapper for the sleepers.
1124
    struct SleepCount<'a>(&'a Mutex<Sleepers>);
1125
1126
    impl fmt::Debug for SleepCount<'_> {
1127
0
        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1128
0
            match self.0.try_lock() {
1129
0
                Ok(lock) => fmt::Debug::fmt(&lock.count, f),
1130
0
                Err(TryLockError::WouldBlock) => f.write_str("<locked>"),
1131
0
                Err(TryLockError::Poisoned(_)) => f.write_str("<poisoned>"),
1132
            }
1133
0
        }
1134
    }
1135
1136
0
    f.debug_struct(name)
1137
0
        .field("active", &ActiveTasks(&state.active))
1138
0
        .field("global_tasks", &state.queue.len())
1139
0
        .field("local_runners", &LocalRunners(&state.local_queues))
1140
0
        .field("sleepers", &SleepCount(&state.sleepers))
1141
0
        .finish()
1142
0
}
1143
1144
/// Runs a closure when dropped.
1145
struct CallOnDrop<F: FnMut()>(F);
1146
1147
impl<F: FnMut()> Drop for CallOnDrop<F> {
1148
0
    fn drop(&mut self) {
1149
0
        (self.0)();
1150
0
    }
Unexecuted instantiation: <async_executor::CallOnDrop<<async_executor::Executor>::spawn_inner<core::result::Result<surrealdb_core::sql::value::value::Value, surrealdb_core::err::Error>, surrealdb_core::fnc::asynchronous::{closure#0}::cpu_intensive<core::result::Result<surrealdb_core::sql::value::value::Value, surrealdb_core::err::Error>, surrealdb_core::fnc::asynchronous::{closure#0}::{closure#14}>::{closure#0}::{closure#0}>::{closure#0}::{closure#0}> as core::ops::drop::Drop>::drop
Unexecuted instantiation: <async_executor::CallOnDrop<<async_executor::Executor>::spawn_inner<core::result::Result<surrealdb_core::sql::value::value::Value, surrealdb_core::err::Error>, surrealdb_core::fnc::asynchronous::{closure#0}::cpu_intensive<core::result::Result<surrealdb_core::sql::value::value::Value, surrealdb_core::err::Error>, surrealdb_core::fnc::asynchronous::{closure#0}::{closure#15}>::{closure#0}::{closure#0}>::{closure#0}::{closure#0}> as core::ops::drop::Drop>::drop
Unexecuted instantiation: <async_executor::CallOnDrop<<async_executor::Executor>::spawn_inner<core::result::Result<surrealdb_core::sql::value::value::Value, surrealdb_core::err::Error>, surrealdb_core::fnc::asynchronous::{closure#0}::cpu_intensive<core::result::Result<surrealdb_core::sql::value::value::Value, surrealdb_core::err::Error>, surrealdb_core::fnc::asynchronous::{closure#0}::{closure#16}>::{closure#0}::{closure#0}>::{closure#0}::{closure#0}> as core::ops::drop::Drop>::drop
Unexecuted instantiation: <async_executor::CallOnDrop<<async_executor::Executor>::spawn_inner<core::result::Result<surrealdb_core::sql::value::value::Value, surrealdb_core::err::Error>, surrealdb_core::fnc::asynchronous::{closure#0}::cpu_intensive<core::result::Result<surrealdb_core::sql::value::value::Value, surrealdb_core::err::Error>, surrealdb_core::fnc::asynchronous::{closure#0}::{closure#17}>::{closure#0}::{closure#0}>::{closure#0}::{closure#0}> as core::ops::drop::Drop>::drop
Unexecuted instantiation: <async_executor::CallOnDrop<<async_executor::Executor>::spawn_inner<core::result::Result<surrealdb_core::sql::value::value::Value, surrealdb_core::err::Error>, surrealdb_core::fnc::asynchronous::{closure#0}::cpu_intensive<core::result::Result<surrealdb_core::sql::value::value::Value, surrealdb_core::err::Error>, surrealdb_core::fnc::asynchronous::{closure#0}::{closure#18}>::{closure#0}::{closure#0}>::{closure#0}::{closure#0}> as core::ops::drop::Drop>::drop
Unexecuted instantiation: <async_executor::CallOnDrop<<async_executor::Executor>::spawn_inner<core::result::Result<surrealdb_core::sql::value::value::Value, surrealdb_core::err::Error>, surrealdb_core::fnc::asynchronous::{closure#0}::cpu_intensive<core::result::Result<surrealdb_core::sql::value::value::Value, surrealdb_core::err::Error>, surrealdb_core::fnc::asynchronous::{closure#0}::{closure#19}>::{closure#0}::{closure#0}>::{closure#0}::{closure#0}> as core::ops::drop::Drop>::drop
Unexecuted instantiation: <async_executor::CallOnDrop<<async_executor::Executor>::spawn_inner<core::result::Result<surrealdb_core::sql::value::value::Value, surrealdb_core::err::Error>, surrealdb_core::fnc::asynchronous::{closure#0}::cpu_intensive<core::result::Result<surrealdb_core::sql::value::value::Value, surrealdb_core::err::Error>, surrealdb_core::fnc::asynchronous::{closure#0}::{closure#20}>::{closure#0}::{closure#0}>::{closure#0}::{closure#0}> as core::ops::drop::Drop>::drop
Unexecuted instantiation: <async_executor::CallOnDrop<<async_executor::Executor>::spawn_inner<core::result::Result<surrealdb_core::sql::value::value::Value, surrealdb_core::err::Error>, surrealdb_core::fnc::asynchronous::{closure#0}::cpu_intensive<core::result::Result<surrealdb_core::sql::value::value::Value, surrealdb_core::err::Error>, surrealdb_core::fnc::asynchronous::{closure#0}::{closure#21}>::{closure#0}::{closure#0}>::{closure#0}::{closure#0}> as core::ops::drop::Drop>::drop
Unexecuted instantiation: <async_executor::CallOnDrop<_> as core::ops::drop::Drop>::drop
1151
}
1152
1153
0
fn _ensure_send_and_sync() {
1154
    use futures_lite::future::pending;
1155
1156
0
    fn is_send<T: Send>(_: T) {}
1157
0
    fn is_sync<T: Sync>(_: T) {}
1158
0
    fn is_static<T: 'static>(_: T) {}
1159
1160
0
    is_send::<Executor<'_>>(Executor::new());
1161
0
    is_sync::<Executor<'_>>(Executor::new());
1162
0
1163
0
    let ex = Executor::new();
1164
0
    is_send(ex.run(pending::<()>()));
1165
0
    is_sync(ex.run(pending::<()>()));
1166
0
    is_send(ex.tick());
1167
0
    is_sync(ex.tick());
1168
0
    is_send(ex.schedule());
1169
0
    is_sync(ex.schedule());
1170
0
    is_static(ex.schedule());
1171
1172
    /// ```compile_fail
1173
    /// use async_executor::LocalExecutor;
1174
    /// use futures_lite::future::pending;
1175
    ///
1176
    /// fn is_send<T: Send>(_: T) {}
1177
    /// fn is_sync<T: Sync>(_: T) {}
1178
    ///
1179
    /// is_send::<LocalExecutor<'_>>(LocalExecutor::new());
1180
    /// is_sync::<LocalExecutor<'_>>(LocalExecutor::new());
1181
    ///
1182
    /// let ex = LocalExecutor::new();
1183
    /// is_send(ex.run(pending::<()>()));
1184
    /// is_sync(ex.run(pending::<()>()));
1185
    /// is_send(ex.tick());
1186
    /// is_sync(ex.tick());
1187
    /// ```
1188
0
    fn _negative_test() {}
1189
0
}