Coverage Report

Created: 2025-07-11 06:53

/rust/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.46.1/src/sync/watch.rs
Line
Count
Source (jump to first uncovered line)
1
#![cfg_attr(not(feature = "sync"), allow(dead_code, unreachable_pub))]
2
3
//! A multi-producer, multi-consumer channel that only retains the *last* sent
4
//! value.
5
//!
6
//! This channel is useful for watching for changes to a value from multiple
7
//! points in the code base, for example, changes to configuration values.
8
//!
9
//! # Usage
10
//!
11
//! [`channel`] returns a [`Sender`] / [`Receiver`] pair. These are the producer
12
//! and consumer halves of the channel. The channel is created with an initial
13
//! value.
14
//!
15
//! Each [`Receiver`] independently tracks the last value *seen* by its caller.
16
//!
17
//! To access the **current** value stored in the channel and mark it as *seen*
18
//! by a given [`Receiver`], use [`Receiver::borrow_and_update()`].
19
//!
20
//! To access the current value **without** marking it as *seen*, use
21
//! [`Receiver::borrow()`]. (If the value has already been marked *seen*,
22
//! [`Receiver::borrow()`] is equivalent to [`Receiver::borrow_and_update()`].)
23
//!
24
//! For more information on when to use these methods, see
25
//! [here](#borrow_and_update-versus-borrow).
26
//!
27
//! ## Change notifications
28
//!
29
//! The [`Receiver`] half provides an asynchronous [`changed`] method. This
30
//! method is ready when a new, *unseen* value is sent via the [`Sender`] half.
31
//!
32
//! * [`Receiver::changed()`] returns `Ok(())` on receiving a new value, or
33
//!   `Err(`[`error::RecvError`]`)` if the [`Sender`] has been dropped.
34
//! * If the current value is *unseen* when calling [`changed`], then
35
//!   [`changed`] will return immediately. If the current value is *seen*, then
36
//!   it will sleep until either a new message is sent via the [`Sender`] half,
37
//!   or the [`Sender`] is dropped.
38
//! * On completion, the [`changed`] method marks the new value as *seen*.
39
//! * At creation, the initial value is considered *seen*. In other words,
40
//!   [`Receiver::changed()`] will not return until a subsequent value is sent.
41
//! * New [`Receiver`] instances can be created with [`Sender::subscribe()`].
42
//!   The current value at the time the [`Receiver`] is created is considered
43
//!   *seen*.
44
//!
45
//! ## `borrow_and_update` versus `borrow`
46
//!
47
//! If the receiver intends to await notifications from [`changed`] in a loop,
48
//! [`Receiver::borrow_and_update()`] should be preferred over
49
//! [`Receiver::borrow()`].  This avoids a potential race where a new value is
50
//! sent between [`changed`] being ready and the value being read. (If
51
//! [`Receiver::borrow()`] is used, the loop may run twice with the same value.)
52
//!
53
//! If the receiver is only interested in the current value, and does not intend
54
//! to wait for changes, then [`Receiver::borrow()`] can be used. It may be more
55
//! convenient to use [`borrow`](Receiver::borrow) since it's an `&self`
56
//! method---[`borrow_and_update`](Receiver::borrow_and_update) requires `&mut
57
//! self`.
58
//!
59
//! # Examples
60
//!
61
//! The following example prints `hello! world! `.
62
//!
63
//! ```
64
//! use tokio::sync::watch;
65
//! use tokio::time::{Duration, sleep};
66
//!
67
//! # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
68
//! let (tx, mut rx) = watch::channel("hello");
69
//!
70
//! tokio::spawn(async move {
71
//!     // Use the equivalent of a "do-while" loop so the initial value is
72
//!     // processed before awaiting the `changed()` future.
73
//!     loop {
74
//!         println!("{}! ", *rx.borrow_and_update());
75
//!         if rx.changed().await.is_err() {
76
//!             break;
77
//!         }
78
//!     }
79
//! });
80
//!
81
//! sleep(Duration::from_millis(100)).await;
82
//! tx.send("world")?;
83
//! # Ok(())
84
//! # }
85
//! ```
86
//!
87
//! # Closing
88
//!
89
//! [`Sender::is_closed`] and [`Sender::closed`] allow the producer to detect
90
//! when all [`Receiver`] handles have been dropped. This indicates that there
91
//! is no further interest in the values being produced and work can be stopped.
92
//!
93
//! The value in the channel will not be dropped until the sender and all
94
//! receivers have been dropped.
95
//!
96
//! # Thread safety
97
//!
98
//! Both [`Sender`] and [`Receiver`] are thread safe. They can be moved to other
99
//! threads and can be used in a concurrent environment. Clones of [`Receiver`]
100
//! handles may be moved to separate threads and also used concurrently.
101
//!
102
//! [`Sender`]: crate::sync::watch::Sender
103
//! [`Receiver`]: crate::sync::watch::Receiver
104
//! [`changed`]: crate::sync::watch::Receiver::changed
105
//! [`Receiver::changed()`]: crate::sync::watch::Receiver::changed
106
//! [`Receiver::borrow()`]: crate::sync::watch::Receiver::borrow
107
//! [`Receiver::borrow_and_update()`]:
108
//!     crate::sync::watch::Receiver::borrow_and_update
109
//! [`channel`]: crate::sync::watch::channel
110
//! [`Sender::is_closed`]: crate::sync::watch::Sender::is_closed
111
//! [`Sender::closed`]: crate::sync::watch::Sender::closed
112
//! [`Sender::subscribe()`]: crate::sync::watch::Sender::subscribe
113
114
use crate::sync::notify::Notify;
115
use crate::task::coop::cooperative;
116
117
use crate::loom::sync::atomic::AtomicUsize;
118
use crate::loom::sync::atomic::Ordering::{AcqRel, Relaxed};
119
use crate::loom::sync::{Arc, RwLock, RwLockReadGuard};
120
use std::fmt;
121
use std::mem;
122
use std::ops;
123
use std::panic;
124
125
/// Receives values from the associated [`Sender`](struct@Sender).
126
///
127
/// Instances are created by the [`channel`](fn@channel) function.
128
///
129
/// To turn this receiver into a `Stream`, you can use the [`WatchStream`]
130
/// wrapper.
131
///
132
/// [`WatchStream`]: https://docs.rs/tokio-stream/0.1/tokio_stream/wrappers/struct.WatchStream.html
133
#[derive(Debug)]
134
pub struct Receiver<T> {
135
    /// Pointer to the shared state
136
    shared: Arc<Shared<T>>,
137
138
    /// Last observed version
139
    version: Version,
140
}
141
142
/// Sends values to the associated [`Receiver`](struct@Receiver).
143
///
144
/// Instances are created by the [`channel`](fn@channel) function.
145
#[derive(Debug)]
146
pub struct Sender<T> {
147
    shared: Arc<Shared<T>>,
148
}
149
150
impl<T> Clone for Sender<T> {
151
0
    fn clone(&self) -> Self {
152
0
        self.shared.ref_count_tx.fetch_add(1, Relaxed);
153
0
154
0
        Self {
155
0
            shared: self.shared.clone(),
156
0
        }
157
0
    }
158
}
159
160
impl<T: Default> Default for Sender<T> {
161
0
    fn default() -> Self {
162
0
        Self::new(T::default())
163
0
    }
164
}
165
166
/// Returns a reference to the inner value.
167
///
168
/// Outstanding borrows hold a read lock on the inner value. This means that
169
/// long-lived borrows could cause the producer half to block. It is recommended
170
/// to keep the borrow as short-lived as possible. Additionally, if you are
171
/// running in an environment that allows `!Send` futures, you must ensure that
172
/// the returned `Ref` type is never held alive across an `.await` point,
173
/// otherwise, it can lead to a deadlock.
174
///
175
/// The priority policy of the lock is dependent on the underlying lock
176
/// implementation, and this type does not guarantee that any particular policy
177
/// will be used. In particular, a producer which is waiting to acquire the lock
178
/// in `send` might or might not block concurrent calls to `borrow`, e.g.:
179
///
180
/// <details><summary>Potential deadlock example</summary>
181
///
182
/// ```text
183
/// // Task 1 (on thread A)    |  // Task 2 (on thread B)
184
/// let _ref1 = rx.borrow();   |
185
///                            |  // will block
186
///                            |  let _ = tx.send(());
187
/// // may deadlock            |
188
/// let _ref2 = rx.borrow();   |
189
/// ```
190
/// </details>
191
#[derive(Debug)]
192
pub struct Ref<'a, T> {
193
    inner: RwLockReadGuard<'a, T>,
