Coverage Report

Created: 2025-11-16 06:13

/rust/registry/src/index.crates.io-1949cf8c6b5b557f/tokio-1.48.0/src/sync/watch.rs
Line
Count
Source
1
#![cfg_attr(not(feature = "sync"), allow(dead_code, unreachable_pub))]
2
3
//! A multi-producer, multi-consumer channel that only retains the *last* sent
4
//! value.
5
//!
6
//! This channel is useful for watching for changes to a value from multiple
7
//! points in the code base, for example, changes to configuration values.
8
//!
9
//! # Usage
10
//!
11
//! [`channel`] returns a [`Sender`] / [`Receiver`] pair. These are the producer
12
//! and consumer halves of the channel. The channel is created with an initial
13
//! value.
14
//!
15
//! Each [`Receiver`] independently tracks the last value *seen* by its caller.
16
//!
17
//! To access the **current** value stored in the channel and mark it as *seen*
18
//! by a given [`Receiver`], use [`Receiver::borrow_and_update()`].
19
//!
20
//! To access the current value **without** marking it as *seen*, use
21
//! [`Receiver::borrow()`]. (If the value has already been marked *seen*,
22
//! [`Receiver::borrow()`] is equivalent to [`Receiver::borrow_and_update()`].)
23
//!
24
//! For more information on when to use these methods, see
25
//! [here](#borrow_and_update-versus-borrow).
26
//!
27
//! ## Change notifications
28
//!
29
//! The [`Receiver`] half provides an asynchronous [`changed`] method. This
30
//! method is ready when a new, *unseen* value is sent via the [`Sender`] half.
31
//!
32
//! * [`Receiver::changed()`] returns:
33
//!     * `Ok(())` on receiving a new value.
34
//!     * `Err(`[`RecvError`](error::RecvError)`)` if the
35
//!       channel has been closed __AND__ the current value is *seen*.
36
//! * If the current value is *unseen* when calling [`changed`], then
37
//!   [`changed`] will return immediately. If the current value is *seen*, then
38
//!   it will sleep until either a new message is sent via the [`Sender`] half,
39
//!   or the [`Sender`] is dropped.
40
//! * On completion, the [`changed`] method marks the new value as *seen*.
41
//! * At creation, the initial value is considered *seen*. In other words,
42
//!   [`Receiver::changed()`] will not return until a subsequent value is sent.
43
//! * New [`Receiver`] instances can be created with [`Sender::subscribe()`].
44
//!   The current value at the time the [`Receiver`] is created is considered
45
//!   *seen*.
46
//!
47
//! ## [`changed`] versus [`has_changed`]
48
//!
49
//! The [`Receiver`] half provides two methods for checking for changes
50
//! in the channel, [`has_changed`] and [`changed`].
51
//!
52
//! * [`has_changed`] is a *synchronous* method that checks whether the current
53
//!   value is seen or not and returns a boolean. This method does __not__ mark the
54
//!   value as seen.
55
//!
56
//! * [`changed`] is an *asynchronous* method that will return once an unseen
57
//!   value is in the channel. This method does mark the value as seen.
58
//!
59
//! Note that these two methods differ in when they return
60
//! an error.
61
//!
62
//! - [`has_changed`] errors if and only if the channel is closed.
63
//! - [`changed`] errors if the channel has been closed __AND__
64
//!   the current value is seen.
65
//!
66
//! See the example below that shows how these methods have different fallibility.
67
//!
68
//! ## [`borrow_and_update`] versus [`borrow`]
69
//!
70
//! If the receiver intends to await notifications from [`changed`] in a loop,
71
//! [`Receiver::borrow_and_update()`] should be preferred over
72
//! [`Receiver::borrow()`].  This avoids a potential race where a new value is
73
//! sent between [`changed`] being ready and the value being read. (If
74
//! [`Receiver::borrow()`] is used, the loop may run twice with the same value.)
75
//!
76
//! If the receiver is only interested in the current value, and does not intend
77
//! to wait for changes, then [`Receiver::borrow()`] can be used. It may be more
78
//! convenient to use [`borrow`](Receiver::borrow) since it's an `&self`
79
//! method---[`borrow_and_update`](Receiver::borrow_and_update) requires `&mut
80
//! self`.
81
//!
82
//! # Examples
83
//!
84
//! The following example prints `hello! world! `.
85
//!
86
//! ```
87
//! use tokio::sync::watch;
88
//! use tokio::time::{Duration, sleep};
89
//!
90
//! # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
91
//! let (tx, mut rx) = watch::channel("hello");
92
//!
93
//! tokio::spawn(async move {
94
//!     // Use the equivalent of a "do-while" loop so the initial value is
95
//!     // processed before awaiting the `changed()` future.
96
//!     loop {
97
//!         println!("{}! ", *rx.borrow_and_update());
98
//!         if rx.changed().await.is_err() {
99
//!             break;
100
//!         }
101
//!     }
102
//! });
103
//!
104
//! sleep(Duration::from_millis(100)).await;
105
//! tx.send("world")?;
106
//! # Ok(())
107
//! # }
108
//! ```
109
//!
110
//! The following example shows how [`changed`] and [`has_changed`] differ in fallibility.
111
//! ```
112
//! use tokio::sync::watch;
113
//!
114
//! #[tokio::main(flavor = "current_thread")]
115
//! # async fn main() {
116
//! let (tx, mut rx) = watch::channel("hello");
117
//! tx.send("goodbye").unwrap();
118
//! drop(tx);
119
//!
120
//! // `has_changed` does not mark the value as seen and errors
121
//! // since the channel is closed.
122
//! assert!(rx.has_changed().is_err());
123
//!
124
//! // `changed` returns Ok since the value is not already marked as seen
125
//! // even if the channel is closed.
126
//! assert!(rx.changed().await.is_ok());
127
//!
128
//! // The `changed` call above marks the value as seen.
129
//! // The next `changed` call now returns an error as the channel is closed
130
//! // AND the current value is seen.
131
//! assert!(rx.changed().await.is_err());
132
//! # }
133
//! ```
134
//!
135
//! # Closing
136
//!
137
//! [`Sender::is_closed`] and [`Sender::closed`] allow the producer to detect
138
//! when all [`Receiver`] handles have been dropped. This indicates that there
139
//! is no further interest in the values being produced and work can be stopped.
140
//!
141
//! The value in the channel will not be dropped until all senders and all
142
//! receivers have been dropped.
143
//!
144
//! # Thread safety
145
//!
146
//! Both [`Sender`] and [`Receiver`] are thread safe. They can be moved to other
147
//! threads and can be used in a concurrent environment. Clones of [`Receiver`]
148
//! handles may be moved to separate threads and also used concurrently.
149
//!
150
//! [`Sender`]: crate::sync::watch::Sender
151
//! [`Receiver`]: crate::sync::watch::Receiver
152
//! [`changed`]: crate::sync::watch::Receiver::changed
153
//! [`has_changed`]: crate::sync::watch::Receiver::has_changed
154
//! [`borrow`]: crate::sync::watch::Receiver::borrow
155
//! [`borrow_and_update`]: crate::sync::watch::Receiver::borrow_and_update
156
//! [`Receiver::changed()`]: crate::sync::watch::Receiver::changed
157
//! [`Receiver::borrow()`]: crate::sync::watch::Receiver::borrow
158
//! [`Receiver::borrow_and_update()`]:
159
//!     crate::sync::watch::Receiver::borrow_and_update
160
//! [`channel`]: crate::sync::watch::channel
161
//! [`Sender::is_closed`]: crate::sync::watch::Sender::is_closed
162
//! [`Sender::closed`]: crate::sync::watch::Sender::closed
163
//! [`Sender::subscribe()`]: crate::sync::watch::Sender::subscribe
164
165
use crate::sync::notify::Notify;
166
use crate::task::coop::cooperative;
167
168
use crate::loom::sync::atomic::AtomicUsize;
169
use crate::loom::sync::atomic::Ordering::{AcqRel, Relaxed};
170
use crate::loom::sync::{Arc, RwLock, RwLockReadGuard};
171
use std::fmt;
172
use std::mem;
173
use std::ops;
174
use std::panic;
175
176
/// Receives values from the associated [`Sender`](struct@Sender).
177
///
178
/// Instances are created by the [`channel`](fn@channel) function.
179
///
180
/// To turn this receiver into a `Stream`, you can use the [`WatchStream`]
181
/// wrapper.
182
///
183
/// [`WatchStream`]: https://docs.rs/tokio-stream/0.1/tokio_stream/wrappers/struct.WatchStream.html
184
#[derive(Debug)]
185
pub struct Receiver<T> {
186
    /// Pointer to the shared state
187
    shared: Arc<Shared<T>>,
188
189
    /// Last observed version
190
    version: Version,
191
}
192
193
/// Sends values to the associated [`Receiver`](struct@Receiver).
194
///
195
/// Instances are created by the [`channel`](fn@channel) function.
196
#[derive(Debug)]
197
pub struct Sender<T> {
198
    shared: Arc<Shared<T>>,
199
}
200
201
impl<T> Clone for Sender<T> {
202
0
    fn clone(&self) -> Self {
203
0
        self.shared.ref_count_tx.fetch_add(1, Relaxed);
204
205
0
        Self {
206
0
            shared: self.shared.clone(),
207
0
        }
208
0
    }
209
}
210
211
impl<T: Default> Default for Sender<T> {
212
0
    fn default() -> Self {
213
0
        Self::new(T::default())
214
0
    }
215
}
216
217
/// Returns a reference to the inner value.
218
///
219
/// Outstanding borrows hold a read lock on the inner value. This means that
220
/// long-lived borrows could cause the producer half to block. It is recommended
221
/// to keep the borrow as short-lived as possible. Additionally, if you are
222
/// running in an environment that allows `!Send` futures, you must ensure that
223
/// the returned `Ref` type is never held alive across an `.await` point,
224
/// otherwise, it can lead to a deadlock.
225
///
226
/// The priority policy of the lock is dependent on the underlying lock
227
/// implementation, and this type does not guarantee that any particular policy
228
/// will be used. In particular, a producer which is waiting to acquire the lock
229
/// in `send` might or might not block concurrent calls to `borrow`, e.g.:
230
///
231
/// <details><summary>Potential deadlock example</summary>
232
///
233
/// ```text
234
/// // Task 1 (on thread A)    |  // Task 2 (on thread B)
235
/// let _ref1 = rx.borrow();   |
236
///                            |  // will block
237
///                            |  let _ = tx.send(());
238
/// // may deadlock            |
239
/// let _ref2 = rx.borrow();   |
240
/// ```
241
/// </details>
242
#[derive(Debug)]
243
pub struct Ref<'a, T> {
244
    inner: RwLockReadGuard<'a, T>,
245
    has_changed: bool,
246
}
247
248
impl<'a, T> Ref<'a, T> {
249
    /// Indicates if the borrowed value is considered as _changed_ since the last
250
    /// time it has been marked as seen.
251
    ///
252
    /// Unlike [`Receiver::has_changed()`], this method does not fail if the channel is closed.
253
    ///
254
    /// When borrowed from the [`Sender`] this function will always return `false`.
255
    ///
256
    /// # Examples
257
    ///
258
    /// ```
259
    /// use tokio::sync::watch;
260
    ///
261
    /// # #[tokio::main(flavor = "current_thread")]
262
    /// # async fn main() {
263
    /// let (tx, mut rx) = watch::channel("hello");
264
    ///
265
    /// tx.send("goodbye").unwrap();
266
    /// // The sender never considers the value as changed.
267
    /// assert!(!tx.borrow().has_changed());
268
    ///
269
    /// // Drop the sender immediately, just for testing purposes.
270
    /// drop(tx);
271
    ///
272
    /// // Even if the sender has already been dropped...
273
    /// assert!(rx.has_changed().is_err());
274
    /// // ...the modified value is still readable and detected as changed.
275
    /// assert_eq!(*rx.borrow(), "goodbye");
276
    /// assert!(rx.borrow().has_changed());
277
    ///
278
    /// // Read the changed value and mark it as seen.
279
    /// {
280
    ///     let received = rx.borrow_and_update();
281
    ///     assert_eq!(*received, "goodbye");
282
    ///     assert!(received.has_changed());
283
    ///     // Release the read lock when leaving this scope.
284
    /// }
285
    ///
286
    /// // Now the value has already been marked as seen and could
287
    /// // never be modified again (after the sender has been dropped).
288
    /// assert!(!rx.borrow().has_changed());
289
    /// # }
290
    /// ```
291
0
    pub fn has_changed(&self) -> bool {
292
0
        self.has_changed
293
0
    }
294
}
295
296
struct Shared<T> {
297
    /// The most recent value.
298
    value: RwLock<T>,
299
300
    /// The current version.
301
    ///
302
    /// The lowest bit represents a "closed" state. The rest of the bits
303
    /// represent the current version.
304
    state: AtomicState,
305
306
    /// Tracks the number of `Receiver` instances.
307
    ref_count_rx: AtomicUsize,
308
309
    /// Tracks the number of `Sender` instances.
310
    ref_count_tx: AtomicUsize,
311
312
    /// Notifies waiting receivers that the value changed.
313
    notify_rx: big_notify::BigNotify,
314
315
    /// Notifies any task listening for `Receiver` dropped events.
316
    notify_tx: Notify,
317
}
318
319
impl<T: fmt::Debug> fmt::Debug for Shared<T> {
320
0
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
321
0
        let state = self.state.load();
322
0
        f.debug_struct("Shared")
323
0
            .field("value", &self.value)
324
0
            .field("version", &state.version())
325
0
            .field("is_closed", &state.is_closed())
326
0
            .field("ref_count_rx", &self.ref_count_rx)
327
0
            .finish()
328
0
    }
329
}
330
331
pub mod error {
332
    //! Watch error types.
333
334
    use std::error::Error;
335
    use std::fmt;
336
337
    /// Error produced when sending a value fails.
338
    #[derive(PartialEq, Eq, Clone, Copy)]
339
    pub struct SendError<T>(pub T);
340
341
    // ===== impl SendError =====
342
343
    impl<T> fmt::Debug for SendError<T> {
344
0
        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
345
0
            f.debug_struct("SendError").finish_non_exhaustive()
346
0
        }
347
    }
348
349
    impl<T> fmt::Display for SendError<T> {
350
0
        fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
351
0
            write!(fmt, "channel closed")
352
0
        }
353
    }
354
355
    impl<T> Error for SendError<T> {}
356
357
    /// Error produced when receiving a change notification.
358
    #[derive(Debug, Clone)]
359
    pub struct RecvError(pub(super) ());
360
361
    // ===== impl RecvError =====
362
363
    impl fmt::Display for RecvError {
364
0
        fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
365
0
            write!(fmt, "channel closed")
366
0
        }
367
    }
368
369
    impl Error for RecvError {}
370
}
371
372
mod big_notify {
373
    use super::Notify;
374
    use crate::sync::notify::Notified;
375
376
    // To avoid contention on the lock inside the `Notify`, we store multiple
377
    // copies of it. Then, we use either circular access or randomness to spread
378
    // out threads over different `Notify` objects.
379
    //
380
    // Some simple benchmarks show that randomness performs slightly better than
381
    // circular access (probably due to contention on `next`), so we prefer to
382
    // use randomness when Tokio is compiled with a random number generator.
383
    //
384
    // When the random number generator is not available, we fall back to
385
    // circular access.
386
387
    pub(super) struct BigNotify {
388
        #[cfg(not(all(not(loom), feature = "sync", any(feature = "rt", feature = "macros"))))]
389
        next: std::sync::atomic::AtomicUsize,
390
        inner: [Notify; 8],
391
    }
392
393
    impl BigNotify {
394
130
        pub(super) fn new() -> Self {
395
130
            Self {
396
130
                #[cfg(not(all(
397
130
                    not(loom),
398
130
                    feature = "sync",
399
130
                    any(feature = "rt", feature = "macros")
400
130
                )))]
401
130
                next: std::sync::atomic::AtomicUsize::new(0),
402
130
                inner: Default::default(),
403
130
            }
404
130
        }
405
406
0
        pub(super) fn notify_waiters(&self) {
407
0
            for notify in &self.inner {
408
0
                notify.notify_waiters();
409
0
            }
410
0
        }
411
412
        /// This function implements the case where randomness is not available.
413
        #[cfg(not(all(not(loom), feature = "sync", any(feature = "rt", feature = "macros"))))]
414
        pub(super) fn notified(&self) -> Notified<'_> {
415
            let i = self.next.fetch_add(1, std::sync::atomic::Ordering::Relaxed) % 8;
416
            self.inner[i].notified()
417
        }
418
419
        /// This function implements the case where randomness is available.
420
        #[cfg(all(not(loom), feature = "sync", any(feature = "rt", feature = "macros")))]
421
0
        pub(super) fn notified(&self) -> Notified<'_> {
422
0
            let i = crate::runtime::context::thread_rng_n(8) as usize;
423
0
            self.inner[i].notified()
424
0
        }
425
    }
426
}
427
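The fallback path in `BigNotify::notified` (used when no random number generator is compiled in) spreads callers over the eight `Notify` slots with a shared counter. The following is a minimal std-only sketch of that index selection, not Tokio's code: `RoundRobin`, `SLOTS`, and `pick` are hypothetical names introduced for illustration, and the slots themselves are omitted.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

/// Number of slots to spread callers over (the listing above uses 8).
const SLOTS: usize = 8;

/// Hypothetical stand-in for the `next` counter used in the fallback path.
struct RoundRobin {
    next: AtomicUsize,
}

impl RoundRobin {
    fn new() -> Self {
        RoundRobin {
            next: AtomicUsize::new(0),
        }
    }

    /// Returns the slot index for the next caller.
    ///
    /// `fetch_add` returns the previous counter value and wraps on overflow,
    /// so the index cycles through `0..SLOTS`.
    fn pick(&self) -> usize {
        self.next.fetch_add(1, Ordering::Relaxed) % SLOTS
    }
}
```

`Relaxed` ordering is enough in this sketch because the counter only chooses a slot and does not publish any other memory.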
428
use self::state::{AtomicState, Version};
429
mod state {
430
    use crate::loom::sync::atomic::AtomicUsize;
431
    use crate::loom::sync::atomic::Ordering;
432
433
    const CLOSED_BIT: usize = 1;
434
435
    // Using 2 as the step size preserves the `CLOSED_BIT`.
436
    const STEP_SIZE: usize = 2;
437
438
    /// The version part of the state. The lowest bit is always zero.
439
    #[derive(Copy, Clone, Debug, Eq, PartialEq)]
440
    pub(super) struct Version(usize);
441
442
    /// Snapshot of the state. The lowest bit is used as the CLOSED bit.
443
    /// The remaining bits are used as the version.
444
    ///
445
    /// The CLOSED bit tracks whether all senders have been dropped. Dropping all
446
    /// receivers does not set it.
447
    #[derive(Copy, Clone, Debug)]
448
    pub(super) struct StateSnapshot(usize);
449
450
    /// The state stored in an atomic integer.
451
    ///
452
    /// The `Sender` uses `Release` ordering for storing a new state
453
    /// and the `Receiver`s use `Acquire` ordering for loading the
454
    /// current state. This ensures that written values are seen by
455
    /// the `Receiver`s for a proper handover.
456
    #[derive(Debug)]
457
    pub(super) struct AtomicState(AtomicUsize);
458
459
    impl Version {
460
        /// Decrements the version.
461
0
        pub(super) fn decrement(&mut self) {
462
            // Using a wrapping decrement here is required to ensure that the
463
            // operation is consistent with `std::sync::atomic::AtomicUsize::fetch_add()`
464
            // which wraps on overflow.
465
0
            self.0 = self.0.wrapping_sub(STEP_SIZE);
466
0
        }
467
468
        pub(super) const INITIAL: Self = Version(0);
469
    }
470
471
    impl StateSnapshot {
472
        /// Extract the version from the state.
473
0
        pub(super) fn version(self) -> Version {
474
0
            Version(self.0 & !CLOSED_BIT)
475
0
        }
476
477
        /// Is the closed bit set?
478
0
        pub(super) fn is_closed(self) -> bool {
479
0
            (self.0 & CLOSED_BIT) == CLOSED_BIT
480
0
        }
481
    }
482
483
    impl AtomicState {
484
        /// Create a new `AtomicState` that is not closed and which has the
485
        /// version set to `Version::INITIAL`.
486
130
        pub(super) fn new() -> Self {
487
130
            AtomicState(AtomicUsize::new(Version::INITIAL.0))
488
130
        }
489
490
        /// Load the current value of the state.
491
        ///
492
        /// Only used by the receiver and for debugging purposes.
493
        ///
494
        /// The receiver side (read-only) uses `Acquire` ordering for a proper handover
495
        /// of the shared value with the sender side (single writer). The state is always
496
        /// updated after modifying and before releasing the (exclusive) lock on the
497
        /// shared value.
498
0
        pub(super) fn load(&self) -> StateSnapshot {
499
0
            StateSnapshot(self.0.load(Ordering::Acquire))
500
0
        }
501
502
        /// Increment the version counter.
503
0
        pub(super) fn increment_version_while_locked(&self) {
504
            // Use `Release` ordering to ensure that the shared value
505
            // has been written before updating the version. The shared
506
            // value is still protected by an exclusive lock during this
507
            // method.
508
0
            self.0.fetch_add(STEP_SIZE, Ordering::Release);
509
0
        }
510
511
        /// Set the closed bit in the state.
512
0
        pub(super) fn set_closed(&self) {
513
0
            self.0.fetch_or(CLOSED_BIT, Ordering::Release);
514
0
        }
515
    }
516
}
517
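The `state` module packs a closed flag and a version counter into one atomic word: the lowest bit is the flag, and the version advances in steps of 2 so that incrementing never disturbs the flag. The sketch below reproduces that packing with std atomics only. `PackedState`, `snapshot`, `bump_version`, and `close` are hypothetical names, and the real module goes through Tokio's `loom` wrappers rather than `std::sync::atomic` directly.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

const CLOSED_BIT: usize = 1;
// Stepping by 2 leaves the lowest bit (the closed flag) untouched.
const STEP_SIZE: usize = 2;

/// Hypothetical stand-in for the packed state word.
struct PackedState(AtomicUsize);

impl PackedState {
    fn new() -> Self {
        PackedState(AtomicUsize::new(0))
    }

    /// Decodes `(version, is_closed)` from a single atomic load.
    fn snapshot(&self) -> (usize, bool) {
        let raw = self.0.load(Ordering::Acquire);
        (raw & !CLOSED_BIT, (raw & CLOSED_BIT) == CLOSED_BIT)
    }

    /// Advances the version without touching the closed flag.
    fn bump_version(&self) {
        self.0.fetch_add(STEP_SIZE, Ordering::Release);
    }

    /// Sets the closed flag without touching the version.
    fn close(&self) {
        self.0.fetch_or(CLOSED_BIT, Ordering::Release);
    }
}
```

Because both fields live in one word, a reader gets a consistent version and closed flag from one load, with no window in which it could see one updated and not the other.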
518
/// Creates a new watch channel, returning the "send" and "receive" handles.
519
///
520
/// All values sent by [`Sender`] will become visible to the [`Receiver`] handles.
521
/// Only the last value sent is made available to the [`Receiver`] half. All
522
/// intermediate values are dropped.
523
///
524
/// # Examples
525
///
526
/// The following example prints `hello! world! `.
527
///
528
/// ```
529
/// use tokio::sync::watch;
530
/// use tokio::time::{Duration, sleep};
531
///
532
/// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
533
/// let (tx, mut rx) = watch::channel("hello");
534
///
535
/// tokio::spawn(async move {
536
///     // Use the equivalent of a "do-while" loop so the initial value is
537
///     // processed before awaiting the `changed()` future.
538
///     loop {
539
///         println!("{}! ", *rx.borrow_and_update());
540
///         if rx.changed().await.is_err() {
541
///             break;
542
///         }
543
///     }
544
/// });
545
///
546
/// sleep(Duration::from_millis(100)).await;
547
/// tx.send("world")?;
548
/// # Ok(())
549
/// # }
550
/// ```
551
///
552
/// [`Sender`]: struct@Sender
553
/// [`Receiver`]: struct@Receiver
554
130
pub fn channel<T>(init: T) -> (Sender<T>, Receiver<T>) {
555
130
    let shared = Arc::new(Shared {
556
130
        value: RwLock::new(init),
557
130
        state: AtomicState::new(),
558
130
        ref_count_rx: AtomicUsize::new(1),
559
130
        ref_count_tx: AtomicUsize::new(1),
560
130
        notify_rx: big_notify::BigNotify::new(),
561
130
        notify_tx: Notify::new(),
562
130
    });
563
564
130
    let tx = Sender {
565
130
        shared: shared.clone(),
566
130
    };
567
568
130
    let rx = Receiver {
569
130
        shared,
570
130
        version: Version::INITIAL,
571
130
    };
572
573
130
    (tx, rx)
574
130
}
Unexecuted instantiation: tokio::sync::watch::channel::<usize>
Executed instantiation: tokio::sync::watch::channel::<()> (lines 554-574, count 130, identical to the listing above)
575
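`channel` gives the new `Receiver` the version `Version::INITIAL`, the same version the shared state starts at, which is why the initial value counts as seen. The sketch below models that receiver-side bookkeeping with plain integers. `SeenCursor` and its methods are hypothetical, and the current version is passed in explicitly instead of being loaded from shared state.

```rust
// Versions advance in steps of 2, as in the `state` module.
const STEP_SIZE: usize = 2;

/// Hypothetical receiver-side cursor: remembers the last version marked seen.
struct SeenCursor {
    version: usize,
}

impl SeenCursor {
    /// A new cursor starts at the current version, so that value is "seen".
    fn new(current: usize) -> Self {
        SeenCursor { version: current }
    }

    /// Any version mismatch counts as a change; values are never compared.
    fn has_changed(&self, current: usize) -> bool {
        self.version != current
    }

    /// Marks the current version as seen.
    fn mark_seen(&mut self, current: usize) {
        self.version = current;
    }

    /// Forces a mismatch by stepping the cursor back one version.
    /// The wrapping subtraction matches the wrapping behaviour of `fetch_add`.
    fn mark_changed(&mut self) {
        self.version = self.version.wrapping_sub(STEP_SIZE);
    }
}
```

Comparing versions for inequality rather than ordering keeps the check valid after the counter wraps around.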
576
impl<T> Receiver<T> {
577
0
    fn from_shared(version: Version, shared: Arc<Shared<T>>) -> Self {
578
        // No synchronization is necessary because this is only used as a counter and
579
        // does not guard any memory access.
580
0
        shared.ref_count_rx.fetch_add(1, Relaxed);
581
582
0
        Self { shared, version }
583
0
    }
584
585
    /// Returns a reference to the most recently sent value.
586
    ///
587
    /// This method does not mark the returned value as seen, so future calls to
588
    /// [`changed`] may return immediately even if you have already seen the
589
    /// value with a call to `borrow`.
590
    ///
591
    /// Outstanding borrows hold a read lock on the inner value. This means that
592
    /// long-lived borrows could cause the producer half to block. It is recommended
593
    /// to keep the borrow as short-lived as possible. Additionally, if you are
594
    /// running in an environment that allows `!Send` futures, you must ensure that
595
    /// the returned `Ref` type is never held alive across an `.await` point,
596
    /// otherwise, it can lead to a deadlock.
597
    ///
598
    /// The priority policy of the lock is dependent on the underlying lock
599
    /// implementation, and this type does not guarantee that any particular policy
600
    /// will be used. In particular, a producer which is waiting to acquire the lock
601
    /// in `send` might or might not block concurrent calls to `borrow`, e.g.:
602
    ///
603
    /// <details><summary>Potential deadlock example</summary>
604
    ///
605
    /// ```text
606
    /// // Task 1 (on thread A)    |  // Task 2 (on thread B)
607
    /// let _ref1 = rx.borrow();   |
608
    ///                            |  // will block
609
    ///                            |  let _ = tx.send(());
610
    /// // may deadlock            |
611
    /// let _ref2 = rx.borrow();   |
612
    /// ```
613
    /// </details>
614
    ///
615
    /// For more information on when to use this method versus
616
    /// [`borrow_and_update`], see [here](self#borrow_and_update-versus-borrow).
617
    ///
618
    /// [`changed`]: Receiver::changed
619
    /// [`borrow_and_update`]: Receiver::borrow_and_update
620
    ///
621
    /// # Examples
622
    ///
623
    /// ```
624
    /// use tokio::sync::watch;
625
    ///
626
    /// let (_, rx) = watch::channel("hello");
627
    /// assert_eq!(*rx.borrow(), "hello");
628
    /// ```
629
0
    pub fn borrow(&self) -> Ref<'_, T> {
630
0
        let inner = self.shared.value.read();
631
632
        // After obtaining a read-lock no concurrent writes could occur
633
        // and the loaded version matches that of the borrowed reference.
634
0
        let new_version = self.shared.state.load().version();
635
0
        let has_changed = self.version != new_version;
636
637
0
        Ref { inner, has_changed }
638
0
    }
639
640
    /// Returns a reference to the most recently sent value and marks that value
641
    /// as seen.
642
    ///
643
    /// This method marks the current value as seen. Subsequent calls to [`changed`]
644
    /// will then wait until the [`Sender`] has modified the shared
645
    /// value again.
646
    ///
647
    /// Outstanding borrows hold a read lock on the inner value. This means that
648
    /// long-lived borrows could cause the producer half to block. It is recommended
649
    /// to keep the borrow as short-lived as possible. Additionally, if you are
650
    /// running in an environment that allows `!Send` futures, you must ensure that
651
    /// the returned `Ref` type is never held alive across an `.await` point,
652
    /// otherwise, it can lead to a deadlock.
653
    ///
654
    /// The priority policy of the lock is dependent on the underlying lock
655
    /// implementation, and this type does not guarantee that any particular policy
656
    /// will be used. In particular, a producer which is waiting to acquire the lock
657
    /// in `send` might or might not block concurrent calls to `borrow`, e.g.:
658
    ///
659
    /// <details><summary>Potential deadlock example</summary>
660
    ///
661
    /// ```text
662
    /// // Task 1 (on thread A)                |  // Task 2 (on thread B)
663
    /// let _ref1 = rx1.borrow_and_update();   |
664
    ///                                        |  // will block
665
    ///                                        |  let _ = tx.send(());
666
    /// // may deadlock                        |
667
    /// let _ref2 = rx2.borrow_and_update();   |
668
    /// ```
669
    /// </details>
670
    ///
671
    /// For more information on when to use this method versus [`borrow`], see
672
    /// [here](self#borrow_and_update-versus-borrow).
673
    ///
674
    /// [`changed`]: Receiver::changed
675
    /// [`borrow`]: Receiver::borrow
676
0
    pub fn borrow_and_update(&mut self) -> Ref<'_, T> {
677
0
        let inner = self.shared.value.read();
678
679
        // After obtaining a read-lock no concurrent writes could occur
680
        // and the loaded version matches that of the borrowed reference.
681
0
        let new_version = self.shared.state.load().version();
682
0
        let has_changed = self.version != new_version;
683
684
        // Mark the shared value as seen by updating the version
685
0
        self.version = new_version;
686
687
0
        Ref { inner, has_changed }
688
0
    }
689
690
    /// Checks if this channel contains a message that this receiver has not yet
691
    /// seen. The current value will not be marked as seen.
692
    ///
693
    /// Although this method is called `has_changed`, it does not check
694
    /// messages for equality, so this call will return true even if the current
695
    /// message is equal to the previous message.
696
    ///
697
    /// # Errors
698
    ///
699
    /// Returns a [`RecvError`](error::RecvError) if and only if the channel has been closed.
700
    ///
701
    /// # Examples
702
    ///
703
    /// ## Basic usage
704
    ///
705
    /// ```
706
    /// use tokio::sync::watch;
707
    ///
708
    /// # #[tokio::main(flavor = "current_thread")]
709
    /// # async fn main() {
710
    /// let (tx, mut rx) = watch::channel("hello");
711
    ///
712
    /// tx.send("goodbye").unwrap();
713
    ///
714
    /// assert!(rx.has_changed().unwrap());
715
    /// assert_eq!(*rx.borrow_and_update(), "goodbye");
716
    ///
717
    /// // The value has been marked as seen
718
    /// assert!(!rx.has_changed().unwrap());
719
    /// # }
720
    /// ```
721
    ///
722
    /// ## Closed channel example
723
    ///
724
    /// ```
725
    /// use tokio::sync::watch;
726
    ///
727
    /// # #[tokio::main(flavor = "current_thread")]
728
    /// # async fn main() {
729
    /// let (tx, rx) = watch::channel("hello");
730
    /// tx.send("goodbye").unwrap();
731
    ///
732
    /// drop(tx);
733
    ///
734
    /// // The channel is closed
735
    /// assert!(rx.has_changed().is_err());
736
    /// # }
737
    /// ```
738
0
    pub fn has_changed(&self) -> Result<bool, error::RecvError> {
739
        // Load the version from the state
740
0
        let state = self.shared.state.load();
741
0
        if state.is_closed() {
742
            // All senders have dropped.
743
0
            return Err(error::RecvError(()));
744
0
        }
745
0
        let new_version = state.version();
746
747
0
        Ok(self.version != new_version)
748
0
    }
749
750
    /// Marks the state as changed.
751
    ///
752
    /// After invoking this method [`has_changed()`](Self::has_changed)
753
    /// returns `true` and [`changed()`](Self::changed) returns
754
    /// immediately, regardless of whether a new value has been sent.
755
    ///
756
    /// This is useful for triggering an initial change notification after
757
    /// subscribing to synchronize new receivers.
758
0
    pub fn mark_changed(&mut self) {
759
0
        self.version.decrement();
760
0
    }
761
762
    /// Marks the state as unchanged.
763
    ///
764
    /// The current value will be considered seen by the receiver.
765
    ///
766
    /// This is useful if you are not interested in the current value
767
    /// visible in the receiver.
768
0
    pub fn mark_unchanged(&mut self) {
769
0
        let current_version = self.shared.state.load().version();
770
0
        self.version = current_version;
771
0
    }
772
773
    /// Waits for a change notification, then marks the current value as seen.
774
    ///
775
    /// If the current value in the channel has not yet been marked seen when
776
    /// this method is called, the method marks that value seen and returns
777
    /// immediately. If the newest value has already been marked seen, then the
778
    /// method sleeps until a new message is sent by a [`Sender`] connected to
779
    /// this `Receiver`, or until all [`Sender`]s are dropped.
780
    ///
781
    /// For more information, see
782
    /// [*Change notifications*](self#change-notifications) in the module-level documentation.
783
    ///
784
    /// # Errors
785
    ///
786
    /// Returns a [`RecvError`](error::RecvError) if the channel has been closed __AND__
787
    /// the current value is seen.
788
    ///
789
    /// # Cancel safety
790
    ///
791
    /// This method is cancel safe. If you use it as the event in a
792
    /// [`tokio::select!`](crate::select) statement and some other branch
793
    /// completes first, then it is guaranteed that no values have been marked
794
    /// seen by this call to `changed`.
795
    ///
796
    /// [`Sender`]: struct@Sender
797
    ///
798
    /// # Examples
799
    ///
800
    /// ```
801
    /// use tokio::sync::watch;
802
    ///
803
    /// # #[tokio::main(flavor = "current_thread")]
804
    /// # async fn main() {
805
    /// let (tx, mut rx) = watch::channel("hello");
806
    ///
807
    /// tokio::spawn(async move {
808
    ///     tx.send("goodbye").unwrap();
809
    /// });
810
    ///
811
    /// assert!(rx.changed().await.is_ok());
812
    /// assert_eq!(*rx.borrow_and_update(), "goodbye");
813
    ///
814
    /// // The `tx` handle has been dropped
815
    /// assert!(rx.changed().await.is_err());
816
    /// # }
817
    /// ```
818
0
    pub async fn changed(&mut self) -> Result<(), error::RecvError> {
819
0
        cooperative(changed_impl(&self.shared, &mut self.version)).await
820
0
    }
821
822
    /// Waits for a value that satisfies the provided condition.
823
    ///
824
    /// This method will call the provided closure whenever something is sent on
825
    /// the channel. Once the closure returns `true`, this method will return a
826
    /// reference to the value that was passed to the closure.
827
    ///
828
    /// Before `wait_for` starts waiting for changes, it will call the closure
829
    /// on the current value. If the closure returns `true` when given the
830
    /// current value, then `wait_for` will immediately return a reference to
831
    /// the current value. This is the case even if the current value is already
832
    /// considered seen.
833
    ///
834
    /// The watch channel only keeps track of the most recent value, so if
835
    /// several messages are sent faster than `wait_for` is able to call the
836
    /// closure, then it may skip some updates. Whenever the closure is called,
837
    /// it will be called with the most recent value.
838
    ///
839
    /// When this function returns, the value that was passed to the closure
840
    /// when it returned `true` will be considered seen.
841
    ///
842
    /// If the channel is closed, then `wait_for` will return a [`RecvError`].
843
    /// Once this happens, no more messages can ever be sent on the channel.
844
    /// When an error is returned, it is guaranteed that the closure has been
845
    /// called on the last value, and that it returned `false` for that value.
846
    /// (If the closure returned `true`, then the last value would have been
847
    /// returned instead of the error.)
848
    ///
849
    /// Like the [`borrow`] method, the returned borrow holds a read lock on the
850
    /// inner value. This means that long-lived borrows could cause the producer
851
    /// half to block. It is recommended to keep the borrow as short-lived as
852
    /// possible. See the documentation of `borrow` for more information on
853
    /// this.
854
    ///
855
    /// [`borrow`]: Receiver::borrow
856
    /// [`RecvError`]: error::RecvError
857
    ///
858
    /// # Cancel safety
859
    ///
860
    /// This method is cancel safe. If you use it as the event in a
861
    /// [`tokio::select!`](crate::select) statement and some other branch
862
    /// completes first, then it is guaranteed that the last seen value `val`
863
    /// (if any) satisfies `f(val) == false`.
864
    ///
865
    /// # Panics
866
    ///
867
    /// Panics if and only if the closure `f` panics. In that case, no resource owned
868
    /// or shared by this [`Receiver`] will be poisoned.
869
    ///
870
    /// # Examples
871
    ///
872
    /// ```
873
    /// use tokio::sync::watch;
874
    /// use tokio::time::{sleep, Duration};
875
    ///
876
    /// #[tokio::main(flavor = "current_thread", start_paused = true)]
877
    /// async fn main() {
878
    ///     let (tx, mut rx) = watch::channel("hello");
879
    ///
880
    ///     tokio::spawn(async move {
881
    ///         sleep(Duration::from_secs(1)).await;
882
    ///         tx.send("goodbye").unwrap();
883
    ///     });
884
    ///
885
    ///     assert!(rx.wait_for(|val| *val == "goodbye").await.is_ok());
886
    ///     assert_eq!(*rx.borrow(), "goodbye");
887
    /// }
888
    /// ```
889
0
    pub async fn wait_for(
890
0
        &mut self,
891
0
        f: impl FnMut(&T) -> bool,
892
0
    ) -> Result<Ref<'_, T>, error::RecvError> {
893
0
        cooperative(self.wait_for_inner(f)).await
894
0
    }
895
896
0
    async fn wait_for_inner(
897
0
        &mut self,
898
0
        mut f: impl FnMut(&T) -> bool,
899
0
    ) -> Result<Ref<'_, T>, error::RecvError> {
900
0
        let mut closed = false;
901
        loop {
902
            {
903
0
                let inner = self.shared.value.read();
904
905
0
                let new_version = self.shared.state.load().version();
906
0
                let has_changed = self.version != new_version;
907
0
                self.version = new_version;
908
909
0
                if !closed || has_changed {
910
0
                    let result = panic::catch_unwind(panic::AssertUnwindSafe(|| f(&inner)));
911
0
                    match result {
912
                        Ok(true) => {
913
0
                            return Ok(Ref { inner, has_changed });
914
                        }
915
0
                        Ok(false) => {
916
0
                            // Skip the value.
917
0
                        }
918
0
                        Err(panicked) => {
919
                            // Drop the read-lock to avoid poisoning it.
920
0
                            drop(inner);
921
                            // Forward the panic to the caller.
922
0
                            panic::resume_unwind(panicked);
923
                            // Unreachable
924
                        }
925
                    };
926
0
                }
927
            }
928
929
0
            if closed {
930
0
                return Err(error::RecvError(()));
931
0
            }
932
933
            // Wait for the value to change.
934
0
            closed = changed_impl(&self.shared, &mut self.version).await.is_err();
935
        }
936
0
    }
937
938
    /// Returns `true` if receivers belong to the same channel.
939
    ///
940
    /// # Examples
941
    ///
942
    /// ```
943
    /// let (tx, rx) = tokio::sync::watch::channel(true);
944
    /// let rx2 = rx.clone();
945
    /// assert!(rx.same_channel(&rx2));
946
    ///
947
    /// let (tx3, rx3) = tokio::sync::watch::channel(true);
948
    /// assert!(!rx3.same_channel(&rx2));
949
    /// ```
950
0
    pub fn same_channel(&self, other: &Self) -> bool {
951
0
        Arc::ptr_eq(&self.shared, &other.shared)
952
0
    }
953
954
    cfg_process_driver! {
955
0
        pub(crate) fn try_has_changed(&mut self) -> Option<Result<(), error::RecvError>> {
956
0
            maybe_changed(&self.shared, &mut self.version)
957
0
        }
958
    }
959
}
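
The `has_changed`, `mark_changed`, and `mark_unchanged` methods above all come down to comparing or adjusting a per-receiver version against the shared one. The following is a minimal std-only sketch of that idea. The `Shared`, `Rx`, and `send` names are hypothetical stand-ins, not tokio's types, and the closed flag that the real state word also carries is left out.

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;

/// Hypothetical stand-in for the channel's shared state: only a version counter.
struct Shared {
    version: AtomicU64,
}

/// Hypothetical receiver that remembers the last version it has seen.
struct Rx {
    shared: Arc<Shared>,
    seen: u64,
}

impl Rx {
    /// The current value is unseen when the shared version differs from ours.
    fn has_changed(&self) -> bool {
        self.seen != self.shared.version.load(Ordering::Acquire)
    }

    /// Step our version back so that the current value looks unseen.
    fn mark_changed(&mut self) {
        self.seen = self.seen.wrapping_sub(1);
    }

    /// Adopt the shared version so that the current value looks seen.
    fn mark_unchanged(&mut self) {
        self.seen = self.shared.version.load(Ordering::Acquire);
    }
}

/// Model of a send: bump the shared version (the value itself is omitted).
fn send(shared: &Shared) {
    shared.version.fetch_add(1, Ordering::Release);
}

fn main() {
    let shared = Arc::new(Shared { version: AtomicU64::new(0) });
    let mut rx = Rx { shared: Arc::clone(&shared), seen: 0 };

    // Nothing has been sent, so nothing is unseen.
    assert!(!rx.has_changed());

    // Forcing a change notification does not require a send.
    rx.mark_changed();
    assert!(rx.has_changed());
    rx.mark_unchanged();
    assert!(!rx.has_changed());

    // A send makes the value unseen until the receiver catches up.
    send(&shared);
    assert!(rx.has_changed());
    rx.mark_unchanged();
    assert!(!rx.has_changed());
}
```

Because only versions are compared, a receiver cannot tell how many sends it missed, which matches the channel's "last value only" behavior.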
960
961
0
fn maybe_changed<T>(
962
0
    shared: &Shared<T>,
963
0
    version: &mut Version,
964
0
) -> Option<Result<(), error::RecvError>> {
965
    // Load the version from the state
966
0
    let state = shared.state.load();
967
0
    let new_version = state.version();
968
969
0
    if *version != new_version {
970
        // Observe the new version and return
971
0
        *version = new_version;
972
0
        return Some(Ok(()));
973
0
    }
974
975
0
    if state.is_closed() {
976
        // All senders have been dropped.
977
0
        return Some(Err(error::RecvError(())));
978
0
    }
979
980
0
    None
981
0
}
982
983
0
async fn changed_impl<T>(
984
0
    shared: &Shared<T>,
985
0
    version: &mut Version,
986
0
) -> Result<(), error::RecvError> {
987
0
    crate::trace::async_trace_leaf().await;
988
989
    loop {
990
        // In order to avoid a race condition, we first request a notification,
991
        // **then** check the current value's version. If a new version exists,
992
        // the notification request is dropped.
993
0
        let notified = shared.notify_rx.notified();
994
995
0
        if let Some(ret) = maybe_changed(shared, version) {
996
0
            return ret;
997
0
        }
998
999
0
        notified.await;
1000
        // loop around again in case the wake-up was spurious
1001
    }
1002
0
}
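
The order of checks in `maybe_changed` above (version first, closed flag second) is what lets a receiver observe the final value before it gets an error. The sketch below models that order as a pure function. The `State`, `Closed`, and `poll_change` names are hypothetical, and the asynchronous notification step of `changed_impl` is not modeled.

```rust
/// Hypothetical snapshot of the channel state.
#[derive(Clone, Copy)]
struct State {
    version: u64,
    closed: bool,
}

/// Hypothetical error standing in for `RecvError`.
#[derive(Debug, PartialEq)]
struct Closed;

/// Mirrors the order of checks in `maybe_changed`: version first, then closed.
/// `None` means "nothing new yet, the caller should wait for a notification".
fn poll_change(state: State, seen: &mut u64) -> Option<Result<(), Closed>> {
    if *seen != state.version {
        // Observe the new version and report the change.
        *seen = state.version;
        return Some(Ok(()));
    }
    if state.closed {
        // Everything has been seen and no sender is left.
        return Some(Err(Closed));
    }
    None
}

fn main() {
    let mut seen = 0;

    // Nothing new and still open: the caller would go to sleep.
    let idle = State { version: 0, closed: false };
    assert_eq!(poll_change(idle, &mut seen), None);

    // A final value was sent and then every sender was dropped.
    let last = State { version: 1, closed: true };
    // The unseen value is reported first...
    assert_eq!(poll_change(last, &mut seen), Some(Ok(())));
    // ...and only the next call reports the closed channel.
    assert_eq!(poll_change(last, &mut seen), Some(Err(Closed)));
}
```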
1003
1004
impl<T> Clone for Receiver<T> {
1005
0
    fn clone(&self) -> Self {
1006
0
        let version = self.version;
1007
0
        let shared = self.shared.clone();
1008
1009
0
        Self::from_shared(version, shared)
1010
0
    }
1011
}
1012
1013
impl<T> Drop for Receiver<T> {
1014
130
    fn drop(&mut self) {
1015
        // No synchronization is necessary because this is only used as a counter and
1016
        // does not guard any memory access.
1017
130
        if 1 == self.shared.ref_count_rx.fetch_sub(1, Relaxed) {
1018
130
            // This is the last `Receiver` handle, so wake any tasks waiting on `Sender::closed()`.
1019
130
            self.shared.notify_tx.notify_waiters();
1020
130
        }
1021
130
    }
Unexecuted instantiation: <tokio::sync::watch::Receiver<usize> as core::ops::drop::Drop>::drop
<tokio::sync::watch::Receiver<()> as core::ops::drop::Drop>::drop
1022
}
1023
1024
impl<T> Sender<T> {
1025
    /// Creates the sending-half of the [`watch`] channel.
1026
    ///
1027
    /// See documentation of [`watch::channel`] for errors when calling this function.
1028
    /// Beware that attempting to send a value when there are no receivers will
1029
    /// return an error.
1030
    ///
1031
    /// [`watch`]: crate::sync::watch
1032
    /// [`watch::channel`]: crate::sync::watch::channel
1033
    ///
1034
    /// # Examples
1035
    /// ```
1036
    /// let sender = tokio::sync::watch::Sender::new(0u8);
1037
    /// assert!(sender.send(3).is_err());
1038
    /// let _rec = sender.subscribe();
1039
    /// assert!(sender.send(4).is_ok());
1040
    /// ```
1041
0
    pub fn new(init: T) -> Self {
1042
0
        let (tx, _) = channel(init);
1043
0
        tx
1044
0
    }
1045
1046
    /// Sends a new value via the channel, notifying all receivers.
1047
    ///
1048
    /// This method fails if the channel is closed, which is the case when
1049
    /// every receiver has been dropped. It is possible to reopen the channel
1050
    /// using the [`subscribe`] method. However, when `send` fails, the value
1051
    /// isn't made available for future receivers (but returned with the
1052
    /// [`SendError`]).
1053
    ///
1054
    /// To always make a new value available for future receivers, even if no
1055
    /// receiver currently exists, one of the other send methods
1056
    /// ([`send_if_modified`], [`send_modify`], or [`send_replace`]) can be
1057
    /// used instead.
1058
    ///
1059
    /// [`subscribe`]: Sender::subscribe
1060
    /// [`SendError`]: error::SendError
1061
    /// [`send_if_modified`]: Sender::send_if_modified
1062
    /// [`send_modify`]: Sender::send_modify
1063
    /// [`send_replace`]: Sender::send_replace
1064
0
    pub fn send(&self, value: T) -> Result<(), error::SendError<T>> {
1065
        // This is pretty much only useful as a hint anyway, so synchronization isn't critical.
1066
0
        if 0 == self.receiver_count() {
1067
0
            return Err(error::SendError(value));
1068
0
        }
1069
1070
0
        self.send_replace(value);
1071
0
        Ok(())
1072
0
    }
1073
1074
    /// Modifies the watched value **unconditionally** in-place,
1075
    /// notifying all receivers.
1076
    ///
1077
    /// This can be useful for modifying the watched value, without
1078
    /// having to allocate a new instance. Additionally, this
1079
    /// method permits sending values even when there are no receivers.
1080
    ///
1081
    /// Prefer to use the more versatile function [`Self::send_if_modified()`]
1082
    /// if the value is only modified conditionally during the mutable borrow
1083
    /// to prevent unneeded change notifications for unmodified values.
1084
    ///
1085
    /// # Panics
1086
    ///
1087
    /// This function panics when the invocation of the `modify` closure panics.
1088
    /// No receivers are notified when panicking. All changes of the watched
1089
    /// value applied by the closure before panicking will be visible in
1090
    /// subsequent calls to `borrow`.
1091
    ///
1092
    /// # Examples
1093
    ///
1094
    /// ```
1095
    /// use tokio::sync::watch;
1096
    ///
1097
    /// struct State {
1098
    ///     counter: usize,
1099
    /// }
1100
    /// let (state_tx, state_rx) = watch::channel(State { counter: 0 });
1101
    /// state_tx.send_modify(|state| state.counter += 1);
1102
    /// assert_eq!(state_rx.borrow().counter, 1);
1103
    /// ```
1104
0
    pub fn send_modify<F>(&self, modify: F)
1105
0
    where
1106
0
        F: FnOnce(&mut T),
1107
    {
1108
0
        self.send_if_modified(|value| {
1109
0
            modify(value);
1110
0
            true
1111
0
        });
1112
0
    }
1113
1114
    /// Modifies the watched value **conditionally** in-place,
1115
    /// notifying all receivers only if modified.
1116
    ///
1117
    /// This can be useful for modifying the watched value, without
1118
    /// having to allocate a new instance. Additionally, this
1119
    /// method permits sending values even when there are no receivers.
1120
    ///
1121
    /// The `modify` closure must return `true` if the value has actually
1122
    /// been modified during the mutable borrow. It should only return `false`
1123
    /// if the value is guaranteed to be unmodified despite the mutable
1124
    /// borrow.
1125
    ///
1126
    /// Receivers are only notified if the closure returned `true`. If the
1127
    /// closure has modified the value but returned `false` this results
1128
    /// in a *silent modification*, i.e. the modified value will be visible
1129
    /// in subsequent calls to `borrow`, but receivers will not receive
1130
    /// a change notification.
1131
    ///
1132
    /// Returns the result of the closure, i.e. `true` if the value has
1133
    /// been modified and `false` otherwise.
1134
    ///
1135
    /// # Panics
1136
    ///
1137
    /// This function panics when the invocation of the `modify` closure panics.
1138
    /// No receivers are notified when panicking. All changes of the watched
1139
    /// value applied by the closure before panicking will be visible in
1140
    /// subsequent calls to `borrow`.
1141
    ///
1142
    /// # Examples
1143
    ///
1144
    /// ```
1145
    /// use tokio::sync::watch;
1146
    ///
1147
    /// struct State {
1148
    ///     counter: usize,
1149
    /// }
1150
    /// let (state_tx, mut state_rx) = watch::channel(State { counter: 1 });
1151
    /// let inc_counter_if_odd = |state: &mut State| {
1152
    ///     if state.counter % 2 == 1 {
1153
    ///         state.counter += 1;
1154
    ///         return true;
1155
    ///     }
1156
    ///     false
1157
    /// };
1158
    ///
1159
    /// assert_eq!(state_rx.borrow().counter, 1);
1160
    ///
1161
    /// assert!(!state_rx.has_changed().unwrap());
1162
    /// assert!(state_tx.send_if_modified(inc_counter_if_odd));
1163
    /// assert!(state_rx.has_changed().unwrap());
1164
    /// assert_eq!(state_rx.borrow_and_update().counter, 2);
1165
    ///
1166
    /// assert!(!state_rx.has_changed().unwrap());
1167
    /// assert!(!state_tx.send_if_modified(inc_counter_if_odd));
1168
    /// assert!(!state_rx.has_changed().unwrap());
1169
    /// assert_eq!(state_rx.borrow_and_update().counter, 2);
1170
    /// ```
1171
0
    pub fn send_if_modified<F>(&self, modify: F) -> bool
1172
0
    where
1173
0
        F: FnOnce(&mut T) -> bool,
1174
    {
1175
        {
1176
            // Acquire the write lock and update the value.
1177
0
            let mut lock = self.shared.value.write();
1178
1179
            // Update the value and catch a possible panic inside `modify`.
1180
0
            let result = panic::catch_unwind(panic::AssertUnwindSafe(|| modify(&mut lock)));
1181
0
            match result {
1182
0
                Ok(modified) => {
1183
0
                    if !modified {
1184
                        // Abort, i.e. don't notify receivers if unmodified
1185
0
                        return false;
1186
0
                    }
1187
                    // Continue if modified
1188
                }
1189
0
                Err(panicked) => {
1190
                    // Drop the lock to avoid poisoning it.
1191
0
                    drop(lock);
1192
                    // Forward the panic to the caller.
1193
0
                    panic::resume_unwind(panicked);
1194
                    // Unreachable
1195
                }
1196
            };
1197
1198
0
            self.shared.state.increment_version_while_locked();
1199
1200
            // Release the write lock.
1201
            //
1202
            // Incrementing the version counter while holding the lock ensures
1203
            // that receivers are able to figure out the version number of the
1204
            // value they are currently looking at.
1205
0
            drop(lock);
1206
        }
1207
1208
0
        self.shared.notify_rx.notify_waiters();
1209
1210
0
        true
1211
0
    }
1212
1213
    /// Sends a new value via the channel, notifying all receivers and returning
1214
    /// the previous value in the channel.
1215
    ///
1216
    /// This can be useful for reusing the buffers inside a watched value.
1217
    /// Additionally, this method permits sending values even when there are no
1218
    /// receivers.
1219
    ///
1220
    /// # Examples
1221
    ///
1222
    /// ```
1223
    /// use tokio::sync::watch;
1224
    ///
1225
    /// let (tx, _rx) = watch::channel(1);
1226
    /// assert_eq!(tx.send_replace(2), 1);
1227
    /// assert_eq!(tx.send_replace(3), 2);
1228
    /// ```
1229
0
    pub fn send_replace(&self, mut value: T) -> T {
1230
        // swap old watched value with the new one
1231
0
        self.send_modify(|old| mem::swap(old, &mut value));
1232
1233
0
        value
1234
0
    }
1235
1236
    /// Returns a reference to the most recently sent value.
1237
    ///
1238
    /// Outstanding borrows hold a read lock on the inner value. This means that
1239
    /// long-lived borrows could cause the producer half to block. It is recommended
1240
    /// to keep the borrow as short-lived as possible. Additionally, if you are
1241
    /// running in an environment that allows `!Send` futures, you must ensure that
1242
    /// the returned `Ref` type is never held alive across an `.await` point,
1243
    /// otherwise, it can lead to a deadlock.
1244
    ///
1245
    /// # Examples
1246
    ///
1247
    /// ```
1248
    /// use tokio::sync::watch;
1249
    ///
1250
    /// let (tx, _) = watch::channel("hello");
1251
    /// assert_eq!(*tx.borrow(), "hello");
1252
    /// ```
1253
0
    pub fn borrow(&self) -> Ref<'_, T> {
1254
0
        let inner = self.shared.value.read();
1255
1256
        // The sender/producer always sees the current version
1257
0
        let has_changed = false;
1258
1259
0
        Ref { inner, has_changed }
1260
0
    }
1261
1262
    /// Checks if the channel has been closed. This happens when all receivers
1263
    /// have dropped.
1264
    ///
1265
    /// # Examples
1266
    ///
1267
    /// ```
1268
    /// let (tx, rx) = tokio::sync::watch::channel(());
1269
    /// assert!(!tx.is_closed());
1270
    ///
1271
    /// drop(rx);
1272
    /// assert!(tx.is_closed());
1273
    /// ```
1274
0
    pub fn is_closed(&self) -> bool {
1275
0
        self.receiver_count() == 0
1276
0
    }
1277
1278
    /// Completes when all receivers have dropped.
1279
    ///
1280
    /// This allows the producer to get notified when interest in the produced
1281
    /// values is canceled and immediately stop doing work. Once a channel is
1282
    /// closed, the only way to reopen it is to call [`Sender::subscribe`] to
1283
    /// get a new receiver.
1284
    ///
1285
    /// If the channel becomes closed for a brief amount of time (e.g., the last
1286
    /// receiver is dropped and then `subscribe` is called), then this call to
1287
    /// `closed` might return, but it is also possible that it does not "notice"
1288
    /// that the channel was closed for a brief amount of time.
1289
    ///
1290
    /// # Cancel safety
1291
    ///
1292
    /// This method is cancel safe.
1293
    ///
1294
    /// # Examples
1295
    ///
1296
    /// ```
1297
    /// use tokio::sync::watch;
1298
    ///
1299
    /// # #[tokio::main(flavor = "current_thread")]
1300
    /// # async fn main() {
1301
    /// let (tx, rx) = watch::channel("hello");
1302
    ///
1303
    /// tokio::spawn(async move {
1304
    ///     // use `rx`
1305
    ///     drop(rx);
1306
    /// });
1307
    ///
1308
    /// // Waits for `rx` to drop
1309
    /// tx.closed().await;
1310
    /// println!("the `rx` handles dropped");
1311
    /// # }
1312
    /// ```
1313
0
    pub async fn closed(&self) {
1314
0
        cooperative(async {
1315
0
            crate::trace::async_trace_leaf().await;
1316
1317
0
            while self.receiver_count() > 0 {
1318
0
                let notified = self.shared.notify_tx.notified();
1319
1320
0
                if self.receiver_count() == 0 {
1321
0
                    return;
1322
0
                }
1323
1324
0
                notified.await;
1325
                // The channel could have been reopened in the meantime by calling
1326
                // `subscribe`, so we loop again.
1327
            }
1328
0
        })
1329
0
        .await;
1330
0
    }
1331
1332
    /// Creates a new [`Receiver`] connected to this `Sender`.
1333
    ///
1334
    /// All messages sent before this call to `subscribe` are initially marked
1335
    /// as seen by the new `Receiver`.
1336
    ///
1337
    /// This method can be called even if there are no other receivers. In this
1338
    /// case, the channel is reopened.
1339
    ///
1340
    /// # Examples
1341
    ///
1342
    /// The new receiver will receive messages sent on this `Sender`.
1343
    ///
1344
    /// ```
1345
    /// use tokio::sync::watch;
1346
    ///
1347
    /// # #[tokio::main(flavor = "current_thread")]
1348
    /// # async fn main() {
1349
    /// let (tx, _rx) = watch::channel(0u64);
1350
    ///
1351
    /// tx.send(5).unwrap();
1352
    ///
1353
    /// let rx = tx.subscribe();
1354
    /// assert_eq!(5, *rx.borrow());
1355
    ///
1356
    /// tx.send(10).unwrap();
1357
    /// assert_eq!(10, *rx.borrow());
1358
    /// # }
1359
    /// ```
1360
    ///
1361
    /// The most recent message is considered seen by the new receiver, so this test
1362
    /// is guaranteed to pass.
1363
    ///
1364
    /// ```
1365
    /// use tokio::sync::watch;
1366
    /// use tokio::time::Duration;
1367
    ///
1368
    /// # #[tokio::main(flavor = "current_thread")]
1369
    /// # async fn main() {
1370
    /// let (tx, _rx) = watch::channel(0u64);
1371
    /// tx.send(5).unwrap();
1372
    /// let mut rx = tx.subscribe();
1373
    ///
1374
    /// tokio::spawn(async move {
1375
    ///     // by spawning and sleeping, the message is sent after `main`
1376
    ///     // hits the call to `changed`.
1377
    ///     # if false {
1378
    ///     tokio::time::sleep(Duration::from_millis(10)).await;
1379
    ///     # }
1380
    ///     tx.send(100).unwrap();
1381
    /// });
1382
    ///
1383
    /// rx.changed().await.unwrap();
1384
    /// assert_eq!(100, *rx.borrow());
1385
    /// # }
1386
    /// ```
1387
0
    pub fn subscribe(&self) -> Receiver<T> {
1388
0
        let shared = self.shared.clone();
1389
0
        let version = shared.state.load().version();
1390
1391
        // The CLOSED bit in the state tracks only whether the sender is
1392
        // dropped, so we do not need to unset it if this reopens the channel.
1393
0
        Receiver::from_shared(version, shared)
1394
0
    }
1395
1396
    /// Returns the number of receivers that currently exist.
1397
    ///
1398
    /// # Examples
1399
    ///
1400
    /// ```
1401
    /// use tokio::sync::watch;
1402
    ///
1403
    /// # #[tokio::main(flavor = "current_thread")]
1404
    /// # async fn main() {
1405
    /// let (tx, rx1) = watch::channel("hello");
1406
    ///
1407
    /// assert_eq!(1, tx.receiver_count());
1408
    ///
1409
    /// let mut _rx2 = rx1.clone();
1410
    ///
1411
    /// assert_eq!(2, tx.receiver_count());
1412
    /// # }
1413
    /// ```
1414
0
    pub fn receiver_count(&self) -> usize {
1415
0
        self.shared.ref_count_rx.load(Relaxed)
1416
0
    }
1417
1418
    /// Returns the number of senders that currently exist.
1419
    ///
1420
    /// # Examples
1421
    ///
1422
    /// ```
1423
    /// use tokio::sync::watch;
1424
    ///
1425
    /// # #[tokio::main(flavor = "current_thread")]
1426
    /// # async fn main() {
1427
    /// let (tx1, rx) = watch::channel("hello");
1428
    ///
1429
    /// assert_eq!(1, tx1.sender_count());
1430
    ///
1431
    /// let tx2 = tx1.clone();
1432
    ///
1433
    /// assert_eq!(2, tx1.sender_count());
1434
    /// assert_eq!(2, tx2.sender_count());
1435
    /// # }
1436
    /// ```
1437
0
    pub fn sender_count(&self) -> usize {
1438
0
        self.shared.ref_count_tx.load(Relaxed)
1439
0
    }
1440
1441
    /// Returns `true` if senders belong to the same channel.
1442
    ///
1443
    /// # Examples
1444
    ///
1445
    /// ```
1446
    /// let (tx, rx) = tokio::sync::watch::channel(true);
1447
    /// let tx2 = tx.clone();
1448
    /// assert!(tx.same_channel(&tx2));
1449
    ///
1450
    /// let (tx3, rx3) = tokio::sync::watch::channel(true);
1451
    /// assert!(!tx3.same_channel(&tx2));
1452
    /// ```
1453
0
    pub fn same_channel(&self, other: &Self) -> bool {
1454
0
        Arc::ptr_eq(&self.shared, &other.shared)
1455
0
    }
1456
}
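
The differences between `send`, `send_replace`, and `send_if_modified` documented above can be modeled without any async machinery. This is a simplified std-only sketch. The `Tx` type and its fields are hypothetical, receiver notification is omitted, and the panic handling around the closure is not reproduced.

```rust
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use std::sync::RwLock;

/// Hypothetical model of the sender side of the channel.
struct Tx<T> {
    value: RwLock<T>,
    version: AtomicU64,
    receivers: AtomicUsize,
}

impl<T> Tx<T> {
    /// Runs `modify` under the write lock; bumps the version only on `true`.
    fn send_if_modified(&self, modify: impl FnOnce(&mut T) -> bool) -> bool {
        let mut guard = self.value.write().unwrap();
        if !modify(&mut *guard) {
            // Any mutation made by `modify` stays, but no version bump happens.
            return false;
        }
        // Bump the version while the write lock is still held.
        self.version.fetch_add(1, Ordering::Release);
        true
    }

    /// Stores `value` unconditionally and returns the previous value.
    fn send_replace(&self, mut value: T) -> T {
        self.send_if_modified(|old| {
            std::mem::swap(old, &mut value);
            true
        });
        value
    }

    /// Stores `value` only if at least one receiver exists.
    fn send(&self, value: T) -> Result<(), T> {
        if self.receivers.load(Ordering::Relaxed) == 0 {
            return Err(value);
        }
        self.send_replace(value);
        Ok(())
    }
}

fn main() {
    let tx = Tx {
        value: RwLock::new(1u32),
        version: AtomicU64::new(0),
        receivers: AtomicUsize::new(0),
    };

    // With no receivers, `send` hands the value back and stores nothing.
    assert_eq!(tx.send(2), Err(2));
    assert_eq!(*tx.value.read().unwrap(), 1);

    // `send_replace` stores the value regardless of the receiver count.
    assert_eq!(tx.send_replace(3), 1);

    // A silent modification: the value changes, the version does not.
    assert!(!tx.send_if_modified(|v| {
        *v += 1;
        false
    }));
    assert_eq!(*tx.value.read().unwrap(), 4);
    assert_eq!(tx.version.load(Ordering::Acquire), 1);
}
```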
1457
1458
impl<T> Drop for Sender<T> {
1459
0
    fn drop(&mut self) {
1460
0
        if self.shared.ref_count_tx.fetch_sub(1, AcqRel) == 1 {
1461
0
            self.shared.state.set_closed();
1462
0
            self.shared.notify_rx.notify_waiters();
1463
0
        }
1464
0
    }
Unexecuted instantiation: <tokio::sync::watch::Sender<usize> as core::ops::drop::Drop>::drop
Unexecuted instantiation: <tokio::sync::watch::Sender<()> as core::ops::drop::Drop>::drop
1465
}
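
Both `Drop` implementations above detect the last handle by checking whether `fetch_sub` returned `1`, that is, whether the counter was `1` just before the decrement. Below is a small std-only sketch of that pattern. The `Counts` and `Handle` types are hypothetical, and a plain flag stands in for setting the closed bit and waking waiters.

```rust
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;

/// Hypothetical shared bookkeeping: a handle count and a closed flag.
struct Counts {
    handles: AtomicUsize,
    closed: AtomicBool,
}

/// Hypothetical cloneable handle, playing the role of a sender.
struct Handle {
    counts: Arc<Counts>,
}

impl Handle {
    fn new() -> Self {
        Handle {
            counts: Arc::new(Counts {
                handles: AtomicUsize::new(1),
                closed: AtomicBool::new(false),
            }),
        }
    }
}

impl Clone for Handle {
    fn clone(&self) -> Self {
        self.counts.handles.fetch_add(1, Ordering::Relaxed);
        Handle { counts: Arc::clone(&self.counts) }
    }
}

impl Drop for Handle {
    fn drop(&mut self) {
        // `fetch_sub` returns the previous value, so `1` means this was the last handle.
        if self.counts.handles.fetch_sub(1, Ordering::AcqRel) == 1 {
            self.counts.closed.store(true, Ordering::Release);
        }
    }
}

fn main() {
    let first = Handle::new();
    let counts = Arc::clone(&first.counts);
    let second = first.clone();

    // Dropping one of two handles leaves the channel open.
    drop(first);
    assert!(!counts.closed.load(Ordering::Acquire));

    // Dropping the last handle closes it.
    drop(second);
    assert!(counts.closed.load(Ordering::Acquire));
}
```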
1466
1467
// ===== impl Ref =====
1468
1469
impl<T> ops::Deref for Ref<'_, T> {
1470
    type Target = T;
1471
1472
0
    fn deref(&self) -> &T {
1473
0
        self.inner.deref()
1474
0
    }
1475
}
1476
1477
#[cfg(all(test, loom))]
1478
mod tests {
1479
    use futures::future::FutureExt;
1480
    use loom::thread;
1481
1482
    // test for https://github.com/tokio-rs/tokio/issues/3168
1483
    #[test]
1484
    fn watch_spurious_wakeup() {
1485
        loom::model(|| {
1486
            let (send, mut recv) = crate::sync::watch::channel(0i32);
1487
1488
            send.send(1).unwrap();
1489
1490
            let send_thread = thread::spawn(move || {
1491
                send.send(2).unwrap();
1492
                send
1493
            });
1494
1495
            recv.changed().now_or_never();
1496
1497
            let send = send_thread.join().unwrap();
1498
            let recv_thread = thread::spawn(move || {
1499
                recv.changed().now_or_never();
1500
                recv.changed().now_or_never();
1501
                recv
1502
            });
1503
1504
            send.send(3).unwrap();
1505
1506
            let mut recv = recv_thread.join().unwrap();
1507
            let send_thread = thread::spawn(move || {
1508
                send.send(2).unwrap();
1509
            });
1510
1511
            recv.changed().now_or_never();
1512
1513
            send_thread.join().unwrap();
1514
        });
1515
    }
1516
1517
    #[test]
1518
    fn watch_borrow() {
1519
        loom::model(|| {
1520
            let (send, mut recv) = crate::sync::watch::channel(0i32);
1521
1522
            assert!(send.borrow().eq(&0));
1523
            assert!(recv.borrow().eq(&0));
1524
1525
            send.send(1).unwrap();
1526
            assert!(send.borrow().eq(&1));
1527
1528
            let send_thread = thread::spawn(move || {
1529
                send.send(2).unwrap();
1530
                send
1531
            });
1532
1533
            recv.changed().now_or_never();
1534
1535
            let send = send_thread.join().unwrap();
1536
            let recv_thread = thread::spawn(move || {
1537
                recv.changed().now_or_never();
1538
                recv.changed().now_or_never();
1539
                recv
1540
            });
1541
1542
            send.send(3).unwrap();
1543
1544
            let recv = recv_thread.join().unwrap();
1545
            assert!(recv.borrow().eq(&3));
1546
            assert!(send.borrow().eq(&3));
1547
1548
            send.send(2).unwrap();
1549
1550
            thread::spawn(move || {
1551
                assert!(recv.borrow().eq(&2));
1552
            });
1553
            assert!(send.borrow().eq(&2));
1554
        });
1555
    }
1556
}