Coverage Report

Created: 2026-01-09 06:31

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/rust/registry/src/index.crates.io-1949cf8c6b5b557f/tokio-1.49.0/src/sync/notify.rs
Line
Count
Source
1
// Allow `unreachable_pub` warnings when sync is not enabled
2
// due to the usage of `Notify` within the `rt` feature set.
3
// When this module is compiled with `sync` enabled we will warn on
4
// this lint. When `rt` is enabled we use `pub(crate)` which
5
// triggers this warning but it is safe to ignore in this case.
6
#![cfg_attr(not(feature = "sync"), allow(unreachable_pub, dead_code))]
7
8
use crate::loom::cell::UnsafeCell;
9
use crate::loom::sync::atomic::AtomicUsize;
10
use crate::loom::sync::Mutex;
11
use crate::util::linked_list::{self, GuardedLinkedList, LinkedList};
12
use crate::util::WakeList;
13
14
use std::future::Future;
15
use std::marker::PhantomPinned;
16
use std::panic::{RefUnwindSafe, UnwindSafe};
17
use std::pin::Pin;
18
use std::ptr::NonNull;
19
use std::sync::atomic::Ordering::{self, Acquire, Relaxed, Release, SeqCst};
20
use std::sync::Arc;
21
use std::task::{Context, Poll, Waker};
22
23
type WaitList = LinkedList<Waiter, <Waiter as linked_list::Link>::Target>;
24
type GuardedWaitList = GuardedLinkedList<Waiter, <Waiter as linked_list::Link>::Target>;
25
26
/// Notifies a single task to wake up.
27
///
28
/// `Notify` provides a basic mechanism to notify a single task of an event.
29
/// `Notify` itself does not carry any data. Instead, it is to be used to signal
30
/// another task to perform an operation.
31
///
32
/// A `Notify` can be thought of as a [`Semaphore`] starting with 0 permits. The
33
/// [`notified().await`] method waits for a permit to become available, and
34
/// [`notify_one()`] sets a permit **if there currently are no available
35
/// permits**.
36
///
37
/// The synchronization details of `Notify` are similar to
38
/// [`thread::park`][park] and [`Thread::unpark`][unpark] from std. A [`Notify`]
39
/// value contains a single permit. [`notified().await`] waits for the permit to
40
/// be made available, consumes the permit, and resumes.  [`notify_one()`] sets
41
/// the permit, waking a pending task if there is one.
42
///
43
/// If `notify_one()` is called **before** `notified().await`, then the next
44
/// call to `notified().await` will complete immediately, consuming the permit.
45
/// Any subsequent calls to `notified().await` will wait for a new permit.
46
///
47
/// If `notify_one()` is called **multiple** times before `notified().await`,
48
/// only a **single** permit is stored. The next call to `notified().await` will
49
/// complete immediately, but the one after will wait for a new permit.
50
///
51
/// # Examples
52
///
53
/// Basic usage.
54
///
55
/// ```
56
/// use tokio::sync::Notify;
57
/// use std::sync::Arc;
58
///
59
/// # #[tokio::main(flavor = "current_thread")]
60
/// # async fn main() {
61
/// let notify = Arc::new(Notify::new());
62
/// let notify2 = notify.clone();
63
///
64
/// let handle = tokio::spawn(async move {
65
///     notify2.notified().await;
66
///     println!("received notification");
67
/// });
68
///
69
/// println!("sending notification");
70
/// notify.notify_one();
71
///
72
/// // Wait for task to receive notification.
73
/// handle.await.unwrap();
74
/// # }
75
/// ```
76
///
77
/// Unbounded multi-producer single-consumer (mpsc) channel.
78
///
79
/// No wakeups can be lost when using this channel because the call to
80
/// `notify_one()` will store a permit in the `Notify`, which the following call
81
/// to `notified()` will consume.
82
///
83
/// ```
84
/// use tokio::sync::Notify;
85
///
86
/// use std::collections::VecDeque;
87
/// use std::sync::Mutex;
88
///
89
/// struct Channel<T> {
90
///     values: Mutex<VecDeque<T>>,
91
///     notify: Notify,
92
/// }
93
///
94
/// impl<T> Channel<T> {
95
///     pub fn send(&self, value: T) {
96
///         self.values.lock().unwrap()
97
///             .push_back(value);
98
///
99
///         // Notify the consumer a value is available
100
///         self.notify.notify_one();
101
///     }
102
///
103
///     // This is a single-consumer channel, so several concurrent calls to
104
///     // `recv` are not allowed.
105
///     pub async fn recv(&self) -> T {
106
///         loop {
107
///             // Drain values
108
///             if let Some(value) = self.values.lock().unwrap().pop_front() {
109
///                 return value;
110
///             }
111
///
112
///             // Wait for values to be available
113
///             self.notify.notified().await;
114
///         }
115
///     }
116
/// }
117
/// ```
118
///
119
/// Unbounded multi-producer multi-consumer (mpmc) channel.
120
///
121
/// The call to [`enable`] is important because otherwise if you have two
122
/// calls to `recv` and two calls to `send` in parallel, the following could
123
/// happen:
124
///
125
///  1. Both calls to `try_recv` return `None`.
126
///  2. Both new elements are added to the vector.
127
///  3. The `notify_one` method is called twice, adding only a single
128
///     permit to the `Notify`.
129
///  4. Both calls to `recv` reach the `Notified` future. One of them
130
///     consumes the permit, and the other sleeps forever.
131
///
132
/// By adding the `Notified` futures to the list by calling `enable` before
133
/// `try_recv`, the `notify_one` calls in step three would remove the
134
/// futures from the list and mark them notified instead of adding a permit
135
/// to the `Notify`. This ensures that both futures are woken.
136
///
137
/// Notice that this failure can only happen if there are two concurrent calls
138
/// to `recv`. This is why the mpsc example above does not require a call to
139
/// `enable`.
140
///
141
/// ```
142
/// use tokio::sync::Notify;
143
///
144
/// use std::collections::VecDeque;
145
/// use std::sync::Mutex;
146
///
147
/// struct Channel<T> {
148
///     messages: Mutex<VecDeque<T>>,
149
///     notify_on_sent: Notify,
150
/// }
151
///
152
/// impl<T> Channel<T> {
153
///     pub fn send(&self, msg: T) {
154
///         let mut locked_queue = self.messages.lock().unwrap();
155
///         locked_queue.push_back(msg);
156
///         drop(locked_queue);
157
///
158
///         // Send a notification to one of the calls currently
159
///         // waiting in a call to `recv`.
160
///         self.notify_on_sent.notify_one();
161
///     }
162
///
163
///     pub fn try_recv(&self) -> Option<T> {
164
///         let mut locked_queue = self.messages.lock().unwrap();
165
///         locked_queue.pop_front()
166
///     }
167
///
168
///     pub async fn recv(&self) -> T {
169
///         let future = self.notify_on_sent.notified();
170
///         tokio::pin!(future);
171
///
172
///         loop {
173
///             // Make sure that no wakeup is lost if we get
174
///             // `None` from `try_recv`.
175
///             future.as_mut().enable();
176
///
177
///             if let Some(msg) = self.try_recv() {
178
///                 return msg;
179
///             }
180
///
181
///             // Wait for a call to `notify_one`.
182
///             //
183
///             // This uses `.as_mut()` to avoid consuming the future,
184
///             // which lets us call `Pin::set` below.
185
///             future.as_mut().await;
186
///
187
///             // Reset the future in case another call to
188
///             // `try_recv` got the message before us.
189
///             future.set(self.notify_on_sent.notified());
190
///         }
191
///     }
192
/// }
193
/// ```
194
///
195
/// [park]: std::thread::park
196
/// [unpark]: std::thread::Thread::unpark
197
/// [`notified().await`]: Notify::notified()
198
/// [`notify_one()`]: Notify::notify_one()
199
/// [`enable`]: Notified::enable()
200
/// [`Semaphore`]: crate::sync::Semaphore
201
#[derive(Debug)]
202
pub struct Notify {
203
    // `state` packs two values into one word. The low 2 bits store
204
    // one of `EMPTY`, `WAITING` or `NOTIFIED`. The rest of the bits
205
    // are used to store the number of times `notify_waiters`
206
    // was called.
207
    //
208
    // Throughout the code there are two assumptions:
209
    // - state can be transitioned *from* `WAITING` only if
210
    //   `waiters` lock is held
211
    // - number of times `notify_waiters` was called can
212
    //   be modified only if `waiters` lock is held
213
    state: AtomicUsize,
214
    // Mutex-protected list of `Waiter` entries. Holding this lock is what
    // the two assumptions documented on `state` refer to.
    waiters: Mutex<WaitList>,
215
}
216
217
/// A single entry of the waiter list. Each `Notified` / `OwnedNotified`
/// future embeds one `Waiter` in its `waiter` field.
#[derive(Debug)]
218
struct Waiter {
219
    /// Intrusive linked-list pointers.
220
    pointers: linked_list::Pointers<Waiter>,
221
222
    /// Waiting task's waker. Depending on the value of `notification`,
223
    /// this field is either protected by the `waiters` lock in
224
    /// `Notify`, or it is exclusively owned by the enclosing `Waiter`.
225
    waker: UnsafeCell<Option<Waker>>,
226
227
    /// Notification for this waiter. Uses 2 bits to store whether and how
228
    /// it was notified, 1 bit for storing if it was woken up using FIFO or
229
    /// LIFO, and the rest of it is unused.
230
    /// * if it's `None`, then `waker` is protected by the `waiters` lock.
231
    /// * if it's `Some`, then `waker` is exclusively owned by the
232
    ///   enclosing `Waiter` and can be accessed without locking.
233
    notification: AtomicNotification,
234
235
    /// Should not be `Unpin`.
236
    _p: PhantomPinned,
237
}
238
239
impl Waiter {
240
0
    fn new() -> Waiter {
241
0
        Waiter {
242
0
            pointers: linked_list::Pointers::new(),
243
0
            waker: UnsafeCell::new(None),
244
0
            notification: AtomicNotification::none(),
245
0
            _p: PhantomPinned,
246
0
        }
247
0
    }
248
}
249
250
// Declares `Waiter::addr_of_pointers` through the `generate_addr_of_methods!`
// macro (defined outside this file). The method maps a `NonNull<Waiter>` to a
// `NonNull` pointing at its `pointers` field.
// NOTE(review): presumably the macro computes the field address without
// creating an intermediate reference — confirm against the macro definition.
generate_addr_of_methods! {
251
    impl<> Waiter {
252
        unsafe fn addr_of_pointers(self: NonNull<Self>) -> NonNull<linked_list::Pointers<Waiter>> {
253
            &self.pointers
254
        }
255
    }
256
}
257
258
// Encodings stored inside `AtomicNotification`.
//
// No notification.
259
const NOTIFICATION_NONE: usize = 0b000;
260
261
// Notification type used by `notify_one`.
262
const NOTIFICATION_ONE: usize = 0b001;
263
264
// Notification type used by `notify_last`.
// Same low two bits as `NOTIFICATION_ONE`, with bit 2 set in addition.
265
const NOTIFICATION_LAST: usize = 0b101;
266
267
// Notification type used by `notify_waiters`.
268
const NOTIFICATION_ALL: usize = 0b010;
269
270
/// Notification for a `Waiter`.
271
/// This struct is equivalent to `Option<Notification>`, but uses
272
/// `AtomicUsize` inside for atomic operations.
///
/// The wrapped integer always holds one of the `NOTIFICATION_*` constants.
273
#[derive(Debug)]
274
struct AtomicNotification(AtomicUsize);
275
276
impl AtomicNotification {
277
0
    fn none() -> Self {
278
0
        AtomicNotification(AtomicUsize::new(NOTIFICATION_NONE))
279
0
    }
280
281
    /// Store-release a notification.
282
    /// This method should be called exactly once.
283
0
    fn store_release(&self, notification: Notification) {
284
0
        let data: usize = match notification {
285
0
            Notification::All => NOTIFICATION_ALL,
286
0
            Notification::One(NotifyOneStrategy::Fifo) => NOTIFICATION_ONE,
287
0
            Notification::One(NotifyOneStrategy::Lifo) => NOTIFICATION_LAST,
288
        };
289
0
        self.0.store(data, Release);
290
0
    }
291
292
0
    fn load(&self, ordering: Ordering) -> Option<Notification> {
293
0
        let data = self.0.load(ordering);
294
0
        match data {
295
0
            NOTIFICATION_NONE => None,
296
0
            NOTIFICATION_ONE => Some(Notification::One(NotifyOneStrategy::Fifo)),
297
0
            NOTIFICATION_LAST => Some(Notification::One(NotifyOneStrategy::Lifo)),
298
0
            NOTIFICATION_ALL => Some(Notification::All),
299
0
            _ => unreachable!(),
300
        }
301
0
    }
302
303
    /// Clears the notification.
304
    /// This method is used by a `Notified` future to consume the
305
    /// notification. It uses relaxed ordering and should be only
306
    /// used once the atomic notification is no longer shared.
307
0
    fn clear(&self) {
308
0
        self.0.store(NOTIFICATION_NONE, Relaxed);
309
0
    }
310
}
311
312
/// Selects which waiter a single-waiter notification targets.
#[derive(Debug, PartialEq, Eq)]
313
#[repr(usize)]
314
enum NotifyOneStrategy {
315
    // Used by `Notify::notify_one`.
    Fifo,
316
    // Used by `Notify::notify_last`.
    Lifo,
317
}
318
319
/// Decoded form of the value kept in `AtomicNotification`.
#[derive(Debug, PartialEq, Eq)]
320
#[repr(usize)]
321
enum Notification {
322
    // The waiter was picked individually, with the given strategy.
    One(NotifyOneStrategy),
323
    // The waiter was released by `notify_waiters` (or by the drop of
    // `NotifyWaitersList`).
    All,
324
}
325
326
/// List used in `Notify::notify_waiters`. It wraps a guarded linked list
327
/// and gates the access to it on `notify.waiters` mutex. It also empties
328
/// the list on drop.
329
struct NotifyWaitersList<'a> {
330
    // The waiters moved out of `Notify::waiters`, converted to guarded form.
    list: GuardedWaitList,