194
    has_changed: bool,
195
}
196
197
impl<'a, T> Ref<'a, T> {
198
    /// Indicates if the borrowed value is considered as _changed_ since the last
199
    /// time it has been marked as seen.
200
    ///
201
    /// Unlike [`Receiver::has_changed()`], this method does not fail if the channel is closed.
202
    ///
203
    /// When borrowed from the [`Sender`] this function will always return `false`.
204
    ///
205
    /// # Examples
206
    ///
207
    /// ```
208
    /// use tokio::sync::watch;
209
    ///
210
    /// #[tokio::main]
211
    /// async fn main() {
212
    ///     let (tx, mut rx) = watch::channel("hello");
213
    ///
214
    ///     tx.send("goodbye").unwrap();
215
    ///     // The sender does never consider the value as changed.
216
    ///     assert!(!tx.borrow().has_changed());
217
    ///
218
    ///     // Drop the sender immediately, just for testing purposes.
219
    ///     drop(tx);
220
    ///
221
    ///     // Even if the sender has already been dropped...
222
    ///     assert!(rx.has_changed().is_err());
223
    ///     // ...the modified value is still readable and detected as changed.
224
    ///     assert_eq!(*rx.borrow(), "goodbye");
225
    ///     assert!(rx.borrow().has_changed());
226
    ///
227
    ///     // Read the changed value and mark it as seen.
228
    ///     {
229
    ///         let received = rx.borrow_and_update();
230
    ///         assert_eq!(*received, "goodbye");
231
    ///         assert!(received.has_changed());
232
    ///         // Release the read lock when leaving this scope.
233
    ///     }
234
    ///
235
    ///     // Now the value has already been marked as seen and could
236
    ///     // never be modified again (after the sender has been dropped).
237
    ///     assert!(!rx.borrow().has_changed());
238
    /// }
239
    /// ```
240
0
    pub fn has_changed(&self) -> bool {
241
0
        self.has_changed
242
0
    }
243
}
244
245
struct Shared<T> {
246
    /// The most recent value.
247
    value: RwLock<T>,
248
249
    /// The current version.
250
    ///
251
    /// The lowest bit represents a "closed" state. The rest of the bits
252
    /// represent the current version.
253
    state: AtomicState,
254
255
    /// Tracks the number of `Receiver` instances.
256
    ref_count_rx: AtomicUsize,
257
258
    /// Tracks the number of `Sender` instances.
259
    ref_count_tx: AtomicUsize,
260
261
    /// Notifies waiting receivers that the value changed.
262
    notify_rx: big_notify::BigNotify,
263
264
    /// Notifies any task listening for `Receiver` dropped events.
265
    notify_tx: Notify,
266
}
267
268
impl<T: fmt::Debug> fmt::Debug for Shared<T> {
269
0
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
270
0
        let state = self.state.load();
271
0
        f.debug_struct("Shared")
272
0
            .field("value", &self.value)
273
0
            .field("version", &state.version())
274
0
            .field("is_closed", &state.is_closed())
275
0
            .field("ref_count_rx", &self.ref_count_rx)
276
0
            .finish()
277
0
    }
278
}
279
280
pub mod error {
281
    //! Watch error types.
282
283
    use std::error::Error;
284
    use std::fmt;
285
286
    /// Error produced when sending a value fails.
287
    #[derive(PartialEq, Eq, Clone, Copy)]
288
    pub struct SendError<T>(pub T);
289
290
    // ===== impl SendError =====
291
292
    impl<T> fmt::Debug for SendError<T> {
293
0
        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
294
0
            f.debug_struct("SendError").finish_non_exhaustive()
295
0
        }
296
    }
297
298
    impl<T> fmt::Display for SendError<T> {
299
0
        fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
300
0
            write!(fmt, "channel closed")
301
0
        }
302
    }
303
304
    impl<T> Error for SendError<T> {}
305
306
    /// Error produced when receiving a change notification.
307
    #[derive(Debug, Clone)]
308
    pub struct RecvError(pub(super) ());
309
310
    // ===== impl RecvError =====
311
312
    impl fmt::Display for RecvError {
313
0
        fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
314
0
            write!(fmt, "channel closed")
315
0
        }
316
    }
317
318
    impl Error for RecvError {}
319
}
320
321
mod big_notify {
322
    use super::Notify;
323
    use crate::sync::notify::Notified;
324
325
    // To avoid contention on the lock inside the `Notify`, we store multiple
326
    // copies of it. Then, we use either circular access or randomness to spread
327
    // out threads over different `Notify` objects.
328
    //
329
    // Some simple benchmarks show that randomness performs slightly better than
330
    // circular access (probably due to contention on `next`), so we prefer to
331
    // use randomness when Tokio is compiled with a random number generator.
332
    //
333
    // When the random number generator is not available, we fall back to
334
    // circular access.
335
336
    pub(super) struct BigNotify {
337
        #[cfg(not(all(not(loom), feature = "sync", any(feature = "rt", feature = "macros"))))]
338
        next: std::sync::atomic::AtomicUsize,
339
        inner: [Notify; 8],
340
    }
341
342
    impl BigNotify {
343
130
        pub(super) fn new() -> Self {
344
130
            Self {
345
130
                #[cfg(not(all(
346
130
                    not(loom),
347
130
                    feature = "sync",
348
130
                    any(feature = "rt", feature = "macros")
349
130
                )))]
350
130
                next: std::sync::atomic::AtomicUsize::new(0),
351
130
                inner: Default::default(),
352
130
            }
353
130
        }
354
355
0
        pub(super) fn notify_waiters(&self) {
356
0
            for notify in &self.inner {
357
0
                notify.notify_waiters();
358
0
            }
359
0
        }
360
361
        /// This function implements the case where randomness is not available.
362
        #[cfg(not(all(not(loom), feature = "sync", any(feature = "rt", feature = "macros"))))]
363
        pub(super) fn notified(&self) -> Notified<'_> {
364
            let i = self.next.fetch_add(1, std::sync::atomic::Ordering::Relaxed) % 8;
365
            self.inner[i].notified()
366
        }
367
368
        /// This function implements the case where randomness is available.
369
        #[cfg(all(not(loom), feature = "sync", any(feature = "rt", feature = "macros")))]
370
0
        pub(super) fn notified(&self) -> Notified<'_> {
371
0
            let i = crate::runtime::context::thread_rng_n(8) as usize;
372
0
            self.inner[i].notified()
