/rust/registry/src/index.crates.io-1949cf8c6b5b557f/tokio-1.47.1/src/sync/notify.rs
Line | Count | Source |
1 | | // Allow `unreachable_pub` warnings when sync is not enabled |
2 | | // due to the usage of `Notify` within the `rt` feature set. |
3 | | // When this module is compiled with `sync` enabled we will warn on |
4 | | // this lint. When `rt` is enabled we use `pub(crate)` which |
5 | | // triggers this warning but it is safe to ignore in this case. |
6 | | #![cfg_attr(not(feature = "sync"), allow(unreachable_pub, dead_code))] |
7 | | |
8 | | use crate::loom::cell::UnsafeCell; |
9 | | use crate::loom::sync::atomic::AtomicUsize; |
10 | | use crate::loom::sync::Mutex; |
11 | | use crate::util::linked_list::{self, GuardedLinkedList, LinkedList}; |
12 | | use crate::util::WakeList; |
13 | | |
14 | | use std::future::Future; |
15 | | use std::marker::PhantomPinned; |
16 | | use std::panic::{RefUnwindSafe, UnwindSafe}; |
17 | | use std::pin::Pin; |
18 | | use std::ptr::NonNull; |
19 | | use std::sync::atomic::Ordering::{self, Acquire, Relaxed, Release, SeqCst}; |
20 | | use std::sync::Arc; |
21 | | use std::task::{Context, Poll, Waker}; |
22 | | |
23 | | type WaitList = LinkedList<Waiter, <Waiter as linked_list::Link>::Target>; |
24 | | type GuardedWaitList = GuardedLinkedList<Waiter, <Waiter as linked_list::Link>::Target>; |
25 | | |
26 | | /// Notifies a single task to wake up. |
27 | | /// |
28 | | /// `Notify` provides a basic mechanism to notify a single task of an event. |
29 | | /// `Notify` itself does not carry any data. Instead, it is to be used to signal |
30 | | /// another task to perform an operation. |
31 | | /// |
32 | | /// A `Notify` can be thought of as a [`Semaphore`] starting with 0 permits. The |
33 | | /// [`notified().await`] method waits for a permit to become available, and |
34 | | /// [`notify_one()`] sets a permit **if there currently are no available |
35 | | /// permits**. |
36 | | /// |
37 | | /// The synchronization details of `Notify` are similar to |
38 | | /// [`thread::park`][park] and [`Thread::unpark`][unpark] from std. A [`Notify`] |
39 | | /// value contains a single permit. [`notified().await`] waits for the permit to |
40 | | /// be made available, consumes the permit, and resumes. [`notify_one()`] sets |
41 | | /// the permit, waking a pending task if there is one. |
42 | | /// |
43 | | /// If `notify_one()` is called **before** `notified().await`, then the next |
44 | | /// call to `notified().await` will complete immediately, consuming the permit. |
45 | | /// Any subsequent calls to `notified().await` will wait for a new permit. |
46 | | /// |
47 | | /// If `notify_one()` is called **multiple** times before `notified().await`, |
48 | | /// only a **single** permit is stored. The next call to `notified().await` will |
49 | | /// complete immediately, but the one after will wait for a new permit. |
50 | | /// |
51 | | /// # Examples |
52 | | /// |
53 | | /// Basic usage. |
54 | | /// |
55 | | /// ``` |
56 | | /// use tokio::sync::Notify; |
57 | | /// use std::sync::Arc; |
58 | | /// |
59 | | /// #[tokio::main] |
60 | | /// async fn main() { |
61 | | /// let notify = Arc::new(Notify::new()); |
62 | | /// let notify2 = notify.clone(); |
63 | | /// |
64 | | /// let handle = tokio::spawn(async move { |
65 | | /// notify2.notified().await; |
66 | | /// println!("received notification"); |
67 | | /// }); |
68 | | /// |
69 | | /// println!("sending notification"); |
70 | | /// notify.notify_one(); |
71 | | /// |
72 | | /// // Wait for task to receive notification. |
73 | | /// handle.await.unwrap(); |
74 | | /// } |
75 | | /// ``` |
76 | | /// |
77 | | /// Unbound multi-producer single-consumer (mpsc) channel. |
78 | | /// |
79 | | /// No wakeups can be lost when using this channel because the call to |
80 | | /// `notify_one()` will store a permit in the `Notify`, which the following call |
81 | | /// to `notified()` will consume. |
82 | | /// |
83 | | /// ``` |
84 | | /// use tokio::sync::Notify; |
85 | | /// |
86 | | /// use std::collections::VecDeque; |
87 | | /// use std::sync::Mutex; |
88 | | /// |
89 | | /// struct Channel<T> { |
90 | | /// values: Mutex<VecDeque<T>>, |
91 | | /// notify: Notify, |
92 | | /// } |
93 | | /// |
94 | | /// impl<T> Channel<T> { |
95 | | /// pub fn send(&self, value: T) { |
96 | | /// self.values.lock().unwrap() |
97 | | /// .push_back(value); |
98 | | /// |
99 | | /// // Notify the consumer a value is available |
100 | | /// self.notify.notify_one(); |
101 | | /// } |
102 | | /// |
103 | | /// // This is a single-consumer channel, so several concurrent calls to |
104 | | /// // `recv` are not allowed. |
105 | | /// pub async fn recv(&self) -> T { |
106 | | /// loop { |
107 | | /// // Drain values |
108 | | /// if let Some(value) = self.values.lock().unwrap().pop_front() { |
109 | | /// return value; |
110 | | /// } |
111 | | /// |
112 | | /// // Wait for values to be available |
113 | | /// self.notify.notified().await; |
114 | | /// } |
115 | | /// } |
116 | | /// } |
117 | | /// ``` |
118 | | /// |
119 | | /// Unbound multi-producer multi-consumer (mpmc) channel. |
120 | | /// |
121 | | /// The call to [`enable`] is important because otherwise if you have two |
122 | | /// calls to `recv` and two calls to `send` in parallel, the following could |
123 | | /// happen: |
124 | | /// |
125 | | /// 1. Both calls to `try_recv` return `None`. |
126 | | /// 2. Both new elements are added to the vector. |
127 | | /// 3. The `notify_one` method is called twice, adding only a single |
128 | | /// permit to the `Notify`. |
129 | | /// 4. Both calls to `recv` reach the `Notified` future. One of them |
130 | | /// consumes the permit, and the other sleeps forever. |
131 | | /// |
132 | | /// By adding the `Notified` futures to the list by calling `enable` before |
133 | | /// `try_recv`, the `notify_one` calls in step three would remove the |
134 | | /// futures from the list and mark them notified instead of adding a permit |
135 | | /// to the `Notify`. This ensures that both futures are woken. |
136 | | /// |
137 | | /// Notice that this failure can only happen if there are two concurrent calls |
138 | | /// to `recv`. This is why the mpsc example above does not require a call to |
139 | | /// `enable`. |
140 | | /// |
141 | | /// ``` |
142 | | /// use tokio::sync::Notify; |
143 | | /// |
144 | | /// use std::collections::VecDeque; |
145 | | /// use std::sync::Mutex; |
146 | | /// |
147 | | /// struct Channel<T> { |
148 | | /// messages: Mutex<VecDeque<T>>, |
149 | | /// notify_on_sent: Notify, |
150 | | /// } |
151 | | /// |
152 | | /// impl<T> Channel<T> { |
153 | | /// pub fn send(&self, msg: T) { |
154 | | /// let mut locked_queue = self.messages.lock().unwrap(); |
155 | | /// locked_queue.push_back(msg); |
156 | | /// drop(locked_queue); |
157 | | /// |
158 | | /// // Send a notification to one of the calls currently |
159 | | /// // waiting in a call to `recv`. |
160 | | /// self.notify_on_sent.notify_one(); |
161 | | /// } |
162 | | /// |
163 | | /// pub fn try_recv(&self) -> Option<T> { |
164 | | /// let mut locked_queue = self.messages.lock().unwrap(); |
165 | | /// locked_queue.pop_front() |
166 | | /// } |
167 | | /// |
168 | | /// pub async fn recv(&self) -> T { |
169 | | /// let future = self.notify_on_sent.notified(); |
170 | | /// tokio::pin!(future); |
171 | | /// |
172 | | /// loop { |
173 | | /// // Make sure that no wakeup is lost if we get |
174 | | /// // `None` from `try_recv`. |
175 | | /// future.as_mut().enable(); |
176 | | /// |
177 | | /// if let Some(msg) = self.try_recv() { |
178 | | /// return msg; |
179 | | /// } |
180 | | /// |
181 | | /// // Wait for a call to `notify_one`. |
182 | | /// // |
183 | | /// // This uses `.as_mut()` to avoid consuming the future, |
184 | | /// // which lets us call `Pin::set` below. |
185 | | /// future.as_mut().await; |
186 | | /// |
187 | | /// // Reset the future in case another call to |
188 | | /// // `try_recv` got the message before us. |
189 | | /// future.set(self.notify_on_sent.notified()); |
190 | | /// } |
191 | | /// } |
192 | | /// } |
193 | | /// ``` |
194 | | /// |
195 | | /// [park]: std::thread::park |
196 | | /// [unpark]: std::thread::Thread::unpark |
197 | | /// [`notified().await`]: Notify::notified() |
198 | | /// [`notify_one()`]: Notify::notify_one() |
199 | | /// [`enable`]: Notified::enable() |
200 | | /// [`Semaphore`]: crate::sync::Semaphore |
201 | | #[derive(Debug)] |
202 | | pub struct Notify { |
203 | | // `state` uses 2 bits to store one of `EMPTY`, |
204 | | // `WAITING` or `NOTIFIED`. The rest of the bits |
205 | | // are used to store the number of times `notify_waiters` |
206 | | // was called. |
207 | | // |
208 | | // Throughout the code there are two assumptions: |
209 | | // - state can be transitioned *from* `WAITING` only if |
210 | | // `waiters` lock is held |
211 | | // - number of times `notify_waiters` was called can |
212 | | // be modified only if `waiters` lock is held |
213 | | state: AtomicUsize, |
214 | | waiters: Mutex<WaitList>, |
215 | | } |
216 | | |
217 | | #[derive(Debug)] |
218 | | struct Waiter { |
219 | | /// Intrusive linked-list pointers. |
220 | | pointers: linked_list::Pointers<Waiter>, |
221 | | |
222 | | /// Waiting task's waker. Depending on the value of `notification`, |
223 | | /// this field is either protected by the `waiters` lock in |
224 | | /// `Notify`, or it is exclusively owned by the enclosing `Waiter`. |
225 | | waker: UnsafeCell<Option<Waker>>, |
226 | | |
227 | | /// Notification for this waiter. Uses 2 bits to store if and how was |
228 | | /// notified, 1 bit for storing if it was woken up using FIFO or LIFO, and |
229 | | /// the rest of it is unused. |
230 | | /// * if it's `None`, then `waker` is protected by the `waiters` lock. |
231 | | /// * if it's `Some`, then `waker` is exclusively owned by the |
232 | | /// enclosing `Waiter` and can be accessed without locking. |
233 | | notification: AtomicNotification, |
234 | | |
235 | | /// Should not be `Unpin`. |
236 | | _p: PhantomPinned, |
237 | | } |
238 | | |
239 | | impl Waiter { |
240 | 0 | fn new() -> Waiter { |
241 | 0 | Waiter { |
242 | 0 | pointers: linked_list::Pointers::new(), |
243 | 0 | waker: UnsafeCell::new(None), |
244 | 0 | notification: AtomicNotification::none(), |
245 | 0 | _p: PhantomPinned, |
246 | 0 | } |
247 | 0 | } |
248 | | } |
249 | | |
250 | | generate_addr_of_methods! { |
251 | | impl<> Waiter { |
252 | | unsafe fn addr_of_pointers(self: NonNull<Self>) -> NonNull<linked_list::Pointers<Waiter>> { |
253 | | &self.pointers |
254 | | } |
255 | | } |
256 | | } |
257 | | |
258 | | // No notification. |
259 | | const NOTIFICATION_NONE: usize = 0b000; |
260 | | |
261 | | // Notification type used by `notify_one`. |
262 | | const NOTIFICATION_ONE: usize = 0b001; |
263 | | |
264 | | // Notification type used by `notify_last`. |
265 | | const NOTIFICATION_LAST: usize = 0b101; |
266 | | |
267 | | // Notification type used by `notify_waiters`. |
268 | | const NOTIFICATION_ALL: usize = 0b010; |
269 | | |
270 | | /// Notification for a `Waiter`. |
271 | | /// This struct is equivalent to `Option<Notification>`, but uses |
272 | | /// `AtomicUsize` inside for atomic operations. |
273 | | #[derive(Debug)] |
274 | | struct AtomicNotification(AtomicUsize); |
275 | | |
276 | | impl AtomicNotification { |
277 | 0 | fn none() -> Self { |
278 | 0 | AtomicNotification(AtomicUsize::new(NOTIFICATION_NONE)) |
279 | 0 | } |
280 | | |
281 | | /// Store-release a notification. |
282 | | /// This method should be called exactly once. |
283 | 0 | fn store_release(&self, notification: Notification) { |
284 | 0 | let data: usize = match notification { |
285 | 0 | Notification::All => NOTIFICATION_ALL, |
286 | 0 | Notification::One(NotifyOneStrategy::Fifo) => NOTIFICATION_ONE, |
287 | 0 | Notification::One(NotifyOneStrategy::Lifo) => NOTIFICATION_LAST, |
288 | | }; |
289 | 0 | self.0.store(data, Release); |
290 | 0 | } |
291 | | |
292 | 0 | fn load(&self, ordering: Ordering) -> Option<Notification> { |
293 | 0 | let data = self.0.load(ordering); |
294 | 0 | match data { |
295 | 0 | NOTIFICATION_NONE => None, |
296 | 0 | NOTIFICATION_ONE => Some(Notification::One(NotifyOneStrategy::Fifo)), |
297 | 0 | NOTIFICATION_LAST => Some(Notification::One(NotifyOneStrategy::Lifo)), |
298 | 0 | NOTIFICATION_ALL => Some(Notification::All), |
299 | 0 | _ => unreachable!(), |
300 | | } |
301 | 0 | } |
302 | | |
303 | | /// Clears the notification. |
304 | | /// This method is used by a `Notified` future to consume the |
305 | | /// notification. It uses relaxed ordering and should be only |
306 | | /// used once the atomic notification is no longer shared. |
307 | 0 | fn clear(&self) { |
308 | 0 | self.0.store(NOTIFICATION_NONE, Relaxed); |
309 | 0 | } |
310 | | } |
311 | | |
312 | | #[derive(Debug, PartialEq, Eq)] |
313 | | #[repr(usize)] |
314 | | enum NotifyOneStrategy { |
315 | | Fifo, |
316 | | Lifo, |
317 | | } |
318 | | |
319 | | #[derive(Debug, PartialEq, Eq)] |
320 | | #[repr(usize)] |
321 | | enum Notification { |
322 | | One(NotifyOneStrategy), |
323 | | All, |
324 | | } |
325 | | |
326 | | /// List used in `Notify::notify_waiters`. It wraps a guarded linked list |
327 | | /// and gates the access to it on `notify.waiters` mutex. It also empties |
328 | | /// the list on drop. |
329 | | struct NotifyWaitersList<'a> { |
330 | | list: GuardedWaitList, |
331 | | is_empty: bool, |
332 | | notify: &'a Notify, |
333 | | } |
334 | | |
335 | | impl<'a> NotifyWaitersList<'a> { |
336 | 0 | fn new( |
337 | 0 | unguarded_list: WaitList, |
338 | 0 | guard: Pin<&'a Waiter>, |
339 | 0 | notify: &'a Notify, |
340 | 0 | ) -> NotifyWaitersList<'a> { |
341 | 0 | let guard_ptr = NonNull::from(guard.get_ref()); |
342 | 0 | let list = unguarded_list.into_guarded(guard_ptr); |
343 | 0 | NotifyWaitersList { |
344 | 0 | list, |
345 | 0 | is_empty: false, |
346 | 0 | notify, |
347 | 0 | } |
348 | 0 | } |
349 | | |
350 | | /// Removes the last element from the guarded list. Modifying this list |
351 | | /// requires an exclusive access to the main list in `Notify`. |
352 | 0 | fn pop_back_locked(&mut self, _waiters: &mut WaitList) -> Option<NonNull<Waiter>> { |
353 | 0 | let result = self.list.pop_back(); |
354 | 0 | if result.is_none() { |
355 | 0 | // Save information about emptiness to avoid waiting for lock |
356 | 0 | // in the destructor. |
357 | 0 | self.is_empty = true; |
358 | 0 | } |
359 | 0 | result |
360 | 0 | } |
361 | | } |
362 | | |
363 | | impl Drop for NotifyWaitersList<'_> { |
364 | 0 | fn drop(&mut self) { |
365 | | // If the list is not empty, we unlink all waiters from it. |
366 | | // We do not wake the waiters to avoid double panics. |
367 | 0 | if !self.is_empty { |
368 | 0 | let _lock_guard = self.notify.waiters.lock(); |
369 | 0 | while let Some(waiter) = self.list.pop_back() { |
370 | 0 | // Safety: we never make mutable references to waiters. |
371 | 0 | let waiter = unsafe { waiter.as_ref() }; |
372 | 0 | waiter.notification.store_release(Notification::All); |
373 | 0 | } |
374 | 0 | } |
375 | 0 | } |
376 | | } |
377 | | |
378 | | /// Future returned from [`Notify::notified()`]. |
379 | | /// |
380 | | /// This future is fused, so once it has completed, any future calls to poll |
381 | | /// will immediately return `Poll::Ready`. |
382 | | #[derive(Debug)] |
383 | | #[must_use = "futures do nothing unless you `.await` or poll them"] |
384 | | pub struct Notified<'a> { |
385 | | /// The `Notify` being received on. |
386 | | notify: &'a Notify, |
387 | | |
388 | | /// The current state of the receiving process. |
389 | | state: State, |
390 | | |
391 | | /// Number of calls to `notify_waiters` at the time of creation. |
392 | | notify_waiters_calls: usize, |
393 | | |
394 | | /// Entry in the waiter `LinkedList`. |
395 | | waiter: Waiter, |
396 | | } |
397 | | |
398 | | unsafe impl<'a> Send for Notified<'a> {} |
399 | | unsafe impl<'a> Sync for Notified<'a> {} |
400 | | |
401 | | /// Future returned from [`Notify::notified_owned()`]. |
402 | | /// |
403 | | /// This future is fused, so once it has completed, any future calls to poll |
404 | | /// will immediately return `Poll::Ready`. |
405 | | #[derive(Debug)] |
406 | | #[must_use = "futures do nothing unless you `.await` or poll them"] |
407 | | pub struct OwnedNotified { |
408 | | /// The `Notify` being received on. |
409 | | notify: Arc<Notify>, |
410 | | |
411 | | /// The current state of the receiving process. |
412 | | state: State, |
413 | | |
414 | | /// Number of calls to `notify_waiters` at the time of creation. |
415 | | notify_waiters_calls: usize, |
416 | | |
417 | | /// Entry in the waiter `LinkedList`. |
418 | | waiter: Waiter, |
419 | | } |
420 | | |
421 | | unsafe impl Sync for OwnedNotified {} |
422 | | |
423 | | /// A custom `project` implementation is used in place of `pin-project-lite` |
424 | | /// as a custom drop for [`Notified`] and [`OwnedNotified`] implementation |
425 | | /// is needed. |
426 | | struct NotifiedProject<'a> { |
427 | | notify: &'a Notify, |
428 | | state: &'a mut State, |
429 | | notify_waiters_calls: &'a usize, |
430 | | waiter: &'a Waiter, |
431 | | } |
432 | | |
433 | | #[derive(Debug)] |
434 | | enum State { |
435 | | Init, |
436 | | Waiting, |
437 | | Done, |
438 | | } |
439 | | |
440 | | const NOTIFY_WAITERS_SHIFT: usize = 2; |
441 | | const STATE_MASK: usize = (1 << NOTIFY_WAITERS_SHIFT) - 1; |
442 | | const NOTIFY_WAITERS_CALLS_MASK: usize = !STATE_MASK; |
443 | | |
444 | | /// Initial "idle" state. |
445 | | const EMPTY: usize = 0; |
446 | | |
447 | | /// One or more threads are currently waiting to be notified. |
448 | | const WAITING: usize = 1; |
449 | | |
450 | | /// Pending notification. |
451 | | const NOTIFIED: usize = 2; |
452 | | |
453 | 0 | fn set_state(data: usize, state: usize) -> usize { |
454 | 0 | (data & NOTIFY_WAITERS_CALLS_MASK) | (state & STATE_MASK) |
455 | 0 | } |
456 | | |
457 | 0 | fn get_state(data: usize) -> usize { |
458 | 0 | data & STATE_MASK |
459 | 0 | } |
460 | | |
461 | 0 | fn get_num_notify_waiters_calls(data: usize) -> usize { |
462 | 0 | (data & NOTIFY_WAITERS_CALLS_MASK) >> NOTIFY_WAITERS_SHIFT |
463 | 0 | } |
464 | | |
465 | 0 | fn inc_num_notify_waiters_calls(data: usize) -> usize { |
466 | 0 | data + (1 << NOTIFY_WAITERS_SHIFT) |
467 | 0 | } |
468 | | |
469 | 0 | fn atomic_inc_num_notify_waiters_calls(data: &AtomicUsize) { |
470 | 0 | data.fetch_add(1 << NOTIFY_WAITERS_SHIFT, SeqCst); |
471 | 0 | } |
472 | | |
473 | | impl Notify { |
474 | | /// Create a new `Notify`, initialized without a permit. |
475 | | /// |
476 | | /// # Examples |
477 | | /// |
478 | | /// ``` |
479 | | /// use tokio::sync::Notify; |
480 | | /// |
481 | | /// let notify = Notify::new(); |
482 | | /// ``` |
483 | 0 | pub fn new() -> Notify { |
484 | 0 | Notify { |
485 | 0 | state: AtomicUsize::new(0), |
486 | 0 | waiters: Mutex::new(LinkedList::new()), |
487 | 0 | } |
488 | 0 | } |
489 | | |
490 | | /// Create a new `Notify`, initialized without a permit. |
491 | | /// |
492 | | /// When using the `tracing` [unstable feature], a `Notify` created with |
493 | | /// `const_new` will not be instrumented. As such, it will not be visible |
494 | | /// in [`tokio-console`]. Instead, [`Notify::new`] should be used to create |
495 | | /// an instrumented object if that is needed. |
496 | | /// |
497 | | /// # Examples |
498 | | /// |
499 | | /// ``` |
500 | | /// use tokio::sync::Notify; |
501 | | /// |
502 | | /// static NOTIFY: Notify = Notify::const_new(); |
503 | | /// ``` |
504 | | /// |
505 | | /// [`tokio-console`]: https://github.com/tokio-rs/console |
506 | | /// [unstable feature]: crate#unstable-features |
507 | | #[cfg(not(all(loom, test)))] |
508 | 0 | pub const fn const_new() -> Notify { |
509 | 0 | Notify { |
510 | 0 | state: AtomicUsize::new(0), |
511 | 0 | waiters: Mutex::const_new(LinkedList::new()), |
512 | 0 | } |
513 | 0 | } |
514 | | |
515 | | /// Wait for a notification. |
516 | | /// |
517 | | /// Equivalent to: |
518 | | /// |
519 | | /// ```ignore |
520 | | /// async fn notified(&self); |
521 | | /// ``` |
522 | | /// |
523 | | /// Each `Notify` value holds a single permit. If a permit is available from |
524 | | /// an earlier call to [`notify_one()`], then `notified().await` will complete |
525 | | /// immediately, consuming that permit. Otherwise, `notified().await` waits |
526 | | /// for a permit to be made available by the next call to `notify_one()`. |
527 | | /// |
528 | | /// The `Notified` future is not guaranteed to receive wakeups from calls to |
529 | | /// `notify_one()` if it has not yet been polled. See the documentation for |
530 | | /// [`Notified::enable()`] for more details. |
531 | | /// |
532 | | /// The `Notified` future is guaranteed to receive wakeups from |
533 | | /// `notify_waiters()` as soon as it has been created, even if it has not |
534 | | /// yet been polled. |
535 | | /// |
536 | | /// [`notify_one()`]: Notify::notify_one |
537 | | /// [`Notified::enable()`]: Notified::enable |
538 | | /// |
539 | | /// # Cancel safety |
540 | | /// |
541 | | /// This method uses a queue to fairly distribute notifications in the order |
542 | | /// they were requested. Cancelling a call to `notified` makes you lose your |
543 | | /// place in the queue. |
544 | | /// |
545 | | /// # Examples |
546 | | /// |
547 | | /// ``` |
548 | | /// use tokio::sync::Notify; |
549 | | /// use std::sync::Arc; |
550 | | /// |
551 | | /// #[tokio::main] |
552 | | /// async fn main() { |
553 | | /// let notify = Arc::new(Notify::new()); |
554 | | /// let notify2 = notify.clone(); |
555 | | /// |
556 | | /// tokio::spawn(async move { |
557 | | /// notify2.notified().await; |
558 | | /// println!("received notification"); |
559 | | /// }); |
560 | | /// |
561 | | /// println!("sending notification"); |
562 | | /// notify.notify_one(); |
563 | | /// } |
564 | | /// ``` |
565 | 0 | pub fn notified(&self) -> Notified<'_> { |
566 | | // we load the number of times notify_waiters |
567 | | // was called and store that in the future. |
568 | 0 | let state = self.state.load(SeqCst); |
569 | 0 | Notified { |
570 | 0 | notify: self, |
571 | 0 | state: State::Init, |
572 | 0 | notify_waiters_calls: get_num_notify_waiters_calls(state), |
573 | 0 | waiter: Waiter::new(), |
574 | 0 | } |
575 | 0 | } |
576 | | |
577 | | /// Wait for a notification with an owned `Future`. |
578 | | /// |
579 | | /// Unlike [`Self::notified`] which returns a future tied to the `Notify`'s |
580 | | /// lifetime, `notified_owned` creates a self-contained future that owns its |
581 | | /// notification state, making it safe to move between threads. |
582 | | /// |
583 | | /// See [`Self::notified`] for more details. |
584 | | /// |
585 | | /// # Cancel safety |
586 | | /// |
587 | | /// This method uses a queue to fairly distribute notifications in the order |
588 | | /// they were requested. Cancelling a call to `notified_owned` makes you lose your |
589 | | /// place in the queue. |
590 | | /// |
591 | | /// # Examples |
592 | | /// |
593 | | /// ``` |
594 | | /// use std::sync::Arc; |
595 | | /// use tokio::sync::Notify; |
596 | | /// |
597 | | /// #[tokio::main] |
598 | | /// async fn main() { |
599 | | /// let notify = Arc::new(Notify::new()); |
600 | | /// |
601 | | /// for _ in 0..10 { |
602 | | /// let notified = notify.clone().notified_owned(); |
603 | | /// tokio::spawn(async move { |
604 | | /// notified.await; |
605 | | /// println!("received notification"); |
606 | | /// }); |
607 | | /// } |
608 | | /// |
609 | | /// println!("sending notification"); |
610 | | /// notify.notify_waiters(); |
611 | | /// } |
612 | | /// ``` |
613 | 0 | pub fn notified_owned(self: Arc<Self>) -> OwnedNotified { |
614 | | // we load the number of times notify_waiters |
615 | | // was called and store that in the future. |
616 | 0 | let state = self.state.load(SeqCst); |
617 | 0 | OwnedNotified { |
618 | 0 | notify: self, |
619 | 0 | state: State::Init, |
620 | 0 | notify_waiters_calls: get_num_notify_waiters_calls(state), |
621 | 0 | waiter: Waiter::new(), |
622 | 0 | } |
623 | 0 | } |
624 | | /// Notifies the first waiting task. |
625 | | /// |
626 | | /// If a task is currently waiting, that task is notified. Otherwise, a |
627 | | /// permit is stored in this `Notify` value and the **next** call to |
628 | | /// [`notified().await`] will complete immediately consuming the permit made |
629 | | /// available by this call to `notify_one()`. |
630 | | /// |
631 | | /// At most one permit may be stored by `Notify`. Many sequential calls to |
632 | | /// `notify_one` will result in a single permit being stored. The next call to |
633 | | /// `notified().await` will complete immediately, but the one after that |
634 | | /// will wait. |
635 | | /// |
636 | | /// [`notified().await`]: Notify::notified() |
637 | | /// |
638 | | /// # Examples |
639 | | /// |
640 | | /// ``` |
641 | | /// use tokio::sync::Notify; |
642 | | /// use std::sync::Arc; |
643 | | /// |
644 | | /// #[tokio::main] |
645 | | /// async fn main() { |
646 | | /// let notify = Arc::new(Notify::new()); |
647 | | /// let notify2 = notify.clone(); |
648 | | /// |
649 | | /// tokio::spawn(async move { |
650 | | /// notify2.notified().await; |
651 | | /// println!("received notification"); |
652 | | /// }); |
653 | | /// |
654 | | /// println!("sending notification"); |
655 | | /// notify.notify_one(); |
656 | | /// } |
657 | | /// ``` |
658 | | // Alias for old name in 0.x |
659 | | #[cfg_attr(docsrs, doc(alias = "notify"))] |
660 | 0 | pub fn notify_one(&self) { |
661 | 0 | self.notify_with_strategy(NotifyOneStrategy::Fifo); |
662 | 0 | } |
663 | | |
664 | | /// Notifies the last waiting task. |
665 | | /// |
666 | | /// This function behaves similar to `notify_one`. The only difference is that it wakes |
667 | | /// the most recently added waiter instead of the oldest waiter. |
668 | | /// |
669 | | /// Check the [`notify_one()`] documentation for more info and |
670 | | /// examples. |
671 | | /// |
672 | | /// [`notify_one()`]: Notify::notify_one |
673 | 0 | pub fn notify_last(&self) { |
674 | 0 | self.notify_with_strategy(NotifyOneStrategy::Lifo); |
675 | 0 | } |
676 | | |
677 | 0 | fn notify_with_strategy(&self, strategy: NotifyOneStrategy) { |
678 | | // Load the current state |
679 | 0 | let mut curr = self.state.load(SeqCst); |
680 | | |
681 | | // If the state is `EMPTY`, transition to `NOTIFIED` and return. |
682 | 0 | while let EMPTY | NOTIFIED = get_state(curr) { |
683 | | // The compare-exchange from `NOTIFIED` -> `NOTIFIED` is intended. A |
684 | | // happens-before synchronization must happen between this atomic |
685 | | // operation and a task calling `notified().await`. |
686 | 0 | let new = set_state(curr, NOTIFIED); |
687 | 0 | let res = self.state.compare_exchange(curr, new, SeqCst, SeqCst); |
688 | | |
689 | 0 | match res { |
690 | | // No waiters, no further work to do |
691 | 0 | Ok(_) => return, |
692 | 0 | Err(actual) => { |
693 | 0 | curr = actual; |
694 | 0 | } |
695 | | } |
696 | | } |
697 | | |
698 | | // There are waiters, the lock must be acquired to notify. |
699 | 0 | let mut waiters = self.waiters.lock(); |
700 | | |
701 | | // The state must be reloaded while the lock is held. The state may only |
702 | | // transition out of WAITING while the lock is held. |
703 | 0 | curr = self.state.load(SeqCst); |
704 | | |
705 | 0 | if let Some(waker) = notify_locked(&mut waiters, &self.state, curr, strategy) { |
706 | 0 | drop(waiters); |
707 | 0 | waker.wake(); |
708 | 0 | } |
709 | 0 | } |
710 | | |
711 | | /// Notifies all waiting tasks. |
712 | | /// |
713 | | /// If a task is currently waiting, that task is notified. Unlike with |
714 | | /// `notify_one()`, no permit is stored to be used by the next call to |
715 | | /// `notified().await`. The purpose of this method is to notify all |
716 | | /// already registered waiters. Registering for notification is done by |
717 | | /// acquiring an instance of the `Notified` future via calling `notified()`. |
718 | | /// |
719 | | /// # Examples |
720 | | /// |
721 | | /// ``` |
722 | | /// use tokio::sync::Notify; |
723 | | /// use std::sync::Arc; |
724 | | /// |
725 | | /// #[tokio::main] |
726 | | /// async fn main() { |
727 | | /// let notify = Arc::new(Notify::new()); |
728 | | /// let notify2 = notify.clone(); |
729 | | /// |
730 | | /// let notified1 = notify.notified(); |
731 | | /// let notified2 = notify.notified(); |
732 | | /// |
733 | | /// let handle = tokio::spawn(async move { |
734 | | /// println!("sending notifications"); |
735 | | /// notify2.notify_waiters(); |
736 | | /// }); |
737 | | /// |
738 | | /// notified1.await; |
739 | | /// notified2.await; |
740 | | /// println!("received notifications"); |
741 | | /// } |
742 | | /// ``` |
743 | 0 | pub fn notify_waiters(&self) { |
744 | 0 | let mut waiters = self.waiters.lock(); |
745 | | |
746 | | // The state must be loaded while the lock is held. The state may only |
747 | | // transition out of WAITING while the lock is held. |
748 | 0 | let curr = self.state.load(SeqCst); |
749 | | |
750 | 0 | if matches!(get_state(curr), EMPTY | NOTIFIED) { |
751 | | // There are no waiting tasks. All we need to do is increment the |
752 | | // number of times this method was called. |
753 | 0 | atomic_inc_num_notify_waiters_calls(&self.state); |
754 | 0 | return; |
755 | 0 | } |
756 | | |
757 | | // Increment the number of times this method was called |
758 | | // and transition to empty. |
759 | 0 | let new_state = set_state(inc_num_notify_waiters_calls(curr), EMPTY); |
760 | 0 | self.state.store(new_state, SeqCst); |
761 | | |
762 | | // It is critical for `GuardedLinkedList` safety that the guard node is |
763 | | // pinned in memory and is not dropped until the guarded list is dropped. |
764 | 0 | let guard = Waiter::new(); |
765 | 0 | pin!(guard); |
766 | | |
767 | | // We move all waiters to a secondary list. It uses a `GuardedLinkedList` |
768 | | // underneath to allow every waiter to safely remove itself from it. |
769 | | // |
770 | | // * This list will be still guarded by the `waiters` lock. |
771 | | // `NotifyWaitersList` wrapper makes sure we hold the lock to modify it. |
772 | | // * This wrapper will empty the list on drop. It is critical for safety |
773 | | // that we will not leave any list entry with a pointer to the local |
774 | | // guard node after this function returns / panics. |
775 | 0 | let mut list = NotifyWaitersList::new(std::mem::take(&mut *waiters), guard.as_ref(), self); |
776 | | |
777 | 0 | let mut wakers = WakeList::new(); |
778 | | 'outer: loop { |
779 | 0 | while wakers.can_push() { |
780 | 0 | match list.pop_back_locked(&mut waiters) { |
781 | 0 | Some(waiter) => { |
782 | | // Safety: we never make mutable references to waiters. |
783 | 0 | let waiter = unsafe { waiter.as_ref() }; |
784 | | |
785 | | // Safety: we hold the lock, so we can access the waker. |
786 | 0 | if let Some(waker) = |
787 | 0 | unsafe { waiter.waker.with_mut(|waker| (*waker).take()) } |
788 | 0 | { |
789 | 0 | wakers.push(waker); |
790 | 0 | } |
791 | | |
792 | | // This waiter is unlinked and will not be shared ever again, release it. |
793 | 0 | waiter.notification.store_release(Notification::All); |
794 | | } |
795 | | None => { |
796 | 0 | break 'outer; |
797 | | } |
798 | | } |
799 | | } |
800 | | |
801 | | // Release the lock before notifying. |
802 | 0 | drop(waiters); |
803 | | |
804 | | // One of the wakers may panic, but the remaining waiters will still |
805 | | // be unlinked from the list in `NotifyWaitersList` destructor. |
806 | 0 | wakers.wake_all(); |
807 | | |
808 | | // Acquire the lock again. |
809 | 0 | waiters = self.waiters.lock(); |
810 | | } |
811 | | |
812 | | // Release the lock before notifying |
813 | 0 | drop(waiters); |
814 | | |
815 | 0 | wakers.wake_all(); |
816 | 0 | } |
817 | | } |
818 | | |
819 | | impl Default for Notify { |
820 | 0 | fn default() -> Notify { |
821 | 0 | Notify::new() |
822 | 0 | } |
823 | | } |
824 | | |
825 | | impl UnwindSafe for Notify {} |
826 | | impl RefUnwindSafe for Notify {} |
827 | | |
828 | 0 | fn notify_locked( |
829 | 0 | waiters: &mut WaitList, |
830 | 0 | state: &AtomicUsize, |
831 | 0 | curr: usize, |
832 | 0 | strategy: NotifyOneStrategy, |
833 | 0 | ) -> Option<Waker> { |
834 | 0 | match get_state(curr) { |
835 | | EMPTY | NOTIFIED => { |
836 | 0 | let res = state.compare_exchange(curr, set_state(curr, NOTIFIED), SeqCst, SeqCst); |
837 | | |
838 | 0 | match res { |
839 | 0 | Ok(_) => None, |
840 | 0 | Err(actual) => { |
841 | 0 | let actual_state = get_state(actual); |
842 | 0 | assert!(actual_state == EMPTY || actual_state == NOTIFIED); |
843 | 0 | state.store(set_state(actual, NOTIFIED), SeqCst); |
844 | 0 | None |
845 | | } |
846 | | } |
847 | | } |
848 | | WAITING => { |
849 | | // At this point, it is guaranteed that the state will not |
850 | | // concurrently change as holding the lock is required to |
851 | | // transition **out** of `WAITING`. |
852 | | // |
853 | | // Get a pending waiter using one of the available dequeue strategies. |
854 | 0 | let waiter = match strategy { |
855 | 0 | NotifyOneStrategy::Fifo => waiters.pop_back().unwrap(), |
856 | 0 | NotifyOneStrategy::Lifo => waiters.pop_front().unwrap(), |
857 | | }; |
858 | | |
859 | | // Safety: we never make mutable references to waiters. |
860 | 0 | let waiter = unsafe { waiter.as_ref() }; |
861 | | |
862 | | // Safety: we hold the lock, so we can access the waker. |
863 | 0 | let waker = unsafe { waiter.waker.with_mut(|waker| (*waker).take()) }; |
864 | | |
865 | | // This waiter is unlinked and will not be shared ever again, release it. |
866 | 0 | waiter |
867 | 0 | .notification |
868 | 0 | .store_release(Notification::One(strategy)); |
869 | | |
870 | 0 | if waiters.is_empty() { |
871 | 0 | // As this the **final** waiter in the list, the state |
872 | 0 | // must be transitioned to `EMPTY`. As transitioning |
873 | 0 | // **from** `WAITING` requires the lock to be held, a |
874 | 0 | // `store` is sufficient. |
875 | 0 | state.store(set_state(curr, EMPTY), SeqCst); |
876 | 0 | } |
877 | 0 | waker |
878 | | } |
879 | 0 | _ => unreachable!(), |
880 | | } |
881 | 0 | } |
882 | | |
883 | | // ===== impl Notified ===== |
884 | | |
885 | | impl Notified<'_> { |
886 | | /// Adds this future to the list of futures that are ready to receive |
887 | | /// wakeups from calls to [`notify_one`]. |
888 | | /// |
889 | | /// Polling the future also adds it to the list, so this method should only |
890 | | /// be used if you want to add the future to the list before the first call |
891 | | /// to `poll`. (In fact, this method is equivalent to calling `poll` except |
892 | | /// that no `Waker` is registered.) |
893 | | /// |
894 | | /// This has no effect on notifications sent using [`notify_waiters`], which |
895 | | /// are received as long as they happen after the creation of the `Notified` |
896 | | /// regardless of whether `enable` or `poll` has been called. |
897 | | /// |
898 | | /// This method returns true if the `Notified` is ready. This happens in the |
899 | | /// following situations: |
900 | | /// |
901 | | /// 1. The `notify_waiters` method was called between the creation of the |
902 | | /// `Notified` and the call to this method. |
903 | | /// 2. This is the first call to `enable` or `poll` on this future, and the |
904 | | /// `Notify` was holding a permit from a previous call to `notify_one`. |
905 | | /// The call consumes the permit in that case. |
906 | | /// 3. The future has previously been enabled or polled, and it has since |
907 | | /// then been marked ready by either consuming a permit from the |
908 | | /// `Notify`, or by a call to `notify_one` or `notify_waiters` that |
909 | | /// removed it from the list of futures ready to receive wakeups. |
910 | | /// |
911 | | /// If this method returns true, any future calls to poll on the same future |
912 | | /// will immediately return `Poll::Ready`. |
913 | | /// |
914 | | /// # Examples |
915 | | /// |
916 | | /// Unbound multi-producer multi-consumer (mpmc) channel. |
917 | | /// |
918 | | /// The call to `enable` is important because otherwise if you have two |
919 | | /// calls to `recv` and two calls to `send` in parallel, the following could |
920 | | /// happen: |
921 | | /// |
922 | | /// 1. Both calls to `try_recv` return `None`. |
923 | | /// 2. Both new elements are added to the vector. |
924 | | /// 3. The `notify_one` method is called twice, adding only a single |
925 | | /// permit to the `Notify`. |
926 | | /// 4. Both calls to `recv` reach the `Notified` future. One of them |
927 | | /// consumes the permit, and the other sleeps forever. |
928 | | /// |
929 | | /// By adding the `Notified` futures to the list by calling `enable` before |
930 | | /// `try_recv`, the `notify_one` calls in step three would remove the |
931 | | /// futures from the list and mark them notified instead of adding a permit |
932 | | /// to the `Notify`. This ensures that both futures are woken. |
933 | | /// |
934 | | /// ``` |
935 | | /// use tokio::sync::Notify; |
936 | | /// |
937 | | /// use std::collections::VecDeque; |
938 | | /// use std::sync::Mutex; |
939 | | /// |
940 | | /// struct Channel<T> { |
941 | | /// messages: Mutex<VecDeque<T>>, |
942 | | /// notify_on_sent: Notify, |
943 | | /// } |
944 | | /// |
945 | | /// impl<T> Channel<T> { |
946 | | /// pub fn send(&self, msg: T) { |
947 | | /// let mut locked_queue = self.messages.lock().unwrap(); |
948 | | /// locked_queue.push_back(msg); |
949 | | /// drop(locked_queue); |
950 | | /// |
951 | | /// // Send a notification to one of the calls currently |
952 | | /// // waiting in a call to `recv`. |
953 | | /// self.notify_on_sent.notify_one(); |
954 | | /// } |
955 | | /// |
956 | | /// pub fn try_recv(&self) -> Option<T> { |
957 | | /// let mut locked_queue = self.messages.lock().unwrap(); |
958 | | /// locked_queue.pop_front() |
959 | | /// } |
960 | | /// |
961 | | /// pub async fn recv(&self) -> T { |
962 | | /// let future = self.notify_on_sent.notified(); |
963 | | /// tokio::pin!(future); |
964 | | /// |
965 | | /// loop { |
966 | | /// // Make sure that no wakeup is lost if we get |
967 | | /// // `None` from `try_recv`. |
968 | | /// future.as_mut().enable(); |
969 | | /// |
970 | | /// if let Some(msg) = self.try_recv() { |
971 | | /// return msg; |
972 | | /// } |
973 | | /// |
974 | | /// // Wait for a call to `notify_one`. |
975 | | /// // |
976 | | /// // This uses `.as_mut()` to avoid consuming the future, |
977 | | /// // which lets us call `Pin::set` below. |
978 | | /// future.as_mut().await; |
979 | | /// |
980 | | /// // Reset the future in case another call to |
981 | | /// // `try_recv` got the message before us. |
982 | | /// future.set(self.notify_on_sent.notified()); |
983 | | /// } |
984 | | /// } |
985 | | /// } |
986 | | /// ``` |
987 | | /// |
988 | | /// [`notify_one`]: Notify::notify_one() |
989 | | /// [`notify_waiters`]: Notify::notify_waiters() |
990 | 0 | pub fn enable(self: Pin<&mut Self>) -> bool { |
991 | 0 | self.poll_notified(None).is_ready() |
992 | 0 | } |
993 | | |
994 | 0 | fn project(self: Pin<&mut Self>) -> NotifiedProject<'_> { |
995 | | unsafe { |
996 | | // Safety: `notify`, `state` and `notify_waiters_calls` are `Unpin`. |
997 | | |
998 | 0 | is_unpin::<&Notify>(); |
999 | 0 | is_unpin::<State>(); |
1000 | 0 | is_unpin::<usize>(); |
1001 | | |
1002 | 0 | let me = self.get_unchecked_mut(); |
1003 | 0 | NotifiedProject { |
1004 | 0 | notify: me.notify, |
1005 | 0 | state: &mut me.state, |
1006 | 0 | notify_waiters_calls: &me.notify_waiters_calls, |
1007 | 0 | waiter: &me.waiter, |
1008 | 0 | } |
1009 | | } |
1010 | 0 | } |
1011 | | |
1012 | 0 | fn poll_notified(self: Pin<&mut Self>, waker: Option<&Waker>) -> Poll<()> { |
1013 | 0 | self.project().poll_notified(waker) |
1014 | 0 | } |
1015 | | } |
1016 | | |
1017 | | impl Future for Notified<'_> { |
1018 | | type Output = (); |
1019 | | |
1020 | 0 | fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> { |
1021 | 0 | self.poll_notified(Some(cx.waker())) |
1022 | 0 | } |
1023 | | } |
1024 | | |
1025 | | impl Drop for Notified<'_> { |
1026 | 0 | fn drop(&mut self) { |
1027 | | // Safety: The type only transitions to a "Waiting" state when pinned. |
1028 | 0 | unsafe { Pin::new_unchecked(self) } |
1029 | 0 | .project() |
1030 | 0 | .drop_notified(); |
1031 | 0 | } |
1032 | | } |
1033 | | |
1034 | | // ===== impl OwnedNotified ===== |
1035 | | |
1036 | | impl OwnedNotified { |
1037 | | /// Adds this future to the list of futures that are ready to receive |
1038 | | /// wakeups from calls to [`notify_one`]. |
1039 | | /// |
1040 | | /// See [`Notified::enable`] for more details. |
1041 | | /// |
1042 | | /// [`notify_one`]: Notify::notify_one() |
1043 | 0 | pub fn enable(self: Pin<&mut Self>) -> bool { |
1044 | 0 | self.poll_notified(None).is_ready() |
1045 | 0 | } |
1046 | | |
1047 | | /// A custom `project` implementation is used in place of `pin-project-lite` |
1048 | | /// as a custom drop implementation is needed. |
1049 | 0 | fn project(self: Pin<&mut Self>) -> NotifiedProject<'_> { |
1050 | | unsafe { |
1051 | | // Safety: `notify`, `state` and `notify_waiters_calls` are `Unpin`. |
1052 | | |
1053 | 0 | is_unpin::<&Notify>(); |
1054 | 0 | is_unpin::<State>(); |
1055 | 0 | is_unpin::<usize>(); |
1056 | | |
1057 | 0 | let me = self.get_unchecked_mut(); |
1058 | 0 | NotifiedProject { |
1059 | 0 | notify: &me.notify, |
1060 | 0 | state: &mut me.state, |
1061 | 0 | notify_waiters_calls: &me.notify_waiters_calls, |
1062 | 0 | waiter: &me.waiter, |
1063 | 0 | } |
1064 | | } |
1065 | 0 | } |
1066 | | |
1067 | 0 | fn poll_notified(self: Pin<&mut Self>, waker: Option<&Waker>) -> Poll<()> { |
1068 | 0 | self.project().poll_notified(waker) |
1069 | 0 | } |
1070 | | } |
1071 | | |
1072 | | impl Future for OwnedNotified { |
1073 | | type Output = (); |
1074 | | |
1075 | 0 | fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> { |
1076 | 0 | self.poll_notified(Some(cx.waker())) |
1077 | 0 | } |
1078 | | } |
1079 | | |
1080 | | impl Drop for OwnedNotified { |
1081 | 0 | fn drop(&mut self) { |
1082 | | // Safety: The type only transitions to a "Waiting" state when pinned. |
1083 | 0 | unsafe { Pin::new_unchecked(self) } |
1084 | 0 | .project() |
1085 | 0 | .drop_notified(); |
1086 | 0 | } |
1087 | | } |
1088 | | |
1089 | | // ===== impl NotifiedProject ===== |
1090 | | |
1091 | | impl NotifiedProject<'_> { |
1092 | 0 | fn poll_notified(self, waker: Option<&Waker>) -> Poll<()> { |
1093 | | let NotifiedProject { |
1094 | 0 | notify, |
1095 | 0 | state, |
1096 | 0 | notify_waiters_calls, |
1097 | 0 | waiter, |
1098 | 0 | } = self; |
1099 | | |
1100 | | 'outer_loop: loop { |
1101 | 0 | match *state { |
1102 | | State::Init => { |
1103 | 0 | let curr = notify.state.load(SeqCst); |
1104 | | |
1105 | | // Optimistically try acquiring a pending notification |
1106 | 0 | let res = notify.state.compare_exchange( |
1107 | 0 | set_state(curr, NOTIFIED), |
1108 | 0 | set_state(curr, EMPTY), |
1109 | 0 | SeqCst, |
1110 | 0 | SeqCst, |
1111 | 0 | ); |
1112 | | |
1113 | 0 | if res.is_ok() { |
1114 | | // Acquired the notification |
1115 | 0 | *state = State::Done; |
1116 | 0 | continue 'outer_loop; |
1117 | 0 | } |
1118 | | |
1119 | | // Clone the waker before locking, a waker clone can be |
1120 | | // triggering arbitrary code. |
1121 | 0 | let waker = waker.cloned(); |
1122 | | |
1123 | | // Acquire the lock and attempt to transition to the waiting |
1124 | | // state. |
1125 | 0 | let mut waiters = notify.waiters.lock(); |
1126 | | |
1127 | | // Reload the state with the lock held |
1128 | 0 | let mut curr = notify.state.load(SeqCst); |
1129 | | |
1130 | | // if notify_waiters has been called after the future |
1131 | | // was created, then we are done |
1132 | 0 | if get_num_notify_waiters_calls(curr) != *notify_waiters_calls { |
1133 | 0 | *state = State::Done; |
1134 | 0 | continue 'outer_loop; |
1135 | 0 | } |
1136 | | |
1137 | | // Transition the state to WAITING. |
1138 | | loop { |
1139 | 0 | match get_state(curr) { |
1140 | | EMPTY => { |
1141 | | // Transition to WAITING |
1142 | 0 | let res = notify.state.compare_exchange( |
1143 | 0 | set_state(curr, EMPTY), |
1144 | 0 | set_state(curr, WAITING), |
1145 | 0 | SeqCst, |
1146 | 0 | SeqCst, |
1147 | 0 | ); |
1148 | | |
1149 | 0 | if let Err(actual) = res { |
1150 | 0 | assert_eq!(get_state(actual), NOTIFIED); |
1151 | 0 | curr = actual; |
1152 | | } else { |
1153 | 0 | break; |
1154 | | } |
1155 | | } |
1156 | 0 | WAITING => break, |
1157 | | NOTIFIED => { |
1158 | | // Try consuming the notification |
1159 | 0 | let res = notify.state.compare_exchange( |
1160 | 0 | set_state(curr, NOTIFIED), |
1161 | 0 | set_state(curr, EMPTY), |
1162 | 0 | SeqCst, |
1163 | 0 | SeqCst, |
1164 | 0 | ); |
1165 | | |
1166 | 0 | match res { |
1167 | | Ok(_) => { |
1168 | | // Acquired the notification |
1169 | 0 | *state = State::Done; |
1170 | 0 | continue 'outer_loop; |
1171 | | } |
1172 | 0 | Err(actual) => { |
1173 | 0 | assert_eq!(get_state(actual), EMPTY); |
1174 | 0 | curr = actual; |
1175 | | } |
1176 | | } |
1177 | | } |
1178 | 0 | _ => unreachable!(), |
1179 | | } |
1180 | | } |
1181 | | |
1182 | 0 | let mut old_waker = None; |
1183 | 0 | if waker.is_some() { |
1184 | | // Safety: called while locked. |
1185 | | // |
1186 | | // The use of `old_waiter` here is not necessary, as the field is always |
1187 | | // None when we reach this line. |
1188 | | unsafe { |
1189 | 0 | old_waker = |
1190 | 0 | waiter.waker.with_mut(|v| std::mem::replace(&mut *v, waker)); |
1191 | | } |
1192 | 0 | } |
1193 | | |
1194 | | // Insert the waiter into the linked list |
1195 | 0 | waiters.push_front(NonNull::from(waiter)); |
1196 | | |
1197 | 0 | *state = State::Waiting; |
1198 | | |
1199 | 0 | drop(waiters); |
1200 | 0 | drop(old_waker); |
1201 | | |
1202 | 0 | return Poll::Pending; |
1203 | | } |
1204 | | State::Waiting => { |
1205 | | #[cfg(tokio_taskdump)] |
1206 | | if let Some(waker) = waker { |
1207 | | let mut ctx = Context::from_waker(waker); |
1208 | | std::task::ready!(crate::trace::trace_leaf(&mut ctx)); |
1209 | | } |
1210 | | |
1211 | 0 | if waiter.notification.load(Acquire).is_some() { |
1212 | | // Safety: waiter is already unlinked and will not be shared again, |
1213 | | // so we have an exclusive access to `waker`. |
1214 | 0 | drop(unsafe { waiter.waker.with_mut(|waker| (*waker).take()) }); |
1215 | | |
1216 | 0 | waiter.notification.clear(); |
1217 | 0 | *state = State::Done; |
1218 | 0 | return Poll::Ready(()); |
1219 | 0 | } |
1220 | | |
1221 | | // Our waiter was not notified, implying it is still stored in a waiter |
1222 | | // list (guarded by `notify.waiters`). In order to access the waker |
1223 | | // fields, we must acquire the lock. |
1224 | | |
1225 | 0 | let mut old_waker = None; |
1226 | 0 | let mut waiters = notify.waiters.lock(); |
1227 | | |
1228 | | // We hold the lock and notifications are set only with the lock held, |
1229 | | // so this can be relaxed, because the happens-before relationship is |
1230 | | // established through the mutex. |
1231 | 0 | if waiter.notification.load(Relaxed).is_some() { |
1232 | | // Safety: waiter is already unlinked and will not be shared again, |
1233 | | // so we have an exclusive access to `waker`. |
1234 | 0 | old_waker = unsafe { waiter.waker.with_mut(|waker| (*waker).take()) }; |
1235 | | |
1236 | 0 | waiter.notification.clear(); |
1237 | | |
1238 | | // Drop the old waker after releasing the lock. |
1239 | 0 | drop(waiters); |
1240 | 0 | drop(old_waker); |
1241 | | |
1242 | 0 | *state = State::Done; |
1243 | 0 | return Poll::Ready(()); |
1244 | 0 | } |
1245 | | |
1246 | | // Load the state with the lock held. |
1247 | 0 | let curr = notify.state.load(SeqCst); |
1248 | | |
1249 | 0 | if get_num_notify_waiters_calls(curr) != *notify_waiters_calls { |
1250 | | // Before we add a waiter to the list we check if these numbers are |
1251 | | // different while holding the lock. If these numbers are different now, |
1252 | | // it means that there is a call to `notify_waiters` in progress and this |
1253 | | // waiter must be contained by a guarded list used in `notify_waiters`. |
1254 | | // We can treat the waiter as notified and remove it from the list, as |
1255 | | // it would have been notified in the `notify_waiters` call anyways. |
1256 | | |
1257 | | // Safety: we hold the lock, so we can modify the waker. |
1258 | 0 | old_waker = unsafe { waiter.waker.with_mut(|waker| (*waker).take()) }; |
1259 | | |
1260 | | // Safety: we hold the lock, so we have an exclusive access to the list. |
1261 | | // The list is used in `notify_waiters`, so it must be guarded. |
1262 | 0 | unsafe { waiters.remove(NonNull::from(waiter)) }; |
1263 | | |
1264 | 0 | *state = State::Done; |
1265 | | } else { |
1266 | | // Safety: we hold the lock, so we can modify the waker. |
1267 | | unsafe { |
1268 | 0 | waiter.waker.with_mut(|v| { |
1269 | 0 | if let Some(waker) = waker { |
1270 | 0 | let should_update = match &*v { |
1271 | 0 | Some(current_waker) => !current_waker.will_wake(waker), |
1272 | 0 | None => true, |
1273 | | }; |
1274 | 0 | if should_update { |
1275 | 0 | old_waker = (*v).replace(waker.clone()); |
1276 | 0 | } |
1277 | 0 | } |
1278 | 0 | }); |
1279 | | } |
1280 | | |
1281 | | // Drop the old waker after releasing the lock. |
1282 | 0 | drop(waiters); |
1283 | 0 | drop(old_waker); |
1284 | | |
1285 | 0 | return Poll::Pending; |
1286 | | } |
1287 | | |
1288 | | // Explicit drop of the lock to indicate the scope that the |
1289 | | // lock is held. Because holding the lock is required to |
1290 | | // ensure safe access to fields not held within the lock, it |
1291 | | // is helpful to visualize the scope of the critical |
1292 | | // section. |
1293 | 0 | drop(waiters); |
1294 | | |
1295 | | // Drop the old waker after releasing the lock. |
1296 | 0 | drop(old_waker); |
1297 | | } |
1298 | | State::Done => { |
1299 | | #[cfg(tokio_taskdump)] |
1300 | | if let Some(waker) = waker { |
1301 | | let mut ctx = Context::from_waker(waker); |
1302 | | std::task::ready!(crate::trace::trace_leaf(&mut ctx)); |
1303 | | } |
1304 | 0 | return Poll::Ready(()); |
1305 | | } |
1306 | | } |
1307 | | } |
1308 | 0 | } |
1309 | | |
1310 | 0 | fn drop_notified(self) { |
1311 | | let NotifiedProject { |
1312 | 0 | notify, |
1313 | 0 | state, |
1314 | 0 | waiter, |
1315 | | .. |
1316 | 0 | } = self; |
1317 | | |
1318 | | // This is where we ensure safety. The `Notified` value is being |
1319 | | // dropped, which means we must ensure that the waiter entry is no |
1320 | | // longer stored in the linked list. |
1321 | 0 | if matches!(*state, State::Waiting) { |
1322 | 0 | let mut waiters = notify.waiters.lock(); |
1323 | 0 | let mut notify_state = notify.state.load(SeqCst); |
1324 | | |
1325 | | // We hold the lock, so this field is not concurrently accessed by |
1326 | | // `notify_*` functions and we can use the relaxed ordering. |
1327 | 0 | let notification = waiter.notification.load(Relaxed); |
1328 | | |
1329 | | // remove the entry from the list (if not already removed) |
1330 | | // |
1331 | | // Safety: we hold the lock, so we have an exclusive access to every list the |
1332 | | // waiter may be contained in. If the node is not contained in the `waiters` |
1333 | | // list, then it is contained by a guarded list used by `notify_waiters`. |
1334 | 0 | unsafe { waiters.remove(NonNull::from(waiter)) }; |
1335 | | |
1336 | 0 | if waiters.is_empty() && get_state(notify_state) == WAITING { |
1337 | 0 | notify_state = set_state(notify_state, EMPTY); |
1338 | 0 | notify.state.store(notify_state, SeqCst); |
1339 | 0 | } |
1340 | | |
1341 | | // See if the node was notified but not received. In this case, if |
1342 | | // the notification was triggered via `notify_one`, it must be sent |
1343 | | // to the next waiter. |
1344 | 0 | if let Some(Notification::One(strategy)) = notification { |
1345 | 0 | if let Some(waker) = |
1346 | 0 | notify_locked(&mut waiters, ¬ify.state, notify_state, strategy) |
1347 | 0 | { |
1348 | 0 | drop(waiters); |
1349 | 0 | waker.wake(); |
1350 | 0 | } |
1351 | 0 | } |
1352 | 0 | } |
1353 | 0 | } |
1354 | | } |
1355 | | |
1356 | | /// # Safety |
1357 | | /// |
1358 | | /// `Waiter` is forced to be !Unpin. |
1359 | | unsafe impl linked_list::Link for Waiter { |
1360 | | type Handle = NonNull<Waiter>; |
1361 | | type Target = Waiter; |
1362 | | |
1363 | 0 | fn as_raw(handle: &NonNull<Waiter>) -> NonNull<Waiter> { |
1364 | 0 | *handle |
1365 | 0 | } |
1366 | | |
1367 | 0 | unsafe fn from_raw(ptr: NonNull<Waiter>) -> NonNull<Waiter> { |
1368 | 0 | ptr |
1369 | 0 | } |
1370 | | |
1371 | 0 | unsafe fn pointers(target: NonNull<Waiter>) -> NonNull<linked_list::Pointers<Waiter>> { |
1372 | 0 | Waiter::addr_of_pointers(target) |
1373 | 0 | } |
1374 | | } |
1375 | | |
1376 | 0 | fn is_unpin<T: Unpin>() {}Unexecuted instantiation: tokio::sync::notify::is_unpin::<tokio::sync::notify::State> Unexecuted instantiation: tokio::sync::notify::is_unpin::<&tokio::sync::notify::Notify> Unexecuted instantiation: tokio::sync::notify::is_unpin::<usize> |