331
    // Set once `pop_back_locked` has returned `None`; lets `Drop` skip
    // taking the lock.
    is_empty: bool,
332
    // The owning `Notify`; its `waiters` mutex is locked in `Drop`.
    notify: &'a Notify,
333
}
334
335
impl<'a> NotifyWaitersList<'a> {
336
0
    fn new(
337
0
        unguarded_list: WaitList,
338
0
        guard: Pin<&'a Waiter>,
339
0
        notify: &'a Notify,
340
0
    ) -> NotifyWaitersList<'a> {
341
0
        let guard_ptr = NonNull::from(guard.get_ref());
342
0
        let list = unguarded_list.into_guarded(guard_ptr);
343
0
        NotifyWaitersList {
344
0
            list,
345
0
            is_empty: false,
346
0
            notify,
347
0
        }
348
0
    }
349
350
    /// Removes the last element from the guarded list. Modifying this list
351
    /// requires an exclusive access to the main list in `Notify`.
352
0
    fn pop_back_locked(&mut self, _waiters: &mut WaitList) -> Option<NonNull<Waiter>> {
353
0
        let result = self.list.pop_back();
354
0
        if result.is_none() {
355
0
            // Save information about emptiness to avoid waiting for lock
356
0
            // in the destructor.
357
0
            self.is_empty = true;
358
0
        }
359
0
        result
360
0
    }