373
0
        }
374
    }
375
}
376
377
use self::state::{AtomicState, Version};
378
mod state {
379
    use crate::loom::sync::atomic::AtomicUsize;
380
    use crate::loom::sync::atomic::Ordering;
381
382
    const CLOSED_BIT: usize = 1;
383
384
    // Using 2 as the step size preserves the `CLOSED_BIT`.
385
    const STEP_SIZE: usize = 2;
386
387
    /// The version part of the state. The lowest bit is always zero.
388
    #[derive(Copy, Clone, Debug, Eq, PartialEq)]
389
    pub(super) struct Version(usize);
390
391
    /// Snapshot of the state. The first bit is used as the CLOSED bit.
392
    /// The remaining bits are used as the version.
393
    ///
394
    /// The CLOSED bit tracks whether the Sender has been dropped. Dropping all
395
    /// receivers does not set it.
396
    #[derive(Copy, Clone, Debug)]
397
    pub(super) struct StateSnapshot(usize);
398
399
    /// The state stored in an atomic integer.
400
    ///
401
    /// The `Sender` uses `Release` ordering for storing a new state
402
    /// and the `Receiver`s use `Acquire` ordering for loading the
403
    /// current state. This ensures that written values are seen by
404
    /// the `Receiver`s for a proper handover.
405
    #[derive(Debug)]
406
    pub(super) struct AtomicState(AtomicUsize);
407
408
    impl Version {
409
        /// Decrements the version.
410
0
        pub(super) fn decrement(&mut self) {
411
0
            // Using a wrapping decrement here is required to ensure that the
412
0
            // operation is consistent with `std::sync::atomic::AtomicUsize::fetch_add()`
413
0
            // which wraps on overflow.
414
0
            self.0 = self.0.wrapping_sub(STEP_SIZE);
415
0
        }
416
417
        pub(super) const INITIAL: Self = Version(0);
418
    }
419
420
    impl StateSnapshot {
421
        /// Extract the version from the state.
422
0
        pub(super) fn version(self) -> Version {
423
0
            Version(self.0 & !CLOSED_BIT)
424
0
        }
425
426
        /// Is the closed bit set?
427
0
        pub(super) fn is_closed(self) -> bool {
428
0
            (self.0 & CLOSED_BIT) == CLOSED_BIT
429
0
        }
430
    }
431
432
    impl AtomicState {
433
        /// Create a new `AtomicState` that is not closed and which has the
434
        /// version set to `Version::INITIAL`.
435
130
        pub(super) fn new() -> Self {
436
130
            AtomicState(AtomicUsize::new(Version::INITIAL.0))
437
130
        }
438
439
        /// Load the current value of the state.
440
        ///
441
        /// Only used by the receiver and for debugging purposes.
442
        ///
443
        /// The receiver side (read-only) uses `Acquire` ordering for a proper handover
444
        /// of the shared value with the sender side (single writer). The state is always
445
        /// updated after modifying and before releasing the (exclusive) lock on the
446
        /// shared value.
447
0
        pub(super) fn load(&self) -> StateSnapshot {
448
0
            StateSnapshot(self.0.load(Ordering::Acquire))
449
0
        }
450
451
        /// Increment the version counter.
452
0
        pub(super) fn increment_version_while_locked(&self) {
453
0
            // Use `Release` ordering to ensure that the shared value
454
0
            // has been written before updating the version. The shared
455
0
            // value is still protected by an exclusive lock during this
456
0
            // method.
457
0
            self.0.fetch_add(STEP_SIZE, Ordering::Release);
458
0
        }
459
460
        /// Set the closed bit in the state.
461
0
        pub(super) fn set_closed(&self) {
462
0
            self.0.fetch_or(CLOSED_BIT, Ordering::Release);
463
0
        }
464
    }
465
}
466
467
/// Creates a new watch channel, returning the "send" and "receive" handles.
468
///
469
/// All values sent by [`Sender`] will become visible to the [`Receiver`] handles.
470
/// Only the last value sent is made available to the [`Receiver`] half. All
471
/// intermediate values are dropped.
472
///
473
/// # Examples
474
///
475
/// The following example prints `hello! world! `.
476
///
477
/// ```
478
/// use tokio::sync::watch;
479
/// use tokio::time::{Duration, sleep};
480
///
481
/// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
482
/// let (tx, mut rx) = watch::channel("hello");
483
///
484
/// tokio::spawn(async move {
485
///     // Use the equivalent of a "do-while" loop so the initial value is
486
///     // processed before awaiting the `changed()` future.
487
///     loop {
488
///         println!("{}! ", *rx.borrow_and_update());
489
///         if rx.changed().await.is_err() {
490
///             break;
491
///         }
492
///     }
493
/// });
494
///
495
/// sleep(Duration::from_millis(100)).await;
496
/// tx.send("world")?;
497
/// # Ok(())
498
/// # }
499
/// ```
500
///
501
/// [`Sender`]: struct@Sender
502
/// [`Receiver`]: struct@Receiver
503
130
pub fn channel<T>(init: T) -> (Sender<T>, Receiver<T>) {
504
130
    let shared = Arc::new(Shared {
505
130
        value: RwLock::new(init),
506
130
        state: AtomicState::new(),
507
130
        ref_count_rx: AtomicUsize::new(1),
508
130
        ref_count_tx: AtomicUsize::new(1),
509
130
        notify_rx: big_notify::BigNotify::new(),
510
130
        notify_tx: Notify::new(),
511
130
    });
512
130
513
130
    let tx = Sender {
514
130
        shared: shared.clone(),
515
130
    };
516
130
517
130
    let rx = Receiver {
518
130
        shared,
519
130
        version: Version::INITIAL,
520
130
    };
521
130
522
130
    (tx, rx)
523
130
}
Unexecuted instantiation: tokio::sync::watch::channel::<usize>
tokio::sync::watch::channel::<()>
Line
Count
Source
503
130
pub fn channel<T>(init: T) -> (Sender<T>, Receiver<T>) {
504
130
    let shared = Arc::new(Shared {
505
130
        value: RwLock::new(init),
506
130
        state: AtomicState::new(),
507
130
        ref_count_rx: AtomicUsize::new(1),
508
130
        ref_count_tx: AtomicUsize::new(1),
509
130
        notify_rx: big_notify::BigNotify::new(),
510
130
        notify_tx: Notify::new(),
511
130
    });
512
130
513
130
    let tx = Sender {
514
130
        shared: shared.clone(),
515
130
    };
516
130
517
130
    let rx = Receiver {
518
130
        shared,
519
130
        version: Version::INITIAL,
520
130
    };
521
130
522
130
    (tx, rx)
523
130
}
524
525
impl<T> Receiver<T> {
526
0
    fn from_shared(version: Version, shared: Arc<Shared<T>>) -> Self {
527
0
        // No synchronization necessary as this is only used as a counter and
528
0
        // not memory access.
529
0
        shared.ref_count_rx.fetch_add(1, Relaxed);
530
0
531
0
        Self { shared, version }
532
0
    }
533
534
    /// Returns a reference to the most recently sent value.
535
    ///
536
    /// This method does not mark the returned value as seen, so future calls to
537
    /// [`changed`] may return immediately even if you have already seen the
538
    /// value with a call to `borrow`.
539
    ///
540
    /// Outstanding borrows hold a read lock on the inner value. This means that
541
    /// long-lived borrows could cause the producer half to block. It is recommended
542
    /// to keep the borrow as short-lived as possible. Additionally, if you are
543
    /// running in an environment that allows `!Send` futures, you must ensure that
544
    /// the returned `Ref` type is never held alive across an `.await` point,
545
    /// otherwise, it can lead to a deadlock.
546
    ///
547
    /// The priority policy of the lock is dependent on the underlying lock
548
    /// implementation, and this type does not guarantee that any particular policy
549
    /// will be used. In particular, a producer which is waiting to acquire the lock
550
    /// in `send` might or might not block concurrent calls to `borrow`, e.g.:
551
    ///
552
    /// <details><summary>Potential deadlock example</summary>
553
    ///
554
    /// ```text
555
    /// // Task 1 (on thread A)    |  // Task 2 (on thread B)
556
    /// let _ref1 = rx.borrow();   |
557
    ///                            |  // will block
558
    ///                            |  let _ = tx.send(());
559
    /// // may deadlock            |
560
    /// let _ref2 = rx.borrow();   |
561
    /// ```
562
    /// </details>
563
    ///
564
    /// For more information on when to use this method versus
565
    /// [`borrow_and_update`], see [here](self#borrow_and_update-versus-borrow).
566
    ///
567
    /// [`changed`]: Receiver::changed
568
    /// [`borrow_and_update`]: Receiver::borrow_and_update
569
    ///
570
    /// # Examples
571
    ///
572
    /// ```
573
    /// use tokio::sync::watch;
574
    ///
575
    /// let (_, rx) = watch::channel("hello");
576
    /// assert_eq!(*rx.borrow(), "hello");
577
    /// ```
578
0
    pub fn borrow(&self) -> Ref<'_, T> {
579
0
        let inner = self.shared.value.read();
580
0
581
0
        // After obtaining a read-lock no concurrent writes could occur
582
0
        // and the loaded version matches that of the borrowed reference.
583
0
        let new_version = self.shared.state.load().version();
584
0
        let has_changed = self.version != new_version;
585
0
586
0
        Ref { inner, has_changed }
587
0
    }
588
589
    /// Returns a reference to the most recently sent value and marks that value
590
    /// as seen.
591
    ///
592
    /// This method marks the current value as seen. Subsequent calls to [`changed`]
593
    /// will not return immediately until the [`Sender`] has modified the shared
594
    /// value again.
595
    ///
596
    /// Outstanding borrows hold a read lock on the inner value. This means that
597
    /// long-lived borrows could cause the producer half to block. It is recommended
598
    /// to keep the borrow as short-lived as possible. Additionally, if you are
599
    /// running in an environment that allows `!Send` futures, you must ensure that
600
    /// the returned `Ref` type is never held alive across an `.await` point,
601
    /// otherwise, it can lead to a deadlock.
602
    ///
603
    /// The priority policy of the lock is dependent on the underlying lock
604
    /// implementation, and this type does not guarantee that any particular policy
605
    /// will be used. In particular, a producer which is waiting to acquire the lock
606
    /// in `send` might or might not block concurrent calls to `borrow`, e.g.:
607
    ///
608
    /// <details><summary>Potential deadlock example</summary>
609
    ///
610
    /// ```text
611
    /// // Task 1 (on thread A)                |  // Task 2 (on thread B)
612
    /// let _ref1 = rx1.borrow_and_update();   |
613
    ///                                        |  // will block
614
    ///                                        |  let _ = tx.send(());
615
    /// // may deadlock                        |
616
    /// let _ref2 = rx2.borrow_and_update();   |
617
    /// ```
618
    /// </details>
619
    ///
620
    /// For more information on when to use this method versus [`borrow`], see
621
    /// [here](self#borrow_and_update-versus-borrow).
622
    ///
623
    /// [`changed`]: Receiver::changed
624
    /// [`borrow`]: Receiver::borrow
625
0
    pub fn borrow_and_update(&mut self) -> Ref<'_, T> {
626
0
        let inner = self.shared.value.read();
627
0
628
0
        // After obtaining a read-lock no concurrent writes could occur
629
0
        // and the loaded version matches that of the borrowed reference.
630
0
        let new_version = self.shared.state.load().version();
631
0
        let has_changed = self.version != new_version;
632
0
633
0
        // Mark the shared value as seen by updating the version
634
0
        self.version = new_version;
635
0
636
0
        Ref { inner, has_changed }
637
0
    }
638
639
    /// Checks if this channel contains a message that this receiver has not yet
640
    /// seen. The new value is not marked as seen.
641
    ///
642
    /// Although this method is called `has_changed`, it does not check new
643
    /// messages for equality, so this call will return true even if the new
644
    /// message is equal to the old message.
645
    ///
646
    /// Returns an error if the channel has been closed.
647
    /// # Examples
648
    ///
649
    /// ```
650
    /// use tokio::sync::watch;
651
    ///
652
    /// #[tokio::main]
653
    /// async fn main() {
654
    ///     let (tx, mut rx) = watch::channel("hello");
655
    ///
656
    ///     tx.send("goodbye").unwrap();
657
    ///
658
    ///     assert!(rx.has_changed().unwrap());
659
    ///     assert_eq!(*rx.borrow_and_update(), "goodbye");
660
    ///
661
    ///     // The value has been marked as seen
662
    ///     assert!(!rx.has_changed().unwrap());
663
    ///
664
    ///     drop(tx);
665
    ///     // The `tx` handle has been dropped
666
    ///     assert!(rx.has_changed().is_err());
667
    /// }
668
    /// ```
669
0
    pub fn has_changed(&self) -> Result<bool, error::RecvError> {
670
0
        // Load the version from the state
671
0
        let state = self.shared.state.load();
672
0
        if state.is_closed() {
673
            // The sender has dropped.
674
0
            return Err(error::RecvError(()));
675
0
        }
676
0
        let new_version = state.version();
677
0
678
0
        Ok(self.version != new_version)
679
0
    }
680
681
    /// Marks the state as changed.
682
    ///
683
    /// After invoking this method [`has_changed()`](Self::has_changed)
684
    /// returns `true` and [`changed()`](Self::changed) returns
685
    /// immediately, regardless of whether a new value has been sent.
686
    ///
687
    /// This is useful for triggering an initial change notification after
688
    /// subscribing to synchronize new receivers.
689
0
    pub fn mark_changed(&mut self) {
690
0
        self.version.decrement();
691
0
    }
692
693
    /// Marks the state as unchanged.
694
    ///
695
    /// The current value will be considered seen by the receiver.
696
    ///
697
    /// This is useful if you are not interested in the current value
698
    /// visible in the receiver.
699
0
    pub fn mark_unchanged(&mut self) {
700
0
        let current_version = self.shared.state.load().version();
701
0
        self.version = current_version;
702
0
    }
703
704
    /// Waits for a change notification, then marks the newest value as seen.
705
    ///
706
    /// If the newest value in the channel has not yet been marked seen when
707
    /// this method is called, the method marks that value seen and returns
708
    /// immediately. If the newest value has already been marked seen, then the
709
    /// method sleeps until a new message is sent by the [`Sender`] connected to
710
    /// this `Receiver`, or until the [`Sender`] is dropped.
711
    ///
712
    /// This method returns an error if and only if the [`Sender`] is dropped.
713
    ///
714
    /// For more information, see
715
    /// [*Change notifications*](self#change-notifications) in the module-level documentation.
716
    ///
717
    /// # Cancel safety
718
    ///
719
    /// This method is cancel safe. If you use it as the event in a
720
    /// [`tokio::select!`](crate::select) statement and some other branch
721
    /// completes first, then it is guaranteed that no values have been marked