361
}
362
363
impl Drop for NotifyWaitersList<'_> {
364
0
    fn drop(&mut self) {
365
        // If the list is not empty, we unlink all waiters from it.
366
        // We do not wake the waiters to avoid double panics.
367
0
        if !self.is_empty {
368
0
            let _lock_guard = self.notify.waiters.lock();
369
0
            while let Some(waiter) = self.list.pop_back() {
370
0
                // Safety: we never make mutable references to waiters.
371
0
                let waiter = unsafe { waiter.as_ref() };
372
0
                waiter.notification.store_release(Notification::All);
373
0
            }
374
0
        }
375
0
    }
376
}
377
378
/// Future returned from [`Notify::notified()`].
379
///
380
/// This future is fused, so once it has completed, any future calls to poll
381
/// will immediately return `Poll::Ready`.
382
#[derive(Debug)]
383
#[must_use = "futures do nothing unless you `.await` or poll them"]
384
pub struct Notified<'a> {
385
    /// The `Notify` being received on.
386
    notify: &'a Notify,
387
388
    /// The current state of the receiving process. Starts as `State::Init`.
389
    state: State,
390
391
    /// Number of calls to `notify_waiters` at the time of creation, as read
    /// from the upper bits of `Notify::state`.
392
    notify_waiters_calls: usize,
393
394
    /// Entry in the waiter `LinkedList`.
395
    waiter: Waiter,
396
}
397
398
// NOTE(review): the soundness argument for these impls is not stated here.
// Presumably it rests on the `waker` cell inside `Waiter` only being touched
// under the `waiters` lock or by its exclusive owner — confirm and record it
// as a `SAFETY:` comment.
unsafe impl<'a> Send for Notified<'a> {}
399
unsafe impl<'a> Sync for Notified<'a> {}
400
401
/// Future returned from [`Notify::notified_owned()`].
402
///
403
/// This future is fused, so once it has completed, any future calls to poll
404
/// will immediately return `Poll::Ready`.
405
#[derive(Debug)]
406
#[must_use = "futures do nothing unless you `.await` or poll them"]
407
pub struct OwnedNotified {
408
    /// The `Notify` being received on, kept alive through the `Arc`.
409
    notify: Arc<Notify>,
410
411
    /// The current state of the receiving process. Starts as `State::Init`.
412
    state: State,
413
414
    /// Number of calls to `notify_waiters` at the time of creation, as read
    /// from the upper bits of `Notify::state`.
415
    notify_waiters_calls: usize,
416
417
    /// Entry in the waiter `LinkedList`.
418
    waiter: Waiter,
419
}
420
421
// NOTE(review): the soundness argument is not stated here; presumably the
// same reasoning as for `Notified` applies — confirm and record it as a
// `SAFETY:` comment.
unsafe impl Sync for OwnedNotified {}
422
423
/// A custom `project` implementation is used in place of `pin-project-lite`
424
/// as a custom drop for [`Notified`] and [`OwnedNotified`] implementation
425
/// is needed.
///
/// Holds one borrow per field of the future: only `state` is mutable, the
/// other fields are shared references.
426
struct NotifiedProject<'a> {
427
    notify: &'a Notify,
428
    state: &'a mut State,
429
    notify_waiters_calls: &'a usize,
430
    waiter: &'a Waiter,
431
}
432
433
/// Progress of a `Notified` / `OwnedNotified` future.
#[derive(Debug)]
434
enum State {
435
    // The state every future is created in.
    Init,
436
    // NOTE(review): presumably the future's waiter has been registered and
    // it is waiting for a notification — confirm in the poll code.
    Waiting,
437
    // NOTE(review): presumably the future has completed — confirm in the
    // poll code.
    Done,
438
}
439
440
// Bit layout of `Notify::state`: the low `NOTIFY_WAITERS_SHIFT` bits hold the
// state value, the remaining high bits hold the `notify_waiters` call counter.
const NOTIFY_WAITERS_SHIFT: usize = 2;
441
// Selects the state bits (0b11).
const STATE_MASK: usize = (1 << NOTIFY_WAITERS_SHIFT) - 1;
442
// Selects the `notify_waiters` call counter bits.
const NOTIFY_WAITERS_CALLS_MASK: usize = !STATE_MASK;
443
444
/// Initial "idle" state.
445
const EMPTY: usize = 0;
446
447
/// One or more threads are currently waiting to be notified.
448
const WAITING: usize = 1;
449
450
/// Pending notification.
451
const NOTIFIED: usize = 2;
452
453
0
fn set_state(data: usize, state: usize) -> usize {
454
0
    (data & NOTIFY_WAITERS_CALLS_MASK) | (state & STATE_MASK)
455
0
}
456
457
128
fn get_state(data: usize) -> usize {
458
128
    data & STATE_MASK
459
128
}
460
461
0
fn get_num_notify_waiters_calls(data: usize) -> usize {
462
0
    (data & NOTIFY_WAITERS_CALLS_MASK) >> NOTIFY_WAITERS_SHIFT
463
0
}
464
465
0
fn inc_num_notify_waiters_calls(data: usize) -> usize {
466
0
    data + (1 << NOTIFY_WAITERS_SHIFT)
467
0
}
468
469
128
fn atomic_inc_num_notify_waiters_calls(data: &AtomicUsize) {
470
128
    data.fetch_add(1 << NOTIFY_WAITERS_SHIFT, SeqCst);
471
128
}
472
473
impl Notify {
474
    /// Create a new `Notify`, initialized without a permit.
475
    ///
476
    /// # Examples
477
    ///
478
    /// ```
479
    /// use tokio::sync::Notify;
480
    ///
481
    /// let notify = Notify::new();
482
    /// ```
483
1.15k
    pub fn new() -> Notify {
484
1.15k
        Notify {
485
1.15k
            state: AtomicUsize::new(0),
486
1.15k
            waiters: Mutex::new(LinkedList::new()),
487
1.15k
        }
488
1.15k
    }
489
490
    /// Create a new `Notify`, initialized without a permit.
491
    ///
492
    /// When using the `tracing` [unstable feature], a `Notify` created with
493
    /// `const_new` will not be instrumented. As such, it will not be visible
494
    /// in [`tokio-console`]. Instead, [`Notify::new`] should be used to create
495
    /// an instrumented object if that is needed.
496
    ///
497
    /// # Examples
498
    ///
499
    /// ```
500
    /// use tokio::sync::Notify;
501
    ///
502
    /// static NOTIFY: Notify = Notify::const_new();
503
    /// ```
504
    ///
505
    /// [`tokio-console`]: https://github.com/tokio-rs/console
506
    /// [unstable feature]: crate#unstable-features
507
    #[cfg(not(all(loom, test)))]
508
0
    pub const fn const_new() -> Notify {
509
0
        Notify {
510
0
            state: AtomicUsize::new(0),
511
0
            waiters: Mutex::const_new(LinkedList::new()),
512
0
        }
513
0
    }
514
515
    /// Wait for a notification.
516
    ///
517
    /// Equivalent to:
518
    ///
519
    /// ```ignore
520
    /// async fn notified(&self);
521
    /// ```
522
    ///
523
    /// Each `Notify` value holds a single permit. If a permit is available from
524
    /// an earlier call to [`notify_one()`], then `notified().await` will complete
525
    /// immediately, consuming that permit. Otherwise, `notified().await` waits
526
    /// for a permit to be made available by the next call to `notify_one()`.
527
    ///
528
    /// The `Notified` future is not guaranteed to receive wakeups from calls to
529
    /// `notify_one()` if it has not yet been polled. See the documentation for
530
    /// [`Notified::enable()`] for more details.
531
    ///
532
    /// The `Notified` future is guaranteed to receive wakeups from
533
    /// `notify_waiters()` as soon as it has been created, even if it has not
534
    /// yet been polled.
535
    ///
536
    /// [`notify_one()`]: Notify::notify_one
537
    /// [`Notified::enable()`]: Notified::enable
538
    ///
539
    /// # Cancel safety
540
    ///
541
    /// This method uses a queue to fairly distribute notifications in the order
542
    /// they were requested. Cancelling a call to `notified` makes you lose your
543
    /// place in the queue.
544
    ///
545
    /// # Examples
546
    ///
547
    /// ```
548
    /// use tokio::sync::Notify;
549
    /// use std::sync::Arc;
550
    ///
551
    /// # #[tokio::main(flavor = "current_thread")]
552
    /// # async fn main() {
553
    /// let notify = Arc::new(Notify::new());
554
    /// let notify2 = notify.clone();
555
    ///
556
    /// tokio::spawn(async move {
557
    ///     notify2.notified().await;
558
    ///     println!("received notification");
559
    /// });
560
    ///
561
    /// println!("sending notification");
562
    /// notify.notify_one();
563
    /// # }
564
    /// ```
565
0
    pub fn notified(&self) -> Notified<'_> {
566
        // we load the number of times notify_waiters
567
        // was called and store that in the future.
568
0
        let state = self.state.load(SeqCst);
569
0
        Notified {
570
0
            notify: self,
571
0
            state: State::Init,
572
0
            notify_waiters_calls: get_num_notify_waiters_calls(state),
573
0
            waiter: Waiter::new(),
574
0
        }
575
0
    }