722
    /// seen by this call to `changed`.
723
    ///
724
    /// [`Sender`]: struct@Sender
725
    ///
726
    /// # Examples
727
    ///
728
    /// ```
729
    /// use tokio::sync::watch;
730
    ///
731
    /// #[tokio::main]
732
    /// async fn main() {
733
    ///     let (tx, mut rx) = watch::channel("hello");
734
    ///
735
    ///     tokio::spawn(async move {
736
    ///         tx.send("goodbye").unwrap();
737
    ///     });
738
    ///
739
    ///     assert!(rx.changed().await.is_ok());
740
    ///     assert_eq!(*rx.borrow_and_update(), "goodbye");
741
    ///
742
    ///     // The `tx` handle has been dropped
743
    ///     assert!(rx.changed().await.is_err());
744
    /// }
745
    /// ```
746
0
    pub async fn changed(&mut self) -> Result<(), error::RecvError> {
747
0
        cooperative(changed_impl(&self.shared, &mut self.version)).await
748
0
    }
749
750
    /// Waits for a value that satisfies the provided condition.
751
    ///
752
    /// This method will call the provided closure whenever something is sent on
753
    /// the channel. Once the closure returns `true`, this method will return a
754
    /// reference to the value that was passed to the closure.
755
    ///
756
    /// Before `wait_for` starts waiting for changes, it will call the closure
757
    /// on the current value. If the closure returns `true` when given the
758
    /// current value, then `wait_for` will immediately return a reference to
759
    /// the current value. This is the case even if the current value is already
760
    /// considered seen.
761
    ///
762
    /// The watch channel only keeps track of the most recent value, so if
763
    /// several messages are sent faster than `wait_for` is able to call the
764
    /// closure, then it may skip some updates. Whenever the closure is called,
765
    /// it will be called with the most recent value.
766
    ///
767
    /// When this function returns, the value that was passed to the closure
768
    /// when it returned `true` will be considered seen.
769
    ///
770
    /// If the channel is closed, then `wait_for` will return a [`RecvError`].
771
    /// Once this happens, no more messages can ever be sent on the channel.
772
    /// When an error is returned, it is guaranteed that the closure has been
773
    /// called on the last value, and that it returned `false` for that value.
774
    /// (If the closure returned `true`, then the last value would have been
775
    /// returned instead of the error.)
776
    ///
777
    /// Like the [`borrow`] method, the returned borrow holds a read lock on the
778
    /// inner value. This means that long-lived borrows could cause the producer
779
    /// half to block. It is recommended to keep the borrow as short-lived as
780
    /// possible. See the documentation of `borrow` for more information on
781
    /// this.
782
    ///
783
    /// [`borrow`]: Receiver::borrow
784
    /// [`RecvError`]: error::RecvError
785
    ///
786
    /// # Cancel safety
787
    ///
788
    /// This method is cancel safe. If you use it as the event in a
789
    /// [`tokio::select!`](crate::select) statement and some other branch
790
    /// completes first, then it is guaranteed that the last seen value `val`
791
    /// (if any) satisfies `f(val) == false`.
792
    ///
793
    /// # Panics
794
    ///
795
    /// If and only if the closure `f` panics. In that case, no resource owned
796
    /// or shared by this [`Receiver`] will be poisoned.
797
    ///
798
    /// # Examples
799
    ///
800
    /// ```
801
    /// use tokio::sync::watch;
802
    /// use tokio::time::{sleep, Duration};
803
    ///
804
    /// #[tokio::main(flavor = "current_thread", start_paused = true)]
805
    /// async fn main() {
806
    ///     let (tx, mut rx) = watch::channel("hello");
807
    ///
808
    ///     tokio::spawn(async move {
809
    ///         sleep(Duration::from_secs(1)).await;
810
    ///         tx.send("goodbye").unwrap();
811
    ///     });
812
    ///
813
    ///     assert!(rx.wait_for(|val| *val == "goodbye").await.is_ok());
814
    ///     assert_eq!(*rx.borrow(), "goodbye");
815
    /// }
816
    /// ```
817
0
    pub async fn wait_for(
818
0
        &mut self,
819
0
        f: impl FnMut(&T) -> bool,
820
0
    ) -> Result<Ref<'_, T>, error::RecvError> {
821
0
        cooperative(self.wait_for_inner(f)).await
822
0
    }
823
824
0
    async fn wait_for_inner(
825
0
        &mut self,
826
0
        mut f: impl FnMut(&T) -> bool,
827
0
    ) -> Result<Ref<'_, T>, error::RecvError> {
828
0
        let mut closed = false;
829
        loop {
830
            {
831
0
                let inner = self.shared.value.read();
832
0
833
0
                let new_version = self.shared.state.load().version();
834
0
                let has_changed = self.version != new_version;
835
0
                self.version = new_version;
836
0
837
0
                if !closed || has_changed {
838
0
                    let result = panic::catch_unwind(panic::AssertUnwindSafe(|| f(&inner)));
839
0
                    match result {
840
                        Ok(true) => {
841
0
                            return Ok(Ref { inner, has_changed });
842
                        }
843
0
                        Ok(false) => {
844
0
                            // Skip the value.
845
0
                        }
846
0
                        Err(panicked) => {
847
0
                            // Drop the read-lock to avoid poisoning it.
848
0
                            drop(inner);
849
0
                            // Forward the panic to the caller.
850
0
                            panic::resume_unwind(panicked);
851
                            // Unreachable
852
                        }
853
                    };
854
0
                }
855
            }
856
857
0
            if closed {
858
0
                return Err(error::RecvError(()));
859
0
            }
860
0
861
0
            // Wait for the value to change.
862
0
            closed = changed_impl(&self.shared, &mut self.version).await.is_err();
863
        }
864
0
    }
865
866
    /// Returns `true` if receivers belong to the same channel.
867
    ///
868
    /// # Examples
869
    ///
870
    /// ```
871
    /// let (tx, rx) = tokio::sync::watch::channel(true);
872
    /// let rx2 = rx.clone();
873
    /// assert!(rx.same_channel(&rx2));
874
    ///
875
    /// let (tx3, rx3) = tokio::sync::watch::channel(true);
876
    /// assert!(!rx3.same_channel(&rx2));
877
    /// ```
878
0
    pub fn same_channel(&self, other: &Self) -> bool {
879
0
        Arc::ptr_eq(&self.shared, &other.shared)
880
0
    }
881
882
    cfg_process_driver! {
883
0
        pub(crate) fn try_has_changed(&mut self) -> Option<Result<(), error::RecvError>> {
884
0
            maybe_changed(&self.shared, &mut self.version)
885
0
        }
886
    }
887
}
888
889
0
fn maybe_changed<T>(
890
0
    shared: &Shared<T>,
891
0
    version: &mut Version,
892
0
) -> Option<Result<(), error::RecvError>> {
893
0
    // Load the version from the state
894
0
    let state = shared.state.load();
895
0
    let new_version = state.version();
896
0
897
0
    if *version != new_version {
898
        // Observe the new version and return
899
0
        *version = new_version;
900
0
        return Some(Ok(()));
901
0
    }
902
0
903
0
    if state.is_closed() {
904
        // The sender has been dropped.
905
0
        return Some(Err(error::RecvError(())));
906
0
    }
907
0
908
0
    None
909
0
}
910
911
0
async fn changed_impl<T>(
912
0
    shared: &Shared<T>,
913
0
    version: &mut Version,
914
0
) -> Result<(), error::RecvError> {
915
0
    crate::trace::async_trace_leaf().await;
916
917
    loop {
918
        // In order to avoid a race condition, we first request a notification,
919
        // **then** check the current value's version. If a new version exists,
920
        // the notification request is dropped.
921
0
        let notified = shared.notify_rx.notified();
922
923
0
        if let Some(ret) = maybe_changed(shared, version) {
924
0
            return ret;
925
0
        }
926
0
927
0
        notified.await;
928
        // loop around again in case the wake-up was spurious
929
    }
930
0
}
931
932
impl<T> Clone for Receiver<T> {
933
0
    fn clone(&self) -> Self {
934
0
        let version = self.version;
935
0
        let shared = self.shared.clone();
936
0
937
0
        Self::from_shared(version, shared)
938
0
    }
939
}
940
941
impl<T> Drop for Receiver<T> {
942
130
    fn drop(&mut self) {
943
130
        // No synchronization necessary as this is only used as a counter and
944
130
        // not memory access.
945
130
        if 1 == self.shared.ref_count_rx.fetch_sub(1, Relaxed) {
946
130
            // This is the last `Receiver` handle, tasks waiting on `Sender::closed()`
947
130
            self.shared.notify_tx.notify_waiters();
948
130
        }
949
130
    }
Unexecuted instantiation: <tokio::sync::watch::Receiver<usize> as core::ops::drop::Drop>::drop
<tokio::sync::watch::Receiver<()> as core::ops::drop::Drop>::drop
Line
Count
Source
942
130
    fn drop(&mut self) {
943
130
        // No synchronization necessary as this is only used as a counter and
944
130
        // not memory access.
945
130
        if 1 == self.shared.ref_count_rx.fetch_sub(1, Relaxed) {
946
130
            // This is the last `Receiver` handle, tasks waiting on `Sender::closed()`
947
130
            self.shared.notify_tx.notify_waiters();
948
130
        }
949
130
    }
950
}
951
952
impl<T> Sender<T> {
953
    /// Creates the sending-half of the [`watch`] channel.
954
    ///
955
    /// See documentation of [`watch::channel`] for errors when calling this function.
956
    /// Beware that attempting to send a value when there are no receivers will
957
    /// return an error.
958
    ///
959
    /// [`watch`]: crate::sync::watch
960
    /// [`watch::channel`]: crate::sync::watch
961
    ///
962
    /// # Examples
963
    /// ```
964
    /// let sender = tokio::sync::watch::Sender::new(0u8);
965
    /// assert!(sender.send(3).is_err());
966
    /// let _rec = sender.subscribe();
967
    /// assert!(sender.send(4).is_ok());
968
    /// ```
969
0
    pub fn new(init: T) -> Self {
970
0
        let (tx, _) = channel(init);
971
0
        tx
972
0
    }
973
974
    /// Sends a new value via the channel, notifying all receivers.
975
    ///
976
    /// This method fails if the channel is closed, which is the case when
977
    /// every receiver has been dropped. It is possible to reopen the channel
978
    /// using the [`subscribe`] method. However, when `send` fails, the value
979
    /// isn't made available for future receivers (but returned with the
980
    /// [`SendError`]).
981
    ///
982
    /// To always make a new value available for future receivers, even if no
983
    /// receiver currently exists, one of the other send methods
984
    /// ([`send_if_modified`], [`send_modify`], or [`send_replace`]) can be
985
    /// used instead.
986
    ///
987
    /// [`subscribe`]: Sender::subscribe
988
    /// [`SendError`]: error::SendError
989
    /// [`send_if_modified`]: Sender::send_if_modified
990
    /// [`send_modify`]: Sender::send_modify
991
    /// [`send_replace`]: Sender::send_replace
992
0
    pub fn send(&self, value: T) -> Result<(), error::SendError<T>> {
993
0
        // This is pretty much only useful as a hint anyway, so synchronization isn't critical.
994
0
        if 0 == self.receiver_count() {
995
0
            return Err(error::SendError(value));
996
0
        }
997
0
998
0
        self.send_replace(value);
999
0
        Ok(())
1000
0
    }
1001
1002
    /// Modifies the watched value **unconditionally** in-place,
1003
    /// notifying all receivers.
1004
    ///
1005
    /// This can be useful for modifying the watched value, without
1006
    /// having to allocate a new instance. Additionally, this
1007
    /// method permits sending values even when there are no receivers.
1008
    ///
1009
    /// Prefer to use the more versatile function [`Self::send_if_modified()`]
1010
    /// if the value is only modified conditionally during the mutable borrow
1011
    /// to prevent unneeded change notifications for unmodified values.
1012
    ///
1013
    /// # Panics
1014
    ///
1015
    /// This function panics when the invocation of the `modify` closure panics.
1016
    /// No receivers are notified when panicking. All changes of the watched
1017
    /// value applied by the closure before panicking will be visible in
1018
    /// subsequent calls to `borrow`.
1019
    ///
1020
    /// # Examples
1021
    ///
1022
    /// ```
1023
    /// use tokio::sync::watch;
1024
    ///
1025
    /// struct State {
1026
    ///     counter: usize,
1027
    /// }
1028
    /// let (state_tx, state_rx) = watch::channel(State { counter: 0 });
1029
    /// state_tx.send_modify(|state| state.counter += 1);
1030
    /// assert_eq!(state_rx.borrow().counter, 1);
1031
    /// ```
1032
0
    pub fn send_modify<F>(&self, modify: F)
1033
0
    where
1034
0
        F: FnOnce(&mut T),
1035
0
    {
1036
0
        self.send_if_modified(|value| {
1037
0
            modify(value);
1038
0
            true
1039
0
        });
1040
0
    }
1041
1042
    /// Modifies the watched value **conditionally** in-place,
1043
    /// notifying all receivers only if modified.
1044
    ///
1045
    /// This can be useful for modifying the watched value, without
1046
    /// having to allocate a new instance. Additionally, this
1047
    /// method permits sending values even when there are no receivers.
1048
    ///
1049
    /// The `modify` closure must return `true` if the value has actually
1050
    /// been modified during the mutable borrow. It should only return `false`
1051
    /// if the value is guaranteed to be unmodified despite the mutable
1052
    /// borrow.
1053
    ///
1054
    /// Receivers are only notified if the closure returned `true`. If the
1055
    /// closure has modified the value but returned `false` this results
1056
    /// in a *silent modification*, i.e. the modified value will be visible
1057
    /// in subsequent calls to `borrow`, but receivers will not receive
1058
    /// a change notification.
1059
    ///
1060
    /// Returns the result of the closure, i.e. `true` if the value has
1061
    /// been modified and `false` otherwise.
1062
    ///
1063
    /// # Panics
1064
    ///
1065
    /// This function panics when the invocation of the `modify` closure panics.
1066
    /// No receivers are notified when panicking. All changes of the watched
1067
    /// value applied by the closure before panicking will be visible in
1068
    /// subsequent calls to `borrow`.
1069
    ///
1070
    /// # Examples
1071
    ///
1072
    /// ```
1073
    /// use tokio::sync::watch;
1074
    ///
1075
    /// struct State {
1076
    ///     counter: usize,
1077
    /// }
1078
    /// let (state_tx, mut state_rx) = watch::channel(State { counter: 1 });
1079
    /// let inc_counter_if_odd = |state: &mut State| {
1080
    ///     if state.counter % 2 == 1 {
1081
    ///         state.counter += 1;
1082
    ///         return true;
1083
    ///     }
1084
    ///     false
1085
    /// };
1086
    ///
1087
    /// assert_eq!(state_rx.borrow().counter, 1);
1088
    ///
1089
    /// assert!(!state_rx.has_changed().unwrap());
1090
    /// assert!(state_tx.send_if_modified(inc_counter_if_odd));
1091
    /// assert!(state_rx.has_changed().unwrap());
1092
    /// assert_eq!(state_rx.borrow_and_update().counter, 2);
1093
    ///
1094
    /// assert!(!state_rx.has_changed().unwrap());
1095
    /// assert!(!state_tx.send_if_modified(inc_counter_if_odd));
1096
    /// assert!(!state_rx.has_changed().unwrap());
1097
    /// assert_eq!(state_rx.borrow_and_update().counter, 2);
1098
    /// ```
1099
0
    pub fn send_if_modified<F>(&self, modify: F) -> bool
1100
0
    where
1101
0
        F: FnOnce(&mut T) -> bool,
1102
0
    {
1103
0
        {
1104
0
            // Acquire the write lock and update the value.
1105
0
            let mut lock = self.shared.value.write();
1106
0
1107
0
            // Update the value and catch possible panic inside func.
1108
0
            let result = panic::catch_unwind(panic::AssertUnwindSafe(|| modify(&mut lock)));
1109
0
            match result {
1110
0
                Ok(modified) => {
1111
0
                    if !modified {
1112
                        // Abort, i.e. don't notify receivers if unmodified
1113
0
                        return false;
1114
0
                    }
1115
                    // Continue if modified
1116
                }
1117
0
                Err(panicked) => {
1118
0
                    // Drop the lock to avoid poisoning it.
1119
0
                    drop(lock);
1120
0
                    // Forward the panic to the caller.
1121
0
                    panic::resume_unwind(panicked);
1122
                    // Unreachable
1123
                }
1124
            };
1125
1126
0
            self.shared.state.increment_version_while_locked();
1127
0
1128
0
            // Release the write lock.
1129
0
            //
1130
0
            // Incrementing the version counter while holding the lock ensures
1131
0
            // that receivers are able to figure out the version number of the
1132
0
            // value they are currently looking at.
1133
0
            drop(lock);
1134
0
        }
1135
0
1136
0
        self.shared.notify_rx.notify_waiters();
1137
0
1138
0
        true
1139
0
    }
1140
1141
    /// Sends a new value via the channel, notifying all receivers and returning
1142
    /// the previous value in the channel.
1143
    ///
1144
    /// This can be useful for reusing the buffers inside a watched value.
1145
    /// Additionally, this method permits sending values even when there are no
1146
    /// receivers.
1147
    ///
1148
    /// # Examples
1149
    ///
1150
    /// ```
1151
    /// use tokio::sync::watch;
1152
    ///
1153
    /// let (tx, _rx) = watch::channel(1);
1154
    /// assert_eq!(tx.send_replace(2), 1);
1155
    /// assert_eq!(tx.send_replace(3), 2);
1156
    /// ```
1157
0
    pub fn send_replace(&self, mut value: T) -> T {
1158
0
        // swap old watched value with the new one
1159
0
        self.send_modify(|old| mem::swap(old, &mut value));
1160
0
1161
0
        value
1162
0
    }
1163
1164
    /// Returns a reference to the most recently sent value
1165
    ///
1166
    /// Outstanding borrows hold a read lock on the inner value. This means that
1167
    /// long-lived borrows could cause the producer half to block. It is recommended
1168
    /// to keep the borrow as short-lived as possible. Additionally, if you are
1169
    /// running in an environment that allows `!Send` futures, you must ensure that
1170
    /// the returned `Ref` type is never held alive across an `.await` point,
1171
    /// otherwise, it can lead to a deadlock.
1172
    ///
1173
    /// # Examples
1174
    ///
1175
    /// ```
1176
    /// use tokio::sync::watch;
1177
    ///
1178
    /// let (tx, _) = watch::channel("hello");
1179
    /// assert_eq!(*tx.borrow(), "hello");
1180
    /// ```
1181
0
    pub fn borrow(&self) -> Ref<'_, T> {
1182
0
        let inner = self.shared.value.read();
1183
0
1184
0
        // The sender/producer always sees the current version
1185
0
        let has_changed = false;
1186
0
1187
0
        Ref { inner, has_changed }
1188
0
    }