576
577
    /// Wait for a notification with an owned `Future`.
578
    ///
579
    /// Unlike [`Self::notified`] which returns a future tied to the `Notify`'s
580
    /// lifetime, `notified_owned` creates a self-contained future that owns its
581
    /// notification state, making it safe to move between threads.
582
    ///
583
    /// See [`Self::notified`] for more details.
584
    ///
585
    /// # Cancel safety
586
    ///
587
    /// This method uses a queue to fairly distribute notifications in the order
588
    /// they were requested. Cancelling a call to `notified_owned` makes you lose your
589
    /// place in the queue.
590
    ///
591
    /// # Examples
592
    ///
593
    /// ```
594
    /// use std::sync::Arc;
595
    /// use tokio::sync::Notify;
596
    ///
597
    /// # #[tokio::main(flavor = "current_thread")]
598
    /// # async fn main() {
599
    /// let notify = Arc::new(Notify::new());
600
    ///
601
    /// for _ in 0..10 {
602
    ///     let notified = notify.clone().notified_owned();
603
    ///     tokio::spawn(async move {
604
    ///         notified.await;
605
    ///         println!("received notification");
606
    ///     });
607
    /// }
608
    ///
609
    /// println!("sending notification");
610
    /// notify.notify_waiters();
611
    /// # }
612
    /// ```
613
0
    pub fn notified_owned(self: Arc<Self>) -> OwnedNotified {
614
        // we load the number of times notify_waiters
615
        // was called and store that in the future.
616
0
        let state = self.state.load(SeqCst);
617
0
        OwnedNotified {
618
0
            notify: self,
619
0
            state: State::Init,
620
0
            notify_waiters_calls: get_num_notify_waiters_calls(state),
621
0
            waiter: Waiter::new(),
622
0
        }
623
0
    }
624
    /// Notifies the first waiting task.
625
    ///
626
    /// If a task is currently waiting, that task is notified. Otherwise, a
627
    /// permit is stored in this `Notify` value and the **next** call to
628
    /// [`notified().await`] will complete immediately consuming the permit made
629
    /// available by this call to `notify_one()`.
630
    ///
631
    /// At most one permit may be stored by `Notify`. Many sequential calls to
632
    /// `notify_one` will result in a single permit being stored. The next call to
633
    /// `notified().await` will complete immediately, but the one after that
634
    /// will wait.
635
    ///
636
    /// [`notified().await`]: Notify::notified()
637
    ///
638
    /// # Examples
639
    ///
640
    /// ```
641
    /// use tokio::sync::Notify;
642
    /// use std::sync::Arc;
643
    ///
644
    /// # #[tokio::main(flavor = "current_thread")]
645
    /// # async fn main() {
646
    /// let notify = Arc::new(Notify::new());
647
    /// let notify2 = notify.clone();
648
    ///
649
    /// tokio::spawn(async move {
650
    ///     notify2.notified().await;
651
    ///     println!("received notification");
652
    /// });
653
    ///
654
    /// println!("sending notification");
655
    /// notify.notify_one();
656
    /// # }
657
    /// ```
658
    // Alias for old name in 0.x
659
    #[cfg_attr(docsrs, doc(alias = "notify"))]
660
0
    pub fn notify_one(&self) {
661
0
        self.notify_with_strategy(NotifyOneStrategy::Fifo);
662
0
    }
663
664
    /// Notifies the last waiting task.
665
    ///
666
    /// This function behaves similarly to `notify_one`. The only difference is that it wakes
667
    /// the most recently added waiter instead of the oldest waiter.
668
    ///
669
    /// Check the [`notify_one()`] documentation for more info and
670
    /// examples.
671
    ///
672
    /// [`notify_one()`]: Notify::notify_one
673
0
    pub fn notify_last(&self) {
674
0
        self.notify_with_strategy(NotifyOneStrategy::Lifo);
675
0
    }