1189
1190
    /// Checks if the channel has been closed. This happens when all receivers
1191
    /// have dropped.
1192
    ///
1193
    /// # Examples
1194
    ///
1195
    /// ```
1196
    /// let (tx, rx) = tokio::sync::watch::channel(());
1197
    /// assert!(!tx.is_closed());
1198
    ///
1199
    /// drop(rx);
1200
    /// assert!(tx.is_closed());
1201
    /// ```
1202
0
    pub fn is_closed(&self) -> bool {
1203
0
        self.receiver_count() == 0
1204
0
    }
1205
1206
    /// Completes when all receivers have dropped.
1207
    ///
1208
    /// This allows the producer to get notified when interest in the produced
1209
    /// values is canceled and immediately stop doing work. Once a channel is
1210
    /// closed, the only way to reopen it is to call [`Sender::subscribe`] to
1211
    /// get a new receiver.
1212
    ///
1213
    /// If the channel becomes closed for a brief amount of time (e.g., the last
1214
    /// receiver is dropped and then `subscribe` is called), then this call to
1215
    /// `closed` might return, but it is also possible that it does not "notice"
1216
    /// that the channel was closed for a brief amount of time.
1217
    ///
1218
    /// # Cancel safety
1219
    ///
1220
    /// This method is cancel safe.
1221
    ///
1222
    /// # Examples
1223
    ///
1224
    /// ```
1225
    /// use tokio::sync::watch;
1226
    ///
1227
    /// #[tokio::main]
1228
    /// async fn main() {
1229
    ///     let (tx, rx) = watch::channel("hello");
1230
    ///
1231
    ///     tokio::spawn(async move {
1232
    ///         // use `rx`
1233
    ///         drop(rx);
1234
    ///     });
1235
    ///
1236
    ///     // Waits for `rx` to drop
1237
    ///     tx.closed().await;
1238
    ///     println!("the `rx` handles dropped")
1239
    /// }
1240
    /// ```
1241
0
    pub async fn closed(&self) {
1242
0
        cooperative(async {
1243
0
            crate::trace::async_trace_leaf().await;
1244
1245
0
            while self.receiver_count() > 0 {
1246
0
                let notified = self.shared.notify_tx.notified();
1247
0
1248
0
                if self.receiver_count() == 0 {
1249
0
                    return;
1250
0
                }
1251
0
1252
0
                notified.await;
1253
                // The channel could have been reopened in the meantime by calling
1254
                // `subscribe`, so we loop again.
1255
            }
1256
0
        })
1257
0
        .await;
1258
0
    }
1259
1260
    /// Creates a new [`Receiver`] connected to this `Sender`.
1261
    ///
1262
    /// All messages sent before this call to `subscribe` are initially marked
1263
    /// as seen by the new `Receiver`.
1264
    ///
1265
    /// This method can be called even if there are no other receivers. In this
1266
    /// case, the channel is reopened.
1267
    ///
1268
    /// # Examples
1269
    ///
1270
    /// The new channel will receive messages sent on this `Sender`.
1271
    ///
1272
    /// ```
1273
    /// use tokio::sync::watch;
1274
    ///
1275
    /// #[tokio::main]
1276
    /// async fn main() {
1277
    ///     let (tx, _rx) = watch::channel(0u64);
1278
    ///
1279
    ///     tx.send(5).unwrap();
1280
    ///
1281
    ///     let rx = tx.subscribe();
1282
    ///     assert_eq!(5, *rx.borrow());
1283
    ///
1284
    ///     tx.send(10).unwrap();
1285
    ///     assert_eq!(10, *rx.borrow());
1286
    /// }
1287
    /// ```
1288
    ///
1289
    /// The most recent message is considered seen by the channel, so this test
1290
    /// is guaranteed to pass.
1291
    ///
1292
    /// ```
1293
    /// use tokio::sync::watch;
1294
    /// use tokio::time::Duration;
1295
    ///
1296
    /// #[tokio::main]
1297
    /// async fn main() {
1298
    ///     let (tx, _rx) = watch::channel(0u64);
1299
    ///     tx.send(5).unwrap();
1300
    ///     let mut rx = tx.subscribe();
1301
    ///
1302
    ///     tokio::spawn(async move {
1303
    ///         // by spawning and sleeping, the message is sent after `main`
1304
    ///         // hits the call to `changed`.
1305
    ///         # if false {
1306
    ///         tokio::time::sleep(Duration::from_millis(10)).await;
1307
    ///         # }
1308
    ///         tx.send(100).unwrap();
1309
    ///     });
1310
    ///
1311
    ///     rx.changed().await.unwrap();
1312
    ///     assert_eq!(100, *rx.borrow());
1313
    /// }
1314
    /// ```
1315
0
    pub fn subscribe(&self) -> Receiver<T> {
1316
0
        let shared = self.shared.clone();
1317
0
        let version = shared.state.load().version();
1318
0
1319
0
        // The CLOSED bit in the state tracks only whether the sender is
1320
0
        // dropped, so we do not need to unset it if this reopens the channel.
1321
0
        Receiver::from_shared(version, shared)
1322
0
    }
1323
1324
    /// Returns the number of receivers that currently exist.
1325
    ///
1326
    /// # Examples
1327
    ///
1328
    /// ```
1329
    /// use tokio::sync::watch;
1330
    ///
1331
    /// #[tokio::main]
1332
    /// async fn main() {
1333
    ///     let (tx, rx1) = watch::channel("hello");
1334
    ///
1335
    ///     assert_eq!(1, tx.receiver_count());
1336
    ///
1337
    ///     let mut _rx2 = rx1.clone();
1338
    ///
1339
    ///     assert_eq!(2, tx.receiver_count());
1340
    /// }
1341
    /// ```
1342
0
    pub fn receiver_count(&self) -> usize {
1343
0
        self.shared.ref_count_rx.load(Relaxed)
1344
0
    }
1345
1346
    /// Returns the number of senders that currently exist.
1347
    ///
1348
    /// # Examples
1349
    ///
1350
    /// ```
1351
    /// use tokio::sync::watch;
1352
    ///
1353
    /// #[tokio::main]
1354
    /// async fn main() {
1355
    ///     let (tx1, rx) = watch::channel("hello");
1356
    ///
1357
    ///     assert_eq!(1, tx1.sender_count());
1358
    ///
1359
    ///     let tx2 = tx1.clone();
1360
    ///
1361
    ///     assert_eq!(2, tx1.sender_count());
1362
    ///     assert_eq!(2, tx2.sender_count());
1363
    /// }
1364
    /// ```
1365
0
    pub fn sender_count(&self) -> usize {
1366
0
        self.shared.ref_count_tx.load(Relaxed)
1367
0
    }
1368
1369
    /// Returns `true` if senders belong to the same channel.
1370
    ///
1371
    /// # Examples
1372
    ///
1373
    /// ```
1374
    /// let (tx, rx) = tokio::sync::watch::channel(true);
1375
    /// let tx2 = tx.clone();
1376
    /// assert!(tx.same_channel(&tx2));
1377
    ///
1378
    /// let (tx3, rx3) = tokio::sync::watch::channel(true);
1379
    /// assert!(!tx3.same_channel(&tx2));
1380
    /// ```
1381
0
    pub fn same_channel(&self, other: &Self) -> bool {
1382
0
        Arc::ptr_eq(&self.shared, &other.shared)
1383
0
    }
1384
}
1385
1386
impl<T> Drop for Sender<T> {
1387
0
    fn drop(&mut self) {
1388
0
        if self.shared.ref_count_tx.fetch_sub(1, AcqRel) == 1 {
1389
0
            self.shared.state.set_closed();
1390
0
            self.shared.notify_rx.notify_waiters();
1391
0
        }
1392
0
    }
Unexecuted instantiation: <tokio::sync::watch::Sender<usize> as core::ops::drop::Drop>::drop
Unexecuted instantiation: <tokio::sync::watch::Sender<()> as core::ops::drop::Drop>::drop
1393
}
1394
1395
// ===== impl Ref =====
1396
1397
impl<T> ops::Deref for Ref<'_, T> {
1398
    type Target = T;
1399
1400
0
    fn deref(&self) -> &T {
1401
0
        self.inner.deref()
1402
0
    }
1403
}
1404
1405
#[cfg(all(test, loom))]
1406
mod tests {
1407
    use futures::future::FutureExt;
1408
    use loom::thread;
1409
1410
    // test for https://github.com/tokio-rs/tokio/issues/3168
1411
    #[test]
1412
    fn watch_spurious_wakeup() {
1413
        loom::model(|| {
1414
            let (send, mut recv) = crate::sync::watch::channel(0i32);
1415
1416
            send.send(1).unwrap();
1417
1418
            let send_thread = thread::spawn(move || {
1419
                send.send(2).unwrap();
1420
                send
1421
            });
1422
1423
            recv.changed().now_or_never();
1424
1425
            let send = send_thread.join().unwrap();
1426
            let recv_thread = thread::spawn(move || {
1427
                recv.changed().now_or_never();
1428
                recv.changed().now_or_never();
1429
                recv
1430
            });
1431
1432
            send.send(3).unwrap();
1433
1434
            let mut recv = recv_thread.join().unwrap();
1435
            let send_thread = thread::spawn(move || {
1436
                send.send(2).unwrap();
1437
            });
1438
1439
            recv.changed().now_or_never();
1440
1441
            send_thread.join().unwrap();
1442
        });
1443
    }
1444
1445
    #[test]
1446
    fn watch_borrow() {
1447
        loom::model(|| {
1448
            let (send, mut recv) = crate::sync::watch::channel(0i32);
1449
1450
            assert!(send.borrow().eq(&0));
1451
            assert!(recv.borrow().eq(&0));
1452
1453
            send.send(1).unwrap();
1454
            assert!(send.borrow().eq(&1));
1455
1456
            let send_thread = thread::spawn(move || {
1457
                send.send(2).unwrap();
1458
                send
1459
            });
1460
1461
            recv.changed().now_or_never();
1462
1463
            let send = send_thread.join().unwrap();
1464
            let recv_thread = thread::spawn(move || {
1465
                recv.changed().now_or_never();
1466
                recv.changed().now_or_never();
1467
                recv
1468
            });
1469
1470
            send.send(3).unwrap();
1471
1472
            let recv = recv_thread.join().unwrap();
1473
            assert!(recv.borrow().eq(&3));
1474
            assert!(send.borrow().eq(&3));
1475
1476
            send.send(2).unwrap();
1477
1478
            thread::spawn(move || {
1479
                assert!(recv.borrow().eq(&2));
1480
            });
1481
            assert!(send.borrow().eq(&2));
1482
        });
1483
    }
1484
}