676
677
0
    fn notify_with_strategy(&self, strategy: NotifyOneStrategy) {
678
        // Load the current state
679
0
        let mut curr = self.state.load(SeqCst);
680
681
        // If the state is `EMPTY`, transition to `NOTIFIED` and return.
682
0
        while let EMPTY | NOTIFIED = get_state(curr) {
683
            // The compare-exchange from `NOTIFIED` -> `NOTIFIED` is intended. A
684
            // happens-before synchronization must happen between this atomic
685
            // operation and a task calling `notified().await`.
686
0
            let new = set_state(curr, NOTIFIED);
687
0
            let res = self.state.compare_exchange(curr, new, SeqCst, SeqCst);
688
689
0
            match res {
690
                // No waiters, no further work to do
691
0
                Ok(_) => return,
692
0
                Err(actual) => {
693
0
                    curr = actual;
694
0
                }
695
            }
696
        }
697
698
        // There are waiters, the lock must be acquired to notify.
699
0
        let mut waiters = self.waiters.lock();
700
701
        // The state must be reloaded while the lock is held. The state may only
702
        // transition out of WAITING while the lock is held.
703
0
        curr = self.state.load(SeqCst);
704
705
0
        if let Some(waker) = notify_locked(&mut waiters, &self.state, curr, strategy) {
706
0
            drop(waiters);
707
0
            waker.wake();
708
0
        }
709
0
    }
710
711
    /// Notifies all waiting tasks.
712
    ///
713
    /// If a task is currently waiting, that task is notified. Unlike with
714
    /// `notify_one()`, no permit is stored to be used by the next call to
715
    /// `notified().await`. The purpose of this method is to notify all
716
    /// already registered waiters. Registering for notification is done by
717
    /// acquiring an instance of the `Notified` future via calling `notified()`.
718
    ///
719
    /// # Examples
720
    ///
721
    /// ```
722
    /// use tokio::sync::Notify;
723
    /// use std::sync::Arc;
724
    ///
725
    /// # #[tokio::main(flavor = "current_thread")]
726
    /// # async fn main() {
727
    /// let notify = Arc::new(Notify::new());
728
    /// let notify2 = notify.clone();
729
    ///
730
    /// let notified1 = notify.notified();
731
    /// let notified2 = notify.notified();
732
    ///
733
    /// let handle = tokio::spawn(async move {
734
    ///     println!("sending notifications");
735
    ///     notify2.notify_waiters();
736
    /// });
737
    ///
738
    /// notified1.await;
739
    /// notified2.await;
740
    /// println!("received notifications");
741
    /// # }
742
    /// ```
743
128
    pub fn notify_waiters(&self) {
744
128
        // `lock_waiter_list` takes the `waiters` lock and snapshots the
        // state; the returned `NotifyGuard` (its methods are defined outside
        // this view) then performs the notification.
        self.lock_waiter_list().notify_waiters();
745
128
    }
746
747
128
    fn inner_notify_waiters<'a>(
748
128
        &'a self,
749
128
        curr: usize,
750
128
        mut waiters: crate::loom::sync::MutexGuard<'a, LinkedList<Waiter, Waiter>>,
751
128
    ) {
752
128
        if matches!(get_state(curr), EMPTY | NOTIFIED) {
753
            // There are no waiting tasks. All we need to do is increment the
754
            // number of times this method was called.
755
128
            atomic_inc_num_notify_waiters_calls(&self.state);
756
128
            return;
757
0
        }
758
759
        // Increment the number of times this method was called
760
        // and transition to empty.
761
0
        let new_state = set_state(inc_num_notify_waiters_calls(curr), EMPTY);
762
0
        self.state.store(new_state, SeqCst);
763
764
        // It is critical for `GuardedLinkedList` safety that the guard node is
765
        // pinned in memory and is not dropped until the guarded list is dropped.
766
0
        let guard = Waiter::new();
767
0
        pin!(guard);
768
769
        // We move all waiters to a secondary list. It uses a `GuardedLinkedList`
770
        // underneath to allow every waiter to safely remove itself from it.
771
        //
772
        // * This list will be still guarded by the `waiters` lock.
773
        //   `NotifyWaitersList` wrapper makes sure we hold the lock to modify it.
774
        // * This wrapper will empty the list on drop. It is critical for safety
775
        //   that we will not leave any list entry with a pointer to the local
776
        //   guard node after this function returns / panics.
777
0
        let mut list = NotifyWaitersList::new(std::mem::take(&mut *waiters), guard.as_ref(), self);
778
779
0
        let mut wakers = WakeList::new();
780
        'outer: loop {
781
0
            while wakers.can_push() {
782
0
                match list.pop_back_locked(&mut waiters) {
783
0
                    Some(waiter) => {
784
                        // Safety: we never make mutable references to waiters.
785
0
                        let waiter = unsafe { waiter.as_ref() };
786
787
                        // Safety: we hold the lock, so we can access the waker.
788
0
                        if let Some(waker) =
789
0
                            unsafe { waiter.waker.with_mut(|waker| (*waker).take()) }
790
0
                        {
791
0
                            wakers.push(waker);
792
0
                        }
793
794
                        // This waiter is unlinked and will not be shared ever again, release it.
795
0
                        waiter.notification.store_release(Notification::All);
796
                    }
797
                    None => {
798
0
                        break 'outer;
799
                    }
800
                }
801
            }
802
803
            // Release the lock before notifying.
804
0
            drop(waiters);
805
806
            // One of the wakers may panic, but the remaining waiters will still
807
            // be unlinked from the list in `NotifyWaitersList` destructor.
808
0
            wakers.wake_all();
809
810
            // Acquire the lock again.
811
0
            waiters = self.waiters.lock();
812
        }
813
814
        // Release the lock before notifying
815
0
        drop(waiters);
816
817
0
        wakers.wake_all();
818
128
    }
819
820
128
    pub(crate) fn lock_waiter_list(&self) -> NotifyGuard<'_> {
821
128
        let guarded_waiters = self.waiters.lock();
822
823
        // The state must be loaded while the lock is held. The state may only
824
        // transition out of WAITING while the lock is held.
825
128
        let current_state = self.state.load(SeqCst);
826
827
128
        NotifyGuard {
828
128
            guarded_notify: self,
829
128
            guarded_waiters,
830
128
            current_state,
831
128
        }
832
128
    }
833
}
834
835
impl Default for Notify {
    /// Equivalent to [`Notify::new`]: no permit stored.
    fn default() -> Notify {
        Self::new()
    }
}
840
841
// Explicitly mark `Notify` as unwind-safe.
// NOTE(review): presumably needed because one of its fields is not
// automatically `UnwindSafe`/`RefUnwindSafe` — confirm which field.
impl UnwindSafe for Notify {}
842
impl RefUnwindSafe for Notify {}
843
844
0
fn notify_locked(
845
0
    waiters: &mut WaitList,
846
0
    state: &AtomicUsize,
847
0
    curr: usize,
848
0
    strategy: NotifyOneStrategy,
849
0
) -> Option<Waker> {
850
0
    match get_state(curr) {
851
        EMPTY | NOTIFIED => {
852
0
            let res = state.compare_exchange(curr, set_state(curr, NOTIFIED), SeqCst, SeqCst);
853
854
0
            match res {
855
0
                Ok(_) => None,
856
0
                Err(actual) => {
857
0
                    let actual_state = get_state(actual);
858
0
                    assert!(actual_state == EMPTY || actual_state == NOTIFIED);
859
0
                    state.store(set_state(actual, NOTIFIED), SeqCst);
860
0
                    None
861
                }
862
            }
863
        }
864
        WAITING => {
865
            // At this point, it is guaranteed that the state will not
866
            // concurrently change as holding the lock is required to
867
            // transition **out** of `WAITING`.
868
            //
869
            // Get a pending waiter using one of the available dequeue strategies.
870
0
            let waiter = match strategy {
871
0
                NotifyOneStrategy::Fifo => waiters.pop_back().unwrap(),
872
0
                NotifyOneStrategy::Lifo => waiters.pop_front().unwrap(),
873
            };
874
875
            // Safety: we never make mutable references to waiters.
876
0
            let waiter = unsafe { waiter.as_ref() };
877
878
            // Safety: we hold the lock, so we can access the waker.
879
0
            let waker = unsafe { waiter.waker.with_mut(|waker| (*waker).take()) };
880
881
            // This waiter is unlinked and will not be shared ever again, release it.
882
0
            waiter
883
0
                .notification
884
0
                .store_release(Notification::One(strategy));
885
886
0
            if waiters.is_empty() {
887
0
                // As this the **final** waiter in the list, the state
888
0
                // must be transitioned to `EMPTY`. As transitioning
889
0
                // **from** `WAITING` requires the lock to be held, a
890
0
                // `store` is sufficient.
891
0
                state.store(set_state(curr, EMPTY), SeqCst);
892
0
            }
893
0
            waker
894
        }
895
0
        _ => unreachable!(),
896
    }
897
0
}
898
899
// ===== impl Notified =====
900
901
impl Notified<'_> {
902
    /// Adds this future to the list of futures that are ready to receive
903
    /// wakeups from calls to [`notify_one`].
904
    ///
905
    /// Polling the future also adds it to the list, so this method should only
906
    /// be used if you want to add the future to the list before the first call
907
    /// to `poll`. (In fact, this method is equivalent to calling `poll` except
908
    /// that no `Waker` is registered.)
909
    ///
910
    /// This has no effect on notifications sent using [`notify_waiters`], which
911
    /// are received as long as they happen after the creation of the `Notified`
912
    /// regardless of whether `enable` or `poll` has been called.
913
    ///
914
    /// This method returns true if the `Notified` is ready. This happens in the
915
    /// following situations:
916
    ///
917
    ///  1. The `notify_waiters` method was called between the creation of the
918
    ///     `Notified` and the call to this method.
919
    ///  2. This is the first call to `enable` or `poll` on this future, and the
920
    ///     `Notify` was holding a permit from a previous call to `notify_one`.
921
    ///     The call consumes the permit in that case.
922
    ///  3. The future has previously been enabled or polled, and it has since
923
    ///     then been marked ready by either consuming a permit from the
924
    ///     `Notify`, or by a call to `notify_one` or `notify_waiters` that
925
    ///     removed it from the list of futures ready to receive wakeups.
926
    ///
927
    /// If this method returns true, any future calls to poll on the same future
928
    /// will immediately return `Poll::Ready`.
929
    ///
930
    /// # Examples
931
    ///
932
    /// Unbound multi-producer multi-consumer (mpmc) channel.
933
    ///
934
    /// The call to `enable` is important because otherwise if you have two
935
    /// calls to `recv` and two calls to `send` in parallel, the following could
936
    /// happen:
937
    ///
938
    ///  1. Both calls to `try_recv` return `None`.
939
    ///  2. Both new elements are added to the vector.
940
    ///  3. The `notify_one` method is called twice, adding only a single
941
    ///     permit to the `Notify`.
942
    ///  4. Both calls to `recv` reach the `Notified` future. One of them
943
    ///     consumes the permit, and the other sleeps forever.
944
    ///
945
    /// By adding the `Notified` futures to the list by calling `enable` before
946
    /// `try_recv`, the `notify_one` calls in step three would remove the
947
    /// futures from the list and mark them notified instead of adding a permit
948
    /// to the `Notify`. This ensures that both futures are woken.
949
    ///
950
    /// ```
951
    /// use tokio::sync::Notify;
952
    ///
953
    /// use std::collections::VecDeque;
954
    /// use std::sync::Mutex;
955
    ///
956
    /// struct Channel<T> {
957
    ///     messages: Mutex<VecDeque<T>>,
958
    ///     notify_on_sent: Notify,
959
    /// }
960
    ///
961
    /// impl<T> Channel<T> {
962
    ///     pub fn send(&self, msg: T) {
963
    ///         let mut locked_queue = self.messages.lock().unwrap();
964
    ///         locked_queue.push_back(msg);
965
    ///         drop(locked_queue);
966
    ///
967
    ///         // Send a notification to one of the calls currently
968
    ///         // waiting in a call to `recv`.
969
    ///         self.notify_on_sent.notify_one();
970
    ///     }
971
    ///
972
    ///     pub fn try_recv(&self) -> Option<T> {
973
    ///         let mut locked_queue = self.messages.lock().unwrap();
974
    ///         locked_queue.pop_front()
975
    ///     }
976
    ///
977
    ///     pub async fn recv(&self) -> T {
978
    ///         let future = self.notify_on_sent.notified();
979
    ///         tokio::pin!(future);
980
    ///
981
    ///         loop {
982
    ///             // Make sure that no wakeup is lost if we get
983
    ///             // `None` from `try_recv`.
984
    ///             future.as_mut().enable();
985
    ///
986
    ///             if let Some(msg) = self.try_recv() {
987
    ///                 return msg;
988
    ///             }
989
    ///
990
    ///             // Wait for a call to `notify_one`.
991
    ///             //
992
    ///             // This uses `.as_mut()` to avoid consuming the future,
993
    ///             // which lets us call `Pin::set` below.
994
    ///             future.as_mut().await;
995
    ///
996
    ///             // Reset the future in case another call to
997
    ///             // `try_recv` got the message before us.
998
    ///             future.set(self.notify_on_sent.notified());
999
    ///         }
1000
    ///     }
1001
    /// }
1002
    /// ```
1003
    ///
1004
    /// [`notify_one`]: Notify::notify_one()
1005
    /// [`notify_waiters`]: Notify::notify_waiters()
1006
0
    pub fn enable(self: Pin<&mut Self>) -> bool {
1007
0
        self.poll_notified(None).is_ready()
1008
0
    }
1009
1010
0
    fn project(self: Pin<&mut Self>) -> NotifiedProject<'_> {
1011
        unsafe {
1012
            // Safety: `notify`, `state` and `notify_waiters_calls` are `Unpin`.
1013
1014
0
            is_unpin::<&Notify>();
1015
0
            is_unpin::<State>();
1016
0
            is_unpin::<usize>();
1017
1018
0
            let me = self.get_unchecked_mut();
1019
0
            NotifiedProject {
1020
0
                notify: me.notify,
1021
0
                state: &mut me.state,
1022
0
                notify_waiters_calls: &me.notify_waiters_calls,
1023
0
                waiter: &me.waiter,
1024
0
            }
1025
        }
1026
0
    }
1027
1028
0
    fn poll_notified(self: Pin<&mut Self>, waker: Option<&Waker>) -> Poll<()> {
1029
0
        self.project().poll_notified(waker)
1030
0
    }
1031
}
1032
1033
impl Future for Notified<'_> {
1034
    type Output = ();
1035
1036
0
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
1037
0
        self.poll_notified(Some(cx.waker()))
1038
0
    }
1039
}
1040
1041
impl Drop for Notified<'_> {
1042
0
    fn drop(&mut self) {
1043
        // Safety: The type only transitions to a "Waiting" state when pinned.
1044
0
        unsafe { Pin::new_unchecked(self) }
1045
0
            .project()
1046
0
            .drop_notified();
1047
0
    }
1048
}
1049
1050
// ===== impl OwnedNotified =====
1051
1052
impl OwnedNotified {
1053
    /// Adds this future to the list of futures that are ready to receive
1054
    /// wakeups from calls to [`notify_one`].
1055
    ///
1056
    /// See [`Notified::enable`] for more details.
1057
    ///
1058
    /// [`notify_one`]: Notify::notify_one()
1059
0
    pub fn enable(self: Pin<&mut Self>) -> bool {
1060
0
        self.poll_notified(None).is_ready()
1061
0
    }
1062
1063
    /// A custom `project` implementation is used in place of `pin-project-lite`
1064
    /// as a custom drop implementation is needed.
1065
0
    fn project(self: Pin<&mut Self>) -> NotifiedProject<'_> {
1066
        unsafe {
1067
            // Safety: `notify`, `state` and `notify_waiters_calls` are `Unpin`.
1068
1069
0
            is_unpin::<&Notify>();
1070
0
            is_unpin::<State>();
1071
0
            is_unpin::<usize>();
1072
1073
0
            let me = self.get_unchecked_mut();
1074
0
            NotifiedProject {
1075
0
                notify: &me.notify,
1076
0
                state: &mut me.state,
1077
0
                notify_waiters_calls: &me.notify_waiters_calls,
1078
0
                waiter: &me.waiter,
1079
0
            }
1080
        }
1081
0
    }
1082
1083
0
    fn poll_notified(self: Pin<&mut Self>, waker: Option<&Waker>) -> Poll<()> {
1084
0
        self.project().poll_notified(waker)
1085
0
    }
1086
}
1087
1088
impl Future for OwnedNotified {
1089
    type Output = ();
1090
1091
0
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
1092
0
        self.poll_notified(Some(cx.waker()))
1093
0
    }
1094
}
1095
1096
impl Drop for OwnedNotified {
1097
0
    fn drop(&mut self) {
1098
        // Safety: The type only transitions to a "Waiting" state when pinned.
1099
0
        unsafe { Pin::new_unchecked(self) }
1100
0
            .project()
1101
0
            .drop_notified();
1102
0
    }
1103
}
1104
1105
// ===== impl NotifiedProject =====
1106
1107
impl NotifiedProject<'_> {
1108
0
    fn poll_notified(self, waker: Option<&Waker>) -> Poll<()> {
1109
        let NotifiedProject {
1110
0
            notify,
1111
0
            state,
1112
0
            notify_waiters_calls,
1113
0
            waiter,
1114
0
        } = self;
1115
1116
        'outer_loop: loop {
1117
0
            match *state {
1118
                State::Init => {
1119
0
                    let curr = notify.state.load(SeqCst);
1120
1121
                    // Optimistically try acquiring a pending notification
1122
0
                    let res = notify.state.compare_exchange(
1123
0
                        set_state(curr, NOTIFIED),
1124
0
                        set_state(curr, EMPTY),
1125
0
                        SeqCst,
1126
0
                        SeqCst,
1127
0
                    );
1128
1129
0
                    if res.is_ok() {
1130
                        // Acquired the notification
1131
0
                        *state = State::Done;
1132
0
                        continue 'outer_loop;
1133
0
                    }
1134
1135
                    // Clone the waker before locking, a waker clone can be
1136
                    // triggering arbitrary code.
1137
0
                    let waker = waker.cloned();
1138
1139
                    // Acquire the lock and attempt to transition to the waiting
1140
                    // state.
1141
0
                    let mut waiters = notify.waiters.lock();
1142
1143
                    // Reload the state with the lock held
1144
0
                    let mut curr = notify.state.load(SeqCst);
1145
1146
                    // if notify_waiters has been called after the future
1147
                    // was created, then we are done
1148
0
                    if get_num_notify_waiters_calls(curr) != *notify_waiters_calls {
1149
0
                        *state = State::Done;
1150
0
                        continue 'outer_loop;
1151
0
                    }
1152
1153
                    // Transition the state to WAITING.
1154
                    loop {
1155
0
                        match get_state(curr) {
1156
                            EMPTY => {
1157
                                // Transition to WAITING
1158
0
                                let res = notify.state.compare_exchange(
1159
0
                                    set_state(curr, EMPTY),
1160
0
                                    set_state(curr, WAITING),
1161
0
                                    SeqCst,
1162
0
                                    SeqCst,
1163
0
                                );
1164
1165
0
                                if let Err(actual) = res {
1166
0
                                    assert_eq!(get_state(actual), NOTIFIED);
1167
0
                                    curr = actual;
1168
                                } else {
1169
0
                                    break;
1170
                                }
1171
                            }
1172
0
                            WAITING => break,
1173
                            NOTIFIED => {
1174
                                // Try consuming the notification
1175
0
                                let res = notify.state.compare_exchange(
1176
0
                                    set_state(curr, NOTIFIED),
1177
0
                                    set_state(curr, EMPTY),
1178
0
                                    SeqCst,
1179
0
                                    SeqCst,
1180
0
                                );
1181
1182
0
                                match res {
1183
                                    Ok(_) => {
1184
                                        // Acquired the notification
1185
0
                                        *state = State::Done;
1186
0
                                        continue 'outer_loop;
1187
                                    }
1188
0
                                    Err(actual) => {
1189
0
                                        assert_eq!(get_state(actual), EMPTY);
1190
0
                                        curr = actual;
1191
                                    }
1192
                                }
1193
                            }
1194
0
                            _ => unreachable!(),
1195
                        }
1196
                    }
1197
1198
0
                    let mut old_waker = None;
1199
0
                    if waker.is_some() {
1200
                        // Safety: called while locked.
1201
                        //
1202
                        // The use of `old_waiter` here is not necessary, as the field is always
1203
                        // None when we reach this line.
1204
                        unsafe {
1205
0
                            old_waker =
1206
0
                                waiter.waker.with_mut(|v| std::mem::replace(&mut *v, waker));
1207
                        }
1208
0
                    }
1209
1210
                    // Insert the waiter into the linked list
1211
0
                    waiters.push_front(NonNull::from(waiter));
1212
1213
0
                    *state = State::Waiting;
1214
1215
0
                    drop(waiters);
1216
0
                    drop(old_waker);
1217
1218
0
                    return Poll::Pending;
1219
                }
1220
                State::Waiting => {
1221
                    #[cfg(feature = "taskdump")]
1222
                    if let Some(waker) = waker {
1223
                        let mut ctx = Context::from_waker(waker);
1224
                        std::task::ready!(crate::trace::trace_leaf(&mut ctx));
1225
                    }
1226
1227
0
                    if waiter.notification.load(Acquire).is_some() {
1228
                        // Safety: waiter is already unlinked and will not be shared again,
1229
                        // so we have an exclusive access to `waker`.
1230
0
                        drop(unsafe { waiter.waker.with_mut(|waker| (*waker).take()) });
1231
1232
0
                        waiter.notification.clear();
1233
0
                        *state = State::Done;
1234
0
                        return Poll::Ready(());
1235
0
                    }
1236
1237
                    // Our waiter was not notified, implying it is still stored in a waiter
1238
                    // list (guarded by `notify.waiters`). In order to access the waker
1239
                    // fields, we must acquire the lock.
1240
1241
0
                    let mut old_waker = None;
1242
0
                    let mut waiters = notify.waiters.lock();
1243
1244
                    // We hold the lock and notifications are set only with the lock held,
1245
                    // so this can be relaxed, because the happens-before relationship is
1246
                    // established through the mutex.
1247
0
                    if waiter.notification.load(Relaxed).is_some() {
1248
                        // Safety: waiter is already unlinked and will not be shared again,
1249
                        // so we have an exclusive access to `waker`.
1250
0
                        old_waker = unsafe { waiter.waker.with_mut(|waker| (*waker).take()) };
1251
1252
0
                        waiter.notification.clear();
1253
1254
                        // Drop the old waker after releasing the lock.
1255
0
                        drop(waiters);
1256
0
                        drop(old_waker);
1257
1258
0
                        *state = State::Done;
1259
0
                        return Poll::Ready(());
1260
0
                    }
1261
1262
                    // Load the state with the lock held.
1263
0
                    let curr = notify.state.load(SeqCst);
1264
1265
0
                    if get_num_notify_waiters_calls(curr) != *notify_waiters_calls {
1266
                        // Before we add a waiter to the list we check if these numbers are
1267
                        // different while holding the lock. If these numbers are different now,
1268
                        // it means that there is a call to `notify_waiters` in progress and this
1269
                        // waiter must be contained by a guarded list used in `notify_waiters`.
1270
                        // We can treat the waiter as notified and remove it from the list, as
1271
                        // it would have been notified in the `notify_waiters` call anyways.
1272
1273
                        // Safety: we hold the lock, so we can modify the waker.
1274
0
                        old_waker = unsafe { waiter.waker.with_mut(|waker| (*waker).take()) };
1275
1276
                        // Safety: we hold the lock, so we have an exclusive access to the list.
1277
                        // The list is used in `notify_waiters`, so it must be guarded.
1278
0
                        unsafe { waiters.remove(NonNull::from(waiter)) };
1279
1280
0
                        *state = State::Done;
1281
                    } else {
1282
                        // Safety: we hold the lock, so we can modify the waker.
1283
                        unsafe {
1284
0
                            waiter.waker.with_mut(|v| {
1285
0
                                if let Some(waker) = waker {
1286
0
                                    let should_update = match &*v {
1287
0
                                        Some(current_waker) => !current_waker.will_wake(waker),
1288
0
                                        None => true,
1289
                                    };
1290
0
                                    if should_update {
1291
0
                                        old_waker = (*v).replace(waker.clone());
1292
0
                                    }
1293
0
                                }
1294
0
                            });
1295
                        }
1296
1297
                        // Drop the old waker after releasing the lock.
1298
0
                        drop(waiters);
1299
0
                        drop(old_waker);
1300
1301
0
                        return Poll::Pending;
1302
                    }
1303
1304
                    // Explicit drop of the lock to indicate the scope that the
1305
                    // lock is held. Because holding the lock is required to
1306
                    // ensure safe access to fields not held within the lock, it
1307
                    // is helpful to visualize the scope of the critical
1308
                    // section.
1309
0
                    drop(waiters);
1310
1311
                    // Drop the old waker after releasing the lock.
1312
0
                    drop(old_waker);
1313
                }
1314
                State::Done => {
1315
                    #[cfg(feature = "taskdump")]
1316
                    if let Some(waker) = waker {
1317
                        let mut ctx = Context::from_waker(waker);
1318
                        std::task::ready!(crate::trace::trace_leaf(&mut ctx));
1319
                    }
1320
0
                    return Poll::Ready(());
1321
                }
1322
            }
1323
        }
1324
0
    }
1325
1326
0
    fn drop_notified(self) {
1327
        let NotifiedProject {
1328
0
            notify,
1329
0
            state,
1330
0
            waiter,
1331
            ..
1332
0
        } = self;
1333
1334
        // This is where we ensure safety. The `Notified` value is being
1335
        // dropped, which means we must ensure that the waiter entry is no
1336
        // longer stored in the linked list.
1337
0
        if matches!(*state, State::Waiting) {
1338
0
            let mut waiters = notify.waiters.lock();
1339
0
            let mut notify_state = notify.state.load(SeqCst);
1340
1341
            // We hold the lock, so this field is not concurrently accessed by
1342
            // `notify_*` functions and we can use the relaxed ordering.
1343
0
            let notification = waiter.notification.load(Relaxed);
1344
1345
            // remove the entry from the list (if not already removed)
1346
            //
1347
            // Safety: we hold the lock, so we have an exclusive access to every list the
1348
            // waiter may be contained in. If the node is not contained in the `waiters`
1349
            // list, then it is contained by a guarded list used by `notify_waiters`.
1350
0
            unsafe { waiters.remove(NonNull::from(waiter)) };
1351
1352
0
            if waiters.is_empty() && get_state(notify_state) == WAITING {
1353
0
                notify_state = set_state(notify_state, EMPTY);
1354
0
                notify.state.store(notify_state, SeqCst);
1355
0
            }
1356
1357
            // See if the node was notified but not received. In this case, if
1358
            // the notification was triggered via `notify_one`, it must be sent
1359
            // to the next waiter.
1360
0
            if let Some(Notification::One(strategy)) = notification {
1361
0
                if let Some(waker) =
1362
0
                    notify_locked(&mut waiters, &notify.state, notify_state, strategy)
1363
0
                {
1364
0
                    drop(waiters);
1365
0
                    waker.wake();
1366
0
                }
1367
0
            }
1368
0
        }
1369
0
    }
1370
}
1371
1372
/// # Safety
1373
///
1374
/// `Waiter` is forced to be !Unpin.
1375
unsafe impl linked_list::Link for Waiter {
1376
    type Handle = NonNull<Waiter>;
1377
    type Target = Waiter;
1378
1379
0
    fn as_raw(handle: &NonNull<Waiter>) -> NonNull<Waiter> {
1380
0
        *handle
1381
0
    }
1382
1383
0
    unsafe fn from_raw(ptr: NonNull<Waiter>) -> NonNull<Waiter> {
1384
0
        ptr
1385
0
    }
1386
1387
0
    unsafe fn pointers(target: NonNull<Waiter>) -> NonNull<linked_list::Pointers<Waiter>> {
1388
0
        unsafe { Waiter::addr_of_pointers(target) }
1389
0
    }
1390
}
1391
1392
0
/// Compile-time assertion helper: it can only be instantiated for `T: Unpin`
/// and has an empty body.
fn is_unpin<T: Unpin>() {}
Unexecuted instantiation: tokio::sync::notify::is_unpin::<tokio::sync::notify::State>
Unexecuted instantiation: tokio::sync::notify::is_unpin::<&tokio::sync::notify::Notify>
Unexecuted instantiation: tokio::sync::notify::is_unpin::<usize>
1393
1394
/// A guard that provides exclusive access to a `Notify`'s internal
1395
/// waiters list.
1396
///
1397
/// While this guard is held, the `Notify` instance's waiter list is locked.
1398
pub(crate) struct NotifyGuard<'a> {
1399
    guarded_notify: &'a Notify,
1400
    guarded_waiters: crate::loom::sync::MutexGuard<'a, WaitList>,
1401
    current_state: usize,
1402
}
1403
1404
impl NotifyGuard<'_> {
1405
128
    pub(crate) fn notify_waiters(self) {
1406
128
        self.guarded_notify
1407
128
            .inner_notify_waiters(self.current_state, self.guarded_waiters);
1408
128
    }
1409
}