Coverage Report

Created: 2025-12-31 06:49

/rust/registry/src/index.crates.io-1949cf8c6b5b557f/tokio-1.48.0/src/sync/broadcast.rs
Line | Count | Source
1
//! A multi-producer, multi-consumer broadcast queue. Each sent value is seen by
2
//! all consumers.
3
//!
4
//! A [`Sender`] is used to broadcast values to **all** connected [`Receiver`]
5
//! values. [`Sender`] handles are clone-able, allowing concurrent send and
6
//! receive actions. [`Sender`] and [`Receiver`] are both `Send` and `Sync` as
7
//! long as `T` is `Send`.
8
//!
9
//! When a value is sent, **all** [`Receiver`] handles are notified and will
10
//! receive the value. The value is stored once inside the channel and cloned on
11
//! demand for each receiver. Once all receivers have received a clone of the
12
//! value, the value is released from the channel.
13
//!
14
//! A channel is created by calling [`channel`], specifying the maximum number
15
//! of messages the channel can retain at any given time.
16
//!
17
//! New [`Receiver`] handles are created by calling [`Sender::subscribe`]. The
18
//! returned [`Receiver`] will receive values sent **after** the call to
19
//! `subscribe`.
20
//!
21
//! This channel is also suitable for the single-producer multi-consumer
22
//! use-case, where a single sender broadcasts values to many receivers.
23
//!
24
//! ## Lagging
25
//!
26
//! As sent messages must be retained until **all** [`Receiver`] handles receive
27
//! a clone, broadcast channels are susceptible to the "slow receiver" problem.
28
//! In this case, all but one receiver are able to receive values at the rate
29
//! they are sent. Because one receiver is stalled, the channel starts to fill
30
//! up.
31
//!
32
//! This broadcast channel implementation handles this case by setting a hard
33
//! upper bound on the number of values the channel may retain at any given
34
//! time. This upper bound is passed to the [`channel`] function as an argument.
35
//!
36
//! If a value is sent when the channel is at capacity, the oldest value
37
//! currently held by the channel is released. This frees up space for the new
38
//! value. Any receiver that has not yet seen the released value will return
39
//! [`RecvError::Lagged`] the next time [`recv`] is called.
40
//!
41
//! Once [`RecvError::Lagged`] is returned, the lagging receiver's position is
42
//! updated to the oldest value contained by the channel. The next call to
43
//! [`recv`] will return this value.
44
//!
45
//! This behavior enables a receiver to detect when it has lagged so far behind
46
//! that data has been dropped. The caller may decide how to respond to this:
47
//! either by aborting its task or by tolerating lost messages and resuming
48
//! consumption of the channel.
49
//!
50
//! ## Closing
51
//!
52
//! When **all** [`Sender`] handles have been dropped, no new values may be
53
//! sent. At this point, the channel is "closed". Once a receiver has received
54
//! all values retained by the channel, the next call to [`recv`] will return
55
//! with [`RecvError::Closed`].
56
//!
57
//! When a [`Receiver`] handle is dropped, any messages not read by the receiver
58
//! will be marked as read. If this receiver was the only one not to have read
59
//! that message, the message will be dropped at this point.
60
//!
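A minimal sketch of the closing behavior described above, using only the public API shown in this listing: values already retained by the channel are still delivered after the last `Sender` is dropped, and only then does `recv` report the channel as closed.

use tokio::sync::broadcast::{self, error::RecvError};

#[tokio::main(flavor = "current_thread")]
async fn main() {
    let (tx, mut rx) = broadcast::channel(16);

    tx.send(10).unwrap();
    // Dropping the last (and only) sender closes the channel.
    drop(tx);

    // Values already retained by the channel are still delivered...
    assert_eq!(rx.recv().await.unwrap(), 10);
    // ...and only then does `recv` signal that the channel is closed.
    assert_eq!(rx.recv().await, Err(RecvError::Closed));
}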
61
//! [`Sender`]: crate::sync::broadcast::Sender
62
//! [`Sender::subscribe`]: crate::sync::broadcast::Sender::subscribe
63
//! [`Receiver`]: crate::sync::broadcast::Receiver
64
//! [`channel`]: crate::sync::broadcast::channel
65
//! [`RecvError::Lagged`]: crate::sync::broadcast::error::RecvError::Lagged
66
//! [`RecvError::Closed`]: crate::sync::broadcast::error::RecvError::Closed
67
//! [`recv`]: crate::sync::broadcast::Receiver::recv
68
//!
69
//! # Examples
70
//!
71
//! Basic usage
72
//!
73
//! ```
74
//! use tokio::sync::broadcast;
75
//!
76
//! # #[tokio::main(flavor = "current_thread")]
77
//! # async fn main() {
78
//! let (tx, mut rx1) = broadcast::channel(16);
79
//! let mut rx2 = tx.subscribe();
80
//!
81
//! tokio::spawn(async move {
82
//!     assert_eq!(rx1.recv().await.unwrap(), 10);
83
//!     assert_eq!(rx1.recv().await.unwrap(), 20);
84
//! });
85
//!
86
//! tokio::spawn(async move {
87
//!     assert_eq!(rx2.recv().await.unwrap(), 10);
88
//!     assert_eq!(rx2.recv().await.unwrap(), 20);
89
//! });
90
//!
91
//! tx.send(10).unwrap();
92
//! tx.send(20).unwrap();
93
//! # }
94
//! ```
95
//!
96
//! Handling lag
97
//!
98
//! ```
99
//! use tokio::sync::broadcast;
100
//!
101
//! # #[tokio::main(flavor = "current_thread")]
102
//! # async fn main() {
103
//! let (tx, mut rx) = broadcast::channel(2);
104
//!
105
//! tx.send(10).unwrap();
106
//! tx.send(20).unwrap();
107
//! tx.send(30).unwrap();
108
//!
109
//! // The receiver lagged behind
110
//! assert!(rx.recv().await.is_err());
111
//!
112
//! // At this point, we can abort or continue with lost messages
113
//!
114
//! assert_eq!(20, rx.recv().await.unwrap());
115
//! assert_eq!(30, rx.recv().await.unwrap());
116
//! # }
117
//! ```
118
119
use crate::loom::cell::UnsafeCell;
120
use crate::loom::sync::atomic::{AtomicBool, AtomicUsize};
121
use crate::loom::sync::{Arc, Mutex, MutexGuard};
122
use crate::task::coop::cooperative;
123
use crate::util::linked_list::{self, GuardedLinkedList, LinkedList};
124
use crate::util::WakeList;
125
126
use std::fmt;
127
use std::future::Future;
128
use std::marker::PhantomPinned;
129
use std::pin::Pin;
130
use std::ptr::NonNull;
131
use std::sync::atomic::Ordering::{AcqRel, Acquire, Relaxed, Release, SeqCst};
132
use std::task::{ready, Context, Poll, Waker};
133
134
/// Sending-half of the [`broadcast`] channel.
135
///
136
/// May be used from many threads. Messages can be sent with
137
/// [`send`][Sender::send].
138
///
139
/// # Examples
140
///
141
/// ```
142
/// use tokio::sync::broadcast;
143
///
144
/// # #[tokio::main(flavor = "current_thread")]
145
/// # async fn main() {
146
/// let (tx, mut rx1) = broadcast::channel(16);
147
/// let mut rx2 = tx.subscribe();
148
///
149
/// tokio::spawn(async move {
150
///     assert_eq!(rx1.recv().await.unwrap(), 10);
151
///     assert_eq!(rx1.recv().await.unwrap(), 20);
152
/// });
153
///
154
/// tokio::spawn(async move {
155
///     assert_eq!(rx2.recv().await.unwrap(), 10);
156
///     assert_eq!(rx2.recv().await.unwrap(), 20);
157
/// });
158
///
159
/// tx.send(10).unwrap();
160
/// tx.send(20).unwrap();
161
/// # }
162
/// ```
163
///
164
/// [`broadcast`]: crate::sync::broadcast
165
pub struct Sender<T> {
166
    shared: Arc<Shared<T>>,
167
}
168
169
/// A sender that does not prevent the channel from being closed.
170
///
171
/// If all [`Sender`] instances of a channel were dropped and only `WeakSender`
172
/// instances remain, the channel is closed.
173
///
174
/// In order to send messages, the `WeakSender` needs to be upgraded using
175
/// [`WeakSender::upgrade`], which returns `Option<Sender>`. It returns `None`
176
/// if all `Sender`s have been dropped, and otherwise it returns a `Sender`.
177
///
178
/// [`Sender`]: Sender
179
/// [`WeakSender::upgrade`]: WeakSender::upgrade
180
///
181
/// # Examples
182
///
183
/// ```
184
/// use tokio::sync::broadcast::channel;
185
///
186
/// # #[tokio::main(flavor = "current_thread")]
187
/// # async fn main() {
188
/// let (tx, _rx) = channel::<i32>(15);
189
/// let tx_weak = tx.downgrade();
190
///
191
/// // Upgrading will succeed because `tx` still exists.
192
/// assert!(tx_weak.upgrade().is_some());
193
///
194
/// // If we drop `tx`, then it will fail.
195
/// drop(tx);
196
/// assert!(tx_weak.clone().upgrade().is_none());
197
/// # }
198
/// ```
199
pub struct WeakSender<T> {
200
    shared: Arc<Shared<T>>,
201
}
202
203
/// Receiving-half of the [`broadcast`] channel.
204
///
205
/// Must not be used concurrently. Messages may be retrieved using
206
/// [`recv`][Receiver::recv].
207
///
208
/// To turn this receiver into a `Stream`, you can use the [`BroadcastStream`]
209
/// wrapper.
210
///
211
/// [`BroadcastStream`]: https://docs.rs/tokio-stream/0.1/tokio_stream/wrappers/struct.BroadcastStream.html
212
///
213
/// # Examples
214
///
215
/// ```
216
/// use tokio::sync::broadcast;
217
///
218
/// # #[tokio::main(flavor = "current_thread")]
219
/// # async fn main() {
220
/// let (tx, mut rx1) = broadcast::channel(16);
221
/// let mut rx2 = tx.subscribe();
222
///
223
/// tokio::spawn(async move {
224
///     assert_eq!(rx1.recv().await.unwrap(), 10);
225
///     assert_eq!(rx1.recv().await.unwrap(), 20);
226
/// });
227
///
228
/// tokio::spawn(async move {
229
///     assert_eq!(rx2.recv().await.unwrap(), 10);
230
///     assert_eq!(rx2.recv().await.unwrap(), 20);
231
/// });
232
///
233
/// tx.send(10).unwrap();
234
/// tx.send(20).unwrap();
235
/// # }
236
/// ```
237
///
238
/// [`broadcast`]: crate::sync::broadcast
239
pub struct Receiver<T> {
240
    /// State shared with all receivers and senders.
241
    shared: Arc<Shared<T>>,
242
243
    /// Next position to read from
244
    next: u64,
245
}
246
247
pub mod error {
248
    //! Broadcast error types
249
250
    use std::fmt;
251
252
    /// Error returned by the [`send`] function on a [`Sender`].
253
    ///
254
    /// A **send** operation can only fail if there are no active receivers,
255
    /// implying that the message could never be received. The error contains the
256
    /// message being sent as a payload so it can be recovered.
257
    ///
258
    /// [`send`]: crate::sync::broadcast::Sender::send
259
    /// [`Sender`]: crate::sync::broadcast::Sender
260
    #[derive(Debug)]
261
    pub struct SendError<T>(pub T);
262
263
    impl<T> fmt::Display for SendError<T> {
264
0
        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
265
0
            write!(f, "channel closed")
266
0
        }
267
    }
268
269
    impl<T: fmt::Debug> std::error::Error for SendError<T> {}
270
271
    /// An error returned from the [`recv`] function on a [`Receiver`].
272
    ///
273
    /// [`recv`]: crate::sync::broadcast::Receiver::recv
274
    /// [`Receiver`]: crate::sync::broadcast::Receiver
275
    #[derive(Debug, PartialEq, Eq, Clone)]
276
    pub enum RecvError {
277
        /// There are no more active senders, implying no further messages will ever
278
        /// be sent.
279
        Closed,
280
281
        /// The receiver lagged too far behind. Attempting to receive again will
282
        /// return the oldest message still retained by the channel.
283
        ///
284
        /// Includes the number of skipped messages.
285
        Lagged(u64),
286
    }
287
288
    impl fmt::Display for RecvError {
289
0
        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
290
0
            match self {
291
0
                RecvError::Closed => write!(f, "channel closed"),
292
0
                RecvError::Lagged(amt) => write!(f, "channel lagged by {amt}"),
293
            }
294
0
        }
295
    }
296
297
    impl std::error::Error for RecvError {}
298
299
    /// An error returned from the [`try_recv`] function on a [`Receiver`].
300
    ///
301
    /// [`try_recv`]: crate::sync::broadcast::Receiver::try_recv
302
    /// [`Receiver`]: crate::sync::broadcast::Receiver
303
    #[derive(Debug, PartialEq, Eq, Clone)]
304
    pub enum TryRecvError {
305
        /// The channel is currently empty. There are still active
306
        /// [`Sender`] handles, so data may yet become available.
307
        ///
308
        /// [`Sender`]: crate::sync::broadcast::Sender
309
        Empty,
310
311
        /// There are no more active senders, implying no further messages will ever
312
        /// be sent.
313
        Closed,
314
315
        /// The receiver lagged too far behind and has been forcibly disconnected.
316
        /// Attempting to receive again will return the oldest message still
317
        /// retained by the channel.
318
        ///
319
        /// Includes the number of skipped messages.
320
        Lagged(u64),
321
    }
322
323
    impl fmt::Display for TryRecvError {
324
0
        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
325
0
            match self {
326
0
                TryRecvError::Empty => write!(f, "channel empty"),
327
0
                TryRecvError::Closed => write!(f, "channel closed"),
328
0
                TryRecvError::Lagged(amt) => write!(f, "channel lagged by {amt}"),
329
            }
330
0
        }
331
    }
332
333
    impl std::error::Error for TryRecvError {}
334
}
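The `TryRecvError` variants above map directly onto observable `try_recv` outcomes. A small sketch, assuming the public `broadcast` API from this listing (the `Lagged` case is exercised by the module-level "Handling lag" example):

use tokio::sync::broadcast::{self, error::TryRecvError};

fn main() {
    let (tx, mut rx) = broadcast::channel::<u32>(16);

    // Nothing has been sent yet: the channel is empty but still open.
    assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));

    tx.send(7).unwrap();
    assert_eq!(rx.try_recv(), Ok(7));

    // Dropping the last sender closes the channel.
    drop(tx);
    assert_eq!(rx.try_recv(), Err(TryRecvError::Closed));
}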
335
336
use self::error::{RecvError, SendError, TryRecvError};
337
338
use super::Notify;
339
340
/// Data shared between senders and receivers.
341
struct Shared<T> {
342
    /// Slots in the channel.
343
    buffer: Box<[Mutex<Slot<T>>]>,
344
345
    /// Mask a position -> index.
346
    mask: usize,
347
348
    /// Tail of the queue. Includes the rx wait list.
349
    tail: Mutex<Tail>,
350
351
    /// Number of outstanding Sender handles.
352
    num_tx: AtomicUsize,
353
354
    /// Number of outstanding weak Sender handles.
355
    num_weak_tx: AtomicUsize,
356
357
    /// Notify when the last subscribed [`Receiver`] drops.
358
    notify_last_rx_drop: Notify,
359
}
360
361
/// Next position to write a value.
362
struct Tail {
363
    /// Next position to write to.
364
    pos: u64,
365
366
    /// Number of active receivers.
367
    rx_cnt: usize,
368
369
    /// True if the channel is closed.
370
    closed: bool,
371
372
    /// Receivers waiting for a value.
373
    waiters: LinkedList<Waiter, <Waiter as linked_list::Link>::Target>,
374
}
375
376
/// Slot in the buffer.
377
struct Slot<T> {
378
    /// Remaining number of receivers that are expected to see this value.
379
    ///
380
    /// When this goes to zero, the value is released.
381
    ///
382
    /// An atomic is used as it is mutated concurrently with the slot read lock
383
    /// acquired.
384
    rem: AtomicUsize,
385
386
    /// Uniquely identifies the `send` stored in the slot.
387
    pos: u64,
388
389
    /// The value being broadcast.
390
    ///
391
    /// The value is set by `send` when the write lock is held. When a reader
392
    /// drops, `rem` is decremented. When it hits zero, the value is dropped.
393
    val: Option<T>,
394
}
395
396
/// An entry in the wait queue.
397
struct Waiter {
398
    /// True if queued.
399
    queued: AtomicBool,
400
401
    /// Task waiting on the broadcast channel.
402
    waker: Option<Waker>,
403
404
    /// Intrusive linked-list pointers.
405
    pointers: linked_list::Pointers<Waiter>,
406
407
    /// Should not be `Unpin`.
408
    _p: PhantomPinned,
409
}
410
411
impl Waiter {
412
0
    fn new() -> Self {
413
0
        Self {
414
0
            queued: AtomicBool::new(false),
415
0
            waker: None,
416
0
            pointers: linked_list::Pointers::new(),
417
0
            _p: PhantomPinned,
418
0
        }
419
0
    }
420
}
421
422
generate_addr_of_methods! {
423
    impl<> Waiter {
424
        unsafe fn addr_of_pointers(self: NonNull<Self>) -> NonNull<linked_list::Pointers<Waiter>> {
425
            &self.pointers
426
        }
427
    }
428
}
429
430
struct RecvGuard<'a, T> {
431
    slot: MutexGuard<'a, Slot<T>>,
432
}
433
434
/// Receive a value future.
435
struct Recv<'a, T> {
436
    /// Receiver being waited on.
437
    receiver: &'a mut Receiver<T>,
438
439
    /// Entry in the waiter `LinkedList`.
440
    waiter: WaiterCell,
441
}
442
443
// The wrapper around `UnsafeCell` isolates the unsafe impl `Send` and `Sync`
444
// from `Recv`.
445
struct WaiterCell(UnsafeCell<Waiter>);
446
447
unsafe impl Send for WaiterCell {}
448
unsafe impl Sync for WaiterCell {}
449
450
/// Max number of receivers. Reserve space to lock.
451
const MAX_RECEIVERS: usize = usize::MAX >> 2;
452
453
/// Create a bounded, multi-producer, multi-consumer channel where each sent
454
/// value is broadcast to all active receivers.
455
///
456
/// **Note:** The actual capacity may be greater than the provided `capacity`.
457
///
458
/// All data sent on [`Sender`] will become available on every active
459
/// [`Receiver`] in the same order as it was sent.
460
///
461
/// The `Sender` can be cloned to `send` to the same channel from multiple
462
/// points in the process or it can be used concurrently from an `Arc`. New
463
/// `Receiver` handles are created by calling [`Sender::subscribe`].
464
///
465
/// If all [`Receiver`] handles are dropped, the `send` method will return a
466
/// [`SendError`]. Similarly, if all [`Sender`] handles are dropped, the [`recv`]
467
/// method will return a [`RecvError`].
468
///
469
/// [`Sender`]: crate::sync::broadcast::Sender
470
/// [`Sender::subscribe`]: crate::sync::broadcast::Sender::subscribe
471
/// [`Receiver`]: crate::sync::broadcast::Receiver
472
/// [`recv`]: crate::sync::broadcast::Receiver::recv
473
/// [`SendError`]: crate::sync::broadcast::error::SendError
474
/// [`RecvError`]: crate::sync::broadcast::error::RecvError
475
///
476
/// # Examples
477
///
478
/// ```
479
/// use tokio::sync::broadcast;
480
///
481
/// # #[tokio::main(flavor = "current_thread")]
482
/// # async fn main() {
483
/// let (tx, mut rx1) = broadcast::channel(16);
484
/// let mut rx2 = tx.subscribe();
485
///
486
/// tokio::spawn(async move {
487
///     assert_eq!(rx1.recv().await.unwrap(), 10);
488
///     assert_eq!(rx1.recv().await.unwrap(), 20);
489
/// });
490
///
491
/// tokio::spawn(async move {
492
///     assert_eq!(rx2.recv().await.unwrap(), 10);
493
///     assert_eq!(rx2.recv().await.unwrap(), 20);
494
/// });
495
///
496
/// tx.send(10).unwrap();
497
/// tx.send(20).unwrap();
498
/// # }
499
/// ```
500
///
501
/// # Panics
502
///
503
/// This will panic if `capacity` is equal to `0`.
504
///
505
/// This pre-allocates space for `capacity` messages. Allocation failure may result in a panic or
506
/// [an allocation error](std::alloc::handle_alloc_error).
507
#[track_caller]
508
0
pub fn channel<T: Clone>(capacity: usize) -> (Sender<T>, Receiver<T>) {
509
    // SAFETY: In the line below we are creating one extra receiver, so there will be 1 in total.
510
0
    let tx = unsafe { Sender::new_with_receiver_count(1, capacity) };
511
0
    let rx = Receiver {
512
0
        shared: tx.shared.clone(),
513
0
        next: 0,
514
0
    };
515
0
    (tx, rx)
516
0
}
517
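The note above says the actual capacity may be greater than the requested one; later in this listing, `new_with_receiver_count` rounds the capacity up to the next power of two, which is observable through when lagging starts. A sketch of that effect (illustrative only, public API as listed):

use tokio::sync::broadcast::{self, error::RecvError};

#[tokio::main(flavor = "current_thread")]
async fn main() {
    // A requested capacity of 10 is rounded up to 16 internally.
    let (tx, mut rx) = broadcast::channel(10);

    // 17 unread sends overflow the rounded capacity by exactly one value.
    for i in 0..17u32 {
        tx.send(i).unwrap();
    }

    // The receiver missed one value (0) and is repositioned to the oldest
    // value still retained by the channel.
    assert_eq!(rx.recv().await, Err(RecvError::Lagged(1)));
    assert_eq!(rx.recv().await.unwrap(), 1);
}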
518
impl<T> Sender<T> {
519
    /// Creates the sending-half of the [`broadcast`] channel.
520
    ///
521
    /// See the documentation of [`broadcast::channel`] for more information on this method.
522
    ///
523
    /// [`broadcast`]: crate::sync::broadcast
524
    /// [`broadcast::channel`]: crate::sync::broadcast::channel
525
    #[track_caller]
526
0
    pub fn new(capacity: usize) -> Self {
527
        // SAFETY: We don't create extra receivers, so there are 0.
528
0
        unsafe { Self::new_with_receiver_count(0, capacity) }
529
0
    }
530
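Unlike `channel`, `Sender::new` starts with zero receivers, so `send` returns a `SendError` until a receiver subscribes. A brief sketch of that difference, assuming the API shown here:

use tokio::sync::broadcast::Sender;

#[tokio::main(flavor = "current_thread")]
async fn main() {
    let tx: Sender<u32> = Sender::new(16);
    assert_eq!(tx.receiver_count(), 0);

    // With no receivers subscribed, a send is rejected and the value returned.
    assert!(tx.send(1).is_err());

    let mut rx = tx.subscribe();
    tx.send(2).unwrap();
    assert_eq!(rx.recv().await.unwrap(), 2);
}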
531
    /// Creates the sending-half of the [`broadcast`](self) channel, and provides the receiver
532
    /// count.
533
    ///
534
    /// See the documentation of [`broadcast::channel`](self::channel) for the errors that can occur when
535
    /// calling this function.
536
    ///
537
    /// # Safety
538
    ///
539
    /// The caller must ensure that the number of receivers for this `Sender` is correct before
539
    /// the channel is used. The count is zero by default, as this function does not create any
540
    /// receivers by itself.
542
    #[track_caller]
543
0
    unsafe fn new_with_receiver_count(receiver_count: usize, mut capacity: usize) -> Self {
544
0
        assert!(capacity > 0, "broadcast channel capacity cannot be zero");
545
0
        assert!(
546
0
            capacity <= usize::MAX >> 1,
547
0
            "broadcast channel capacity exceeded `usize::MAX / 2`"
548
        );
549
550
        // Round to a power of two
551
0
        capacity = capacity.next_power_of_two();
552
553
0
        let mut buffer = Vec::with_capacity(capacity);
554
555
0
        for i in 0..capacity {
556
0
            buffer.push(Mutex::new(Slot {
557
0
                rem: AtomicUsize::new(0),
558
0
                pos: (i as u64).wrapping_sub(capacity as u64),
559
0
                val: None,
560
0
            }));
561
0
        }
562
563
0
        let shared = Arc::new(Shared {
564
0
            buffer: buffer.into_boxed_slice(),
565
0
            mask: capacity - 1,
566
0
            tail: Mutex::new(Tail {
567
0
                pos: 0,
568
0
                rx_cnt: receiver_count,
569
0
                closed: receiver_count == 0,
570
0
                waiters: LinkedList::new(),
571
0
            }),
572
0
            num_tx: AtomicUsize::new(1),
573
0
            num_weak_tx: AtomicUsize::new(0),
574
0
            notify_last_rx_drop: Notify::new(),
575
0
        });
576
577
0
        Sender { shared }
578
0
    }
579
580
    /// Attempts to send a value to all active [`Receiver`] handles, returning
581
    /// it back if it could not be sent.
582
    ///
583
    /// A successful send occurs when there is at least one active [`Receiver`]
584
    /// handle. An unsuccessful send would be one where all associated
585
    /// [`Receiver`] handles have already been dropped.
586
    ///
587
    /// # Return
588
    ///
589
    /// On success, the number of subscribed [`Receiver`] handles is returned.
590
    /// This does not mean that this number of receivers will see the message as
591
    /// a receiver may drop or lag ([see lagging](self#lagging)) before receiving
592
    /// the message.
593
    ///
594
    /// # Note
595
    ///
596
    /// A return value of `Ok` **does not** mean that the sent value will be
597
    /// observed by all or any of the active [`Receiver`] handles. [`Receiver`]
598
    /// handles may be dropped before receiving the sent message.
599
    ///
600
    /// A return value of `Err` **does not** mean that future calls to `send`
601
    /// will fail. New [`Receiver`] handles may be created by calling
602
    /// [`subscribe`].
603
    ///
604
    /// [`Receiver`]: crate::sync::broadcast::Receiver
605
    /// [`subscribe`]: crate::sync::broadcast::Sender::subscribe
606
    ///
607
    /// # Examples
608
    ///
609
    /// ```
610
    /// use tokio::sync::broadcast;
611
    ///
612
    /// # #[tokio::main(flavor = "current_thread")]
613
    /// # async fn main() {
614
    /// let (tx, mut rx1) = broadcast::channel(16);
615
    /// let mut rx2 = tx.subscribe();
616
    ///
617
    /// tokio::spawn(async move {
618
    ///     assert_eq!(rx1.recv().await.unwrap(), 10);
619
    ///     assert_eq!(rx1.recv().await.unwrap(), 20);
620
    /// });
621
    ///
622
    /// tokio::spawn(async move {
623
    ///     assert_eq!(rx2.recv().await.unwrap(), 10);
624
    ///     assert_eq!(rx2.recv().await.unwrap(), 20);
625
    /// });
626
    ///
627
    /// tx.send(10).unwrap();
628
    /// tx.send(20).unwrap();
629
    /// # }
630
    /// ```
631
0
    pub fn send(&self, value: T) -> Result<usize, SendError<T>> {
632
0
        let mut tail = self.shared.tail.lock();
633
634
0
        if tail.rx_cnt == 0 {
635
0
            return Err(SendError(value));
636
0
        }
637
638
        // Position to write into
639
0
        let pos = tail.pos;
640
0
        let rem = tail.rx_cnt;
641
0
        let idx = (pos & self.shared.mask as u64) as usize;
642
643
        // Update the tail position
644
0
        tail.pos = tail.pos.wrapping_add(1);
645
646
        // Get the slot
647
0
        let mut slot = self.shared.buffer[idx].lock();
648
649
        // Track the position
650
0
        slot.pos = pos;
651
652
        // Set remaining receivers
653
0
        slot.rem.with_mut(|v| *v = rem);
654
655
        // Write the value
656
0
        slot.val = Some(value);
657
658
        // Release the slot lock before notifying the receivers.
659
0
        drop(slot);
660
661
        // Notify and release the mutex. This must happen after the slot lock is
662
        // released, otherwise the writer lock bit could be cleared while another
663
        // thread is in the critical section.
664
0
        self.shared.notify_rx(tail);
665
666
0
        Ok(rem)
667
0
    }
668
669
    /// Creates a new [`Receiver`] handle that will receive values sent **after**
670
    /// this call to `subscribe`.
671
    ///
672
    /// # Examples
673
    ///
674
    /// ```
675
    /// use tokio::sync::broadcast;
676
    ///
677
    /// # #[tokio::main(flavor = "current_thread")]
678
    /// # async fn main() {
679
    /// let (tx, _rx) = broadcast::channel(16);
680
    ///
681
    /// // Will not be seen
682
    /// tx.send(10).unwrap();
683
    ///
684
    /// let mut rx = tx.subscribe();
685
    ///
686
    /// tx.send(20).unwrap();
687
    ///
688
    /// let value = rx.recv().await.unwrap();
689
    /// assert_eq!(20, value);
690
    /// # }
691
    /// ```
692
0
    pub fn subscribe(&self) -> Receiver<T> {
693
0
        let shared = self.shared.clone();
694
0
        new_receiver(shared)
695
0
    }
696
697
    /// Converts the `Sender` to a [`WeakSender`] that does not count
698
    /// towards RAII semantics, i.e. if all `Sender` instances of the
699
    /// channel were dropped and only `WeakSender` instances remain,
700
    /// the channel is closed.
701
    #[must_use = "Downgrade creates a WeakSender without destroying the original non-weak sender."]
702
0
    pub fn downgrade(&self) -> WeakSender<T> {
703
0
        self.shared.num_weak_tx.fetch_add(1, Relaxed);
704
0
        WeakSender {
705
0
            shared: self.shared.clone(),
706
0
        }
707
0
    }
708
709
    /// Returns the number of queued values.
710
    ///
711
    /// A value is queued until it has either been seen by all receivers that were alive at the time
712
    /// it was sent, or has been evicted from the queue by subsequent sends that exceeded the
713
    /// queue's capacity.
714
    ///
715
    /// # Note
716
    ///
717
    /// In contrast to [`Receiver::len`], this method only reports queued values and not values that
718
    /// have been evicted from the queue before being seen by all receivers.
719
    ///
720
    /// # Examples
721
    ///
722
    /// ```
723
    /// use tokio::sync::broadcast;
724
    ///
725
    /// # #[tokio::main(flavor = "current_thread")]
726
    /// # async fn main() {
727
    /// let (tx, mut rx1) = broadcast::channel(16);
728
    /// let mut rx2 = tx.subscribe();
729
    ///
730
    /// tx.send(10).unwrap();
731
    /// tx.send(20).unwrap();
732
    /// tx.send(30).unwrap();
733
    ///
734
    /// assert_eq!(tx.len(), 3);
735
    ///
736
    /// rx1.recv().await.unwrap();
737
    ///
738
    /// // The len is still 3 since rx2 hasn't seen the first value yet.
739
    /// assert_eq!(tx.len(), 3);
740
    ///
741
    /// rx2.recv().await.unwrap();
742
    ///
743
    /// assert_eq!(tx.len(), 2);
744
    /// # }
745
    /// ```
746
0
    pub fn len(&self) -> usize {
747
0
        let tail = self.shared.tail.lock();
748
749
0
        let base_idx = (tail.pos & self.shared.mask as u64) as usize;
750
0
        let mut low = 0;
751
0
        let mut high = self.shared.buffer.len();
752
0
        while low < high {
753
0
            let mid = low + (high - low) / 2;
754
0
            let idx = base_idx.wrapping_add(mid) & self.shared.mask;
755
0
            if self.shared.buffer[idx].lock().rem.load(SeqCst) == 0 {
756
0
                low = mid + 1;
757
0
            } else {
758
0
                high = mid;
759
0
            }
760
        }
761
762
0
        self.shared.buffer.len() - low
763
0
    }
764
765
    /// Returns true if there are no queued values.
766
    ///
767
    /// # Examples
768
    ///
769
    /// ```
770
    /// use tokio::sync::broadcast;
771
    ///
772
    /// # #[tokio::main(flavor = "current_thread")]
773
    /// # async fn main() {
774
    /// let (tx, mut rx1) = broadcast::channel(16);
775
    /// let mut rx2 = tx.subscribe();
776
    ///
777
    /// assert!(tx.is_empty());
778
    ///
779
    /// tx.send(10).unwrap();
780
    ///
781
    /// assert!(!tx.is_empty());
782
    ///
783
    /// rx1.recv().await.unwrap();
784
    ///
785
    /// // The queue is still not empty since rx2 hasn't seen the value.
786
    /// assert!(!tx.is_empty());
787
    ///
788
    /// rx2.recv().await.unwrap();
789
    ///
790
    /// assert!(tx.is_empty());
791
    /// # }
792
    /// ```
793
0
    pub fn is_empty(&self) -> bool {
794
0
        let tail = self.shared.tail.lock();
795
796
0
        let idx = (tail.pos.wrapping_sub(1) & self.shared.mask as u64) as usize;
797
0
        self.shared.buffer[idx].lock().rem.load(SeqCst) == 0
798
0
    }
799
800
    /// Returns the number of active receivers.
801
    ///
802
    /// An active receiver is a [`Receiver`] handle returned from [`channel`] or
803
    /// [`subscribe`]. These are the handles that will receive values sent on
804
    /// this [`Sender`].
805
    ///
806
    /// # Note
807
    ///
808
    /// It is not guaranteed that a sent message will reach this number of
809
    /// receivers. Active receivers may never call [`recv`] again before
810
    /// dropping.
811
    ///
812
    /// [`recv`]: crate::sync::broadcast::Receiver::recv
813
    /// [`Receiver`]: crate::sync::broadcast::Receiver
814
    /// [`Sender`]: crate::sync::broadcast::Sender
815
    /// [`subscribe`]: crate::sync::broadcast::Sender::subscribe
816
    /// [`channel`]: crate::sync::broadcast::channel
817
    ///
818
    /// # Examples
819
    ///
820
    /// ```
821
    /// use tokio::sync::broadcast;
822
    ///
823
    /// # #[tokio::main(flavor = "current_thread")]
824
    /// # async fn main() {
825
    /// let (tx, _rx1) = broadcast::channel(16);
826
    ///
827
    /// assert_eq!(1, tx.receiver_count());
828
    ///
829
    /// let mut _rx2 = tx.subscribe();
830
    ///
831
    /// assert_eq!(2, tx.receiver_count());
832
    ///
833
    /// tx.send(10).unwrap();
834
    /// # }
835
    /// ```
836
0
    pub fn receiver_count(&self) -> usize {
837
0
        let tail = self.shared.tail.lock();
838
0
        tail.rx_cnt
839
0
    }
840
841
    /// Returns `true` if senders belong to the same channel.
842
    ///
843
    /// # Examples
844
    ///
845
    /// ```
846
    /// use tokio::sync::broadcast;
847
    ///
848
    /// # #[tokio::main(flavor = "current_thread")]
849
    /// # async fn main() {
850
    /// let (tx, _rx) = broadcast::channel::<()>(16);
851
    /// let tx2 = tx.clone();
852
    ///
853
    /// assert!(tx.same_channel(&tx2));
854
    ///
855
    /// let (tx3, _rx3) = broadcast::channel::<()>(16);
856
    ///
857
    /// assert!(!tx3.same_channel(&tx2));
858
    /// # }
859
    /// ```
860
0
    pub fn same_channel(&self, other: &Self) -> bool {
861
0
        Arc::ptr_eq(&self.shared, &other.shared)
862
0
    }
863
864
    /// A future which completes when the number of [Receiver]s subscribed to this `Sender` reaches
865
    /// zero.
866
    ///
867
    /// # Examples
868
    ///
869
    /// ```
870
    /// use futures::FutureExt;
871
    /// use tokio::sync::broadcast;
872
    ///
873
    /// # #[tokio::main(flavor = "current_thread")]
874
    /// # async fn main() {
875
    /// let (tx, mut rx1) = broadcast::channel::<u32>(16);
876
    /// let mut rx2 = tx.subscribe();
877
    ///
878
    /// let _ = tx.send(10);
879
    ///
880
    /// assert_eq!(rx1.recv().await.unwrap(), 10);
881
    /// drop(rx1);
882
    /// assert!(tx.closed().now_or_never().is_none());
883
    ///
884
    /// assert_eq!(rx2.recv().await.unwrap(), 10);
885
    /// drop(rx2);
886
    /// assert!(tx.closed().now_or_never().is_some());
887
    /// # }
888
    /// ```
889
0
    pub async fn closed(&self) {
890
        loop {
891
0
            let notified = self.shared.notify_last_rx_drop.notified();
892
893
            {
894
                // Ensure the lock drops if the channel isn't closed
895
0
                let tail = self.shared.tail.lock();
896
0
                if tail.closed {
897
0
                    return;
898
0
                }
899
            }
900
901
0
            notified.await;
902
        }
903
0
    }
904
905
0
    fn close_channel(&self) {
906
0
        let mut tail = self.shared.tail.lock();
907
0
        tail.closed = true;
908
909
0
        self.shared.notify_rx(tail);
910
0
    }
911
912
    /// Returns the number of [`Sender`] handles.
913
0
    pub fn strong_count(&self) -> usize {
914
0
        self.shared.num_tx.load(Acquire)
915
0
    }
916
917
    /// Returns the number of [`WeakSender`] handles.
918
0
    pub fn weak_count(&self) -> usize {
919
0
        self.shared.num_weak_tx.load(Acquire)
920
0
    }
921
}
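`strong_count` and `weak_count` above simply read `num_tx` and `num_weak_tx`; a short sketch of how cloning, downgrading, and dropping move those counters (illustrative only):

use tokio::sync::broadcast;

fn main() {
    let (tx, _rx) = broadcast::channel::<u32>(16);
    assert_eq!(tx.strong_count(), 1);
    assert_eq!(tx.weak_count(), 0);

    let tx2 = tx.clone();      // bumps `num_tx`
    let weak = tx.downgrade(); // bumps `num_weak_tx`
    assert_eq!(tx.strong_count(), 2);
    assert_eq!(tx.weak_count(), 1);

    drop(tx2);
    drop(weak);
    assert_eq!(tx.strong_count(), 1);
    assert_eq!(tx.weak_count(), 0);
}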
922
923
/// Create a new `Receiver` which reads starting from the tail.
924
0
fn new_receiver<T>(shared: Arc<Shared<T>>) -> Receiver<T> {
925
0
    let mut tail = shared.tail.lock();
926
927
0
    assert!(tail.rx_cnt != MAX_RECEIVERS, "max receivers");
928
929
0
    if tail.rx_cnt == 0 {
930
0
        // Potentially need to re-open the channel, if a new receiver has been added between calls
931
0
        // to poll(). Note that we use rx_cnt == 0 instead of is_closed since is_closed also
932
0
        // applies if the sender has been dropped
933
0
        tail.closed = false;
934
0
    }
935
936
0
    tail.rx_cnt = tail.rx_cnt.checked_add(1).expect("overflow");
937
0
    let next = tail.pos;
938
939
0
    drop(tail);
940
941
0
    Receiver { shared, next }
942
0
}
943
944
/// List used in `Shared::notify_rx`. It wraps a guarded linked list
945
/// and gates the access to it on the `Shared.tail` mutex. It also empties
946
/// the list on drop.
947
struct WaitersList<'a, T> {
948
    list: GuardedLinkedList<Waiter, <Waiter as linked_list::Link>::Target>,
949
    is_empty: bool,
950
    shared: &'a Shared<T>,
951
}
952
953
impl<'a, T> Drop for WaitersList<'a, T> {
954
0
    fn drop(&mut self) {
955
        // If the list is not empty, we unlink all waiters from it.
956
        // We do not wake the waiters to avoid double panics.
957
0
        if !self.is_empty {
958
0
            let _lock_guard = self.shared.tail.lock();
959
0
            while self.list.pop_back().is_some() {}
960
0
        }
961
0
    }
962
}
963
964
impl<'a, T> WaitersList<'a, T> {
965
0
    fn new(
966
0
        unguarded_list: LinkedList<Waiter, <Waiter as linked_list::Link>::Target>,
967
0
        guard: Pin<&'a Waiter>,
968
0
        shared: &'a Shared<T>,
969
0
    ) -> Self {
970
0
        let guard_ptr = NonNull::from(guard.get_ref());
971
0
        let list = unguarded_list.into_guarded(guard_ptr);
972
0
        WaitersList {
973
0
            list,
974
0
            is_empty: false,
975
0
            shared,
976
0
        }
977
0
    }
978
979
    /// Removes the last element from the guarded list. Modifying this list
980
    /// requires exclusive access to the main list guarded by the `Shared.tail` mutex.
981
0
    fn pop_back_locked(&mut self, _tail: &mut Tail) -> Option<NonNull<Waiter>> {
982
0
        let result = self.list.pop_back();
983
0
        if result.is_none() {
984
0
            // Save information about emptiness to avoid waiting for lock
985
0
            // in the destructor.
986
0
            self.is_empty = true;
987
0
        }
988
0
        result
989
0
    }
990
}
991
992
impl<T> Shared<T> {
993
0
    fn notify_rx<'a, 'b: 'a>(&'b self, mut tail: MutexGuard<'a, Tail>) {
994
        // It is critical for `GuardedLinkedList` safety that the guard node is
995
        // pinned in memory and is not dropped until the guarded list is dropped.
996
0
        let guard = Waiter::new();
997
0
        pin!(guard);
998
999
        // We move all waiters to a secondary list. It uses a `GuardedLinkedList`
1000
        // underneath to allow every waiter to safely remove itself from it.
1001
        //
1002
        // * This list will be still guarded by the `waiters` lock.
1003
        //   `NotifyWaitersList` wrapper makes sure we hold the lock to modify it.
1004
        // * This wrapper will empty the list on drop. It is critical for safety
1005
        //   that we will not leave any list entry with a pointer to the local
1006
        //   guard node after this function returns / panics.
1007
0
        let mut list = WaitersList::new(std::mem::take(&mut tail.waiters), guard.as_ref(), self);
1008
1009
0
        let mut wakers = WakeList::new();
1010
        'outer: loop {
1011
0
            while wakers.can_push() {
1012
0
                match list.pop_back_locked(&mut tail) {
1013
0
                    Some(waiter) => {
1014
                        unsafe {
1015
                            // Safety: accessing `waker` is safe because
1016
                            // the tail lock is held.
1017
0
                            if let Some(waker) = (*waiter.as_ptr()).waker.take() {
1018
0
                                wakers.push(waker);
1019
0
                            }
1020
1021
                            // Safety: `queued` is atomic.
1022
0
                            let queued = &(*waiter.as_ptr()).queued;
1023
                            // `Relaxed` suffices because the tail lock is held.
1024
0
                            assert!(queued.load(Relaxed));
1025
                            // `Release` is needed to synchronize with `Recv::drop`.
1026
                            // It is critical to set this variable **after** waker
1027
                            // is extracted, otherwise we may data race with `Recv::drop`.
1028
0
                            queued.store(false, Release);
1029
                        }
1030
                    }
1031
                    None => {
1032
0
                        break 'outer;
1033
                    }
1034
                }
1035
            }
1036
1037
            // Release the lock before waking.
1038
0
            drop(tail);
1039
1040
            // Before we acquire the lock again all sorts of things can happen:
1041
            // some waiters may remove themselves from the list and new waiters
1042
            // may be added. This is fine since at worst we will unnecessarily
1043
            // wake up waiters which will then queue themselves again.
1044
1045
0
            wakers.wake_all();
1046
1047
            // Acquire the lock again.
1048
0
            tail = self.tail.lock();
1049
        }
1050
1051
        // Release the lock before waking.
1052
0
        drop(tail);
1053
1054
0
        wakers.wake_all();
1055
0
    }
1056
}
1057
1058
impl<T> Clone for Sender<T> {
1059
0
    fn clone(&self) -> Sender<T> {
1060
0
        let shared = self.shared.clone();
1061
0
        shared.num_tx.fetch_add(1, Relaxed);
1062
1063
0
        Sender { shared }
1064
0
    }
1065
}
1066
1067
impl<T> Drop for Sender<T> {
1068
0
    fn drop(&mut self) {
1069
0
        if 1 == self.shared.num_tx.fetch_sub(1, AcqRel) {
1070
0
            self.close_channel();
1071
0
        }
1072
0
    }
1073
}
1074
1075
impl<T> WeakSender<T> {
1076
    /// Tries to convert a `WeakSender` into a [`Sender`].
1077
    ///
1078
    /// This will return `Some` if there are other `Sender` instances alive and
1079
    /// the channel wasn't previously dropped, otherwise `None` is returned.
1080
    #[must_use]
1081
0
    pub fn upgrade(&self) -> Option<Sender<T>> {
1082
0
        let mut tx_count = self.shared.num_tx.load(Acquire);
1083
1084
        loop {
1085
0
            if tx_count == 0 {
1086
                // channel is closed so this WeakSender can not be upgraded
1087
0
                return None;
1088
0
            }
1089
1090
0
            match self
1091
0
                .shared
1092
0
                .num_tx
1093
0
                .compare_exchange_weak(tx_count, tx_count + 1, Relaxed, Acquire)
1094
            {
1095
                Ok(_) => {
1096
0
                    return Some(Sender {
1097
0
                        shared: self.shared.clone(),
1098
0
                    })
1099
                }
1100
0
                Err(prev_count) => tx_count = prev_count,
1101
            }
1102
        }
1103
0
    }
1104
1105
    /// Returns the number of [`Sender`] handles.
1106
0
    pub fn strong_count(&self) -> usize {
1107
0
        self.shared.num_tx.load(Acquire)
1108
0
    }
1109
1110
    /// Returns the number of [`WeakSender`] handles.
1111
0
    pub fn weak_count(&self) -> usize {
1112
0
        self.shared.num_weak_tx.load(Acquire)
1113
0
    }
1114
}
1115
1116
impl<T> Clone for WeakSender<T> {
1117
0
    fn clone(&self) -> WeakSender<T> {
1118
0
        let shared = self.shared.clone();
1119
0
        shared.num_weak_tx.fetch_add(1, Relaxed);
1120
1121
0
        Self { shared }
1122
0
    }
1123
}
1124
1125
impl<T> Drop for WeakSender<T> {
1126
0
    fn drop(&mut self) {
1127
0
        self.shared.num_weak_tx.fetch_sub(1, AcqRel);
1128
0
    }
1129
}
1130
1131
impl<T> Receiver<T> {
1132
    /// Returns the number of messages that were sent into the channel and that
1133
    /// this [`Receiver`] has yet to receive.
1134
    ///
1135
    /// If the value returned by `len` is larger than the channel's capacity rounded up
1136
    /// to the next power of two, any call to [`recv`] will return an
1137
    /// `Err(RecvError::Lagged)` and any call to [`try_recv`] will return an
1138
    /// `Err(TryRecvError::Lagged)`, e.g. if the capacity of the channel is 10,
1139
    /// [`recv`] will start to return `Err(RecvError::Lagged)` once `len` returns
1140
    /// values larger than 16.
1141
    ///
1142
    /// [`Receiver`]: crate::sync::broadcast::Receiver
1143
    /// [`recv`]: crate::sync::broadcast::Receiver::recv
1144
    /// [`try_recv`]: crate::sync::broadcast::Receiver::try_recv
1145
    ///
1146
    /// # Examples
1147
    ///
1148
    /// ```
1149
    /// use tokio::sync::broadcast;
1150
    ///
1151
    /// # #[tokio::main(flavor = "current_thread")]
1152
    /// # async fn main() {
1153
    /// let (tx, mut rx1) = broadcast::channel(16);
1154
    ///
1155
    /// tx.send(10).unwrap();
1156
    /// tx.send(20).unwrap();
1157
    ///
1158
    /// assert_eq!(rx1.len(), 2);
1159
    /// assert_eq!(rx1.recv().await.unwrap(), 10);
1160
    /// assert_eq!(rx1.len(), 1);
1161
    /// assert_eq!(rx1.recv().await.unwrap(), 20);
1162
    /// assert_eq!(rx1.len(), 0);
1163
    /// # }
1164
    /// ```
1165
0
    pub fn len(&self) -> usize {
1166
0
        let next_send_pos = self.shared.tail.lock().pos;
1167
0
        (next_send_pos - self.next) as usize
1168
0
    }
1169
1170
    /// Returns true if there aren't any messages in the channel that the [`Receiver`]
1171
    /// has yet to receive.
1172
    ///
1173
    /// [`Receiver`]: crate::sync::broadcast::Receiver
1174
    ///
1175
    /// # Examples
1176
    ///
1177
    /// ```
1178
    /// use tokio::sync::broadcast;
1179
    ///
1180
    /// # #[tokio::main(flavor = "current_thread")]
1181
    /// # async fn main() {
1182
    /// let (tx, mut rx1) = broadcast::channel(16);
1183
    ///
1184
    /// assert!(rx1.is_empty());
1185
    ///
1186
    /// tx.send(10).unwrap();
1187
    /// tx.send(20).unwrap();
1188
    ///
1189
    /// assert!(!rx1.is_empty());
1190
    /// assert_eq!(rx1.recv().await.unwrap(), 10);
1191
    /// assert_eq!(rx1.recv().await.unwrap(), 20);
1192
    /// assert!(rx1.is_empty());
1193
    /// # }
1194
    /// ```
1195
0
    pub fn is_empty(&self) -> bool {
1196
0
        self.len() == 0
1197
0
    }
1198
1199
    /// Returns `true` if receivers belong to the same channel.
1200
    ///
1201
    /// # Examples
1202
    ///
1203
    /// ```
1204
    /// use tokio::sync::broadcast;
1205
    ///
1206
    /// # #[tokio::main(flavor = "current_thread")]
1207
    /// # async fn main() {
1208
    /// let (tx, rx) = broadcast::channel::<()>(16);
1209
    /// let rx2 = tx.subscribe();
1210
    ///
1211
    /// assert!(rx.same_channel(&rx2));
1212
    ///
1213
    /// let (_tx3, rx3) = broadcast::channel::<()>(16);
1214
    ///
1215
    /// assert!(!rx3.same_channel(&rx2));
1216
    /// # }
1217
    /// ```
1218
0
    pub fn same_channel(&self, other: &Self) -> bool {
1219
0
        Arc::ptr_eq(&self.shared, &other.shared)
1220
0
    }
1221
1222
    /// Locks the next value if there is one.
1223
0
    fn recv_ref(
1224
0
        &mut self,
1225
0
        waiter: Option<(&UnsafeCell<Waiter>, &Waker)>,
1226
0
    ) -> Result<RecvGuard<'_, T>, TryRecvError> {
1227
0
        let idx = (self.next & self.shared.mask as u64) as usize;
1228
1229
        // The slot holding the next value to read
1230
0
        let mut slot = self.shared.buffer[idx].lock();
1231
1232
0
        if slot.pos != self.next {
1233
            // Release the `slot` lock before attempting to acquire the `tail`
1234
            // lock. This is required because `send2` acquires the tail lock
1235
            // first followed by the slot lock. Acquiring the locks in reverse
1236
            // order here would result in a potential deadlock: `recv_ref`
1237
            // acquires the `slot` lock and attempts to acquire the `tail` lock
1238
            // while `send2` acquired the `tail` lock and attempts to acquire
1239
            // the slot lock.
1240
0
            drop(slot);
1241
1242
0
            let mut old_waker = None;
1243
1244
0
            let mut tail = self.shared.tail.lock();
1245
1246
            // Acquire slot lock again
1247
0
            slot = self.shared.buffer[idx].lock();
1248
1249
            // Make sure the position did not change. This could happen in the
1250
            // unlikely event that the buffer is wrapped between dropping the
1251
            // read lock and acquiring the tail lock.
1252
0
            if slot.pos != self.next {
1253
0
                let next_pos = slot.pos.wrapping_add(self.shared.buffer.len() as u64);
1254
1255
0
                if next_pos == self.next {
1256
                    // At this point the channel is empty for *this* receiver. If
1257
                    // it's been closed, then that's what we return, otherwise we
1258
                    // set a waker and return empty.
1259
0
                    if tail.closed {
1260
0
                        return Err(TryRecvError::Closed);
1261
0
                    }
1262
1263
                    // Store the waker
1264
0
                    if let Some((waiter, waker)) = waiter {
1265
                        // Safety: called while locked.
1266
                        unsafe {
1267
                            // Only queue if not already queued
1268
0
                            waiter.with_mut(|ptr| {
1269
                                // If there is no waker **or** if the currently
1270
                                // stored waker references a **different** task,
1271
                                // track the tasks' waker to be notified on
1272
                                // receipt of a new value.
1273
0
                                match (*ptr).waker {
1274
0
                                    Some(ref w) if w.will_wake(waker) => {}
1275
0
                                    _ => {
1276
0
                                        old_waker = (*ptr).waker.replace(waker.clone());
1277
0
                                    }
1278
                                }
1279
1280
                                // If the waiter is not already queued, enqueue it.
1281
                                // `Relaxed` order suffices: we have synchronized with
1282
                                // all writers through the tail lock that we hold.
1283
0
                                if !(*ptr).queued.load(Relaxed) {
1284
0
                                    // `Relaxed` order suffices: all the readers will
1285
0
                                    // synchronize with this write through the tail lock.
1286
0
                                    (*ptr).queued.store(true, Relaxed);
1287
0
                                    tail.waiters.push_front(NonNull::new_unchecked(&mut *ptr));
1288
0
                                }
1289
0
                            });
1290
                        }
1291
0
                    }
1292
1293
                    // Drop the old waker after releasing the locks.
1294
0
                    drop(slot);
1295
0
                    drop(tail);
1296
0
                    drop(old_waker);
1297
1298
0
                    return Err(TryRecvError::Empty);
1299
0
                }
1300
1301
                // At this point, the receiver has lagged behind the sender by
1302
                // more than the channel capacity. The receiver will attempt to
1303
                // catch up by skipping dropped messages and setting the
1304
                // internal cursor to the **oldest** message stored by the
1305
                // channel.
1306
0
                let next = tail.pos.wrapping_sub(self.shared.buffer.len() as u64);
1307
1308
0
                let missed = next.wrapping_sub(self.next);
1309
1310
0
                drop(tail);
1311
1312
                // The receiver is slow but no values have been missed
1313
0
                if missed == 0 {
1314
0
                    self.next = self.next.wrapping_add(1);
1315
1316
0
                    return Ok(RecvGuard { slot });
1317
0
                }
1318
1319
0
                self.next = next;
1320
1321
0
                return Err(TryRecvError::Lagged(missed));
1322
0
            }
1323
0
        }
1324
1325
0
        self.next = self.next.wrapping_add(1);
1326
1327
0
        Ok(RecvGuard { slot })
1328
0
    }
1329
1330
    /// Returns the number of [`Sender`] handles.
1331
0
    pub fn sender_strong_count(&self) -> usize {
1332
0
        self.shared.num_tx.load(Acquire)
1333
0
    }
1334
1335
    /// Returns the number of [`WeakSender`] handles.
1336
0
    pub fn sender_weak_count(&self) -> usize {
1337
0
        self.shared.num_weak_tx.load(Acquire)
1338
0
    }
1339
1340
    /// Checks if a channel is closed.
1341
    ///
1342
    /// This method returns `true` if the channel has been closed. The channel is closed
1343
    /// when all [`Sender`] have been dropped.
1344
    ///
1345
    /// [`Sender`]: crate::sync::broadcast::Sender
1346
    ///
1347
    /// # Examples
1348
    /// ```
1349
    /// use tokio::sync::broadcast;
1350
    ///
1351
    /// # #[tokio::main(flavor = "current_thread")]
1352
    /// # async fn main() {
1353
    /// let (tx, rx) = broadcast::channel::<()>(10);
1354
    /// assert!(!rx.is_closed());
1355
    ///
1356
    /// drop(tx);
1357
    ///
1358
    /// assert!(rx.is_closed());
1359
    /// # }
1360
    /// ```
1361
0
    pub fn is_closed(&self) -> bool {
1362
        // Channel is closed when there are no strong senders left active
1363
0
        self.shared.num_tx.load(Acquire) == 0
1364
0
    }
1365
}
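`sender_strong_count`, `sender_weak_count`, and `is_closed` give the receiver the same view of the sender counters; a minimal sketch (illustrative only):

use tokio::sync::broadcast;

fn main() {
    let (tx, rx) = broadcast::channel::<u32>(16);
    let weak = tx.downgrade();

    assert_eq!(rx.sender_strong_count(), 1);
    assert_eq!(rx.sender_weak_count(), 1);
    assert!(!rx.is_closed());

    // Dropping the last strong sender closes the channel, even though a
    // `WeakSender` is still alive.
    drop(tx);
    assert!(rx.is_closed());
    assert_eq!(rx.sender_strong_count(), 0);
    assert_eq!(rx.sender_weak_count(), 1);

    drop(weak);
}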
1366
1367
impl<T: Clone> Receiver<T> {
1368
    /// Re-subscribes to the channel starting from the current tail element.
1369
    ///
1370
    /// This [`Receiver`] handle will receive a clone of all values sent
1371
    /// **after** it has resubscribed. This will not include elements that are
1372
    /// in the queue of the current receiver. Consider the following example.
1373
    ///
1374
    /// # Examples
1375
    ///
1376
    /// ```
1377
    /// use tokio::sync::broadcast;
1378
    ///
1379
    /// # #[tokio::main(flavor = "current_thread")]
1380
    /// # async fn main() {
1381
    /// let (tx, mut rx) = broadcast::channel(2);
1382
    ///
1383
    /// tx.send(1).unwrap();
1384
    /// let mut rx2 = rx.resubscribe();
1385
    /// tx.send(2).unwrap();
1386
    ///
1387
    /// assert_eq!(rx2.recv().await.unwrap(), 2);
1388
    /// assert_eq!(rx.recv().await.unwrap(), 1);
1389
    /// # }
1390
    /// ```
1391
0
    pub fn resubscribe(&self) -> Self {
1392
0
        let shared = self.shared.clone();
1393
0
        new_receiver(shared)
1394
0
    }
1395
    /// Receives the next value for this receiver.
1396
    ///
1397
    /// Each [`Receiver`] handle will receive a clone of all values sent
1398
    /// **after** it has subscribed.
1399
    ///
1400
    /// `Err(RecvError::Closed)` is returned when all `Sender` halves have
1401
    /// dropped, indicating that no further values can be sent on the channel.
1402
    ///
1403
    /// If the [`Receiver`] handle falls behind, once the channel is full, newly
1404
    /// sent values will overwrite old values. At this point, a call to [`recv`]
1405
    /// will return with `Err(RecvError::Lagged)` and the [`Receiver`]'s
1406
    /// internal cursor is updated to point to the oldest value still held by
1407
    /// the channel. A subsequent call to [`recv`] will return this value
1408
    /// **unless** it has been since overwritten.
1409
    ///
1410
    /// # Cancel safety
1411
    ///
1412
    /// This method is cancel safe. If `recv` is used as the event in a
1413
    /// [`tokio::select!`](crate::select) statement and some other branch
1414
    /// completes first, it is guaranteed that no messages were received on this
1415
    /// channel.
1416
    ///
1417
    /// [`Receiver`]: crate::sync::broadcast::Receiver
1418
    /// [`recv`]: crate::sync::broadcast::Receiver::recv
1419
    ///
1420
    /// # Examples
1421
    ///
1422
    /// ```
1423
    /// use tokio::sync::broadcast;
1424
    ///
1425
    /// # #[tokio::main(flavor = "current_thread")]
1426
    /// # async fn main() {
1427
    /// let (tx, mut rx1) = broadcast::channel(16);
1428
    /// let mut rx2 = tx.subscribe();
1429
    ///
1430
    /// tokio::spawn(async move {
1431
    ///     assert_eq!(rx1.recv().await.unwrap(), 10);
1432
    ///     assert_eq!(rx1.recv().await.unwrap(), 20);
1433
    /// });
1434
    ///
1435
    /// tokio::spawn(async move {
1436
    ///     assert_eq!(rx2.recv().await.unwrap(), 10);
1437
    ///     assert_eq!(rx2.recv().await.unwrap(), 20);
1438
    /// });
1439
    ///
1440
    /// tx.send(10).unwrap();
1441
    /// tx.send(20).unwrap();
1442
    /// # }
1443
    /// ```
1444
    ///
1445
    /// Handling lag
1446
    ///
1447
    /// ```
1448
    /// use tokio::sync::broadcast;
1449
    ///
1450
    /// # #[tokio::main(flavor = "current_thread")]
1451
    /// # async fn main() {
1452
    /// let (tx, mut rx) = broadcast::channel(2);
1453
    ///
1454
    /// tx.send(10).unwrap();
1455
    /// tx.send(20).unwrap();
1456
    /// tx.send(30).unwrap();
1457
    ///
1458
    /// // The receiver lagged behind
1459
    /// assert!(rx.recv().await.is_err());
1460
    ///
1461
    /// // At this point, we can abort or continue with lost messages
1462
    ///
1463
    /// assert_eq!(20, rx.recv().await.unwrap());
1464
    /// assert_eq!(30, rx.recv().await.unwrap());
1465
    /// # }
1466
    /// ```
1467
0
    pub async fn recv(&mut self) -> Result<T, RecvError> {
1468
0
        cooperative(Recv::new(self)).await
1469
0
    }
1470
1471
    /// Attempts to return a pending value on this receiver without awaiting.
1472
    ///
1473
    /// This is useful for a flavor of "optimistic check" before deciding to
1474
    /// await on a receiver.
1475
    ///
1476
    /// Compared with [`recv`], this function has three failure cases instead of two
1477
    /// (one for closed, one for an empty buffer, one for a lagging receiver).
1478
    ///
1479
    /// `Err(TryRecvError::Closed)` is returned when all `Sender` halves have
1480
    /// dropped, indicating that no further values can be sent on the channel.
1481
    ///
1482
    /// If the [`Receiver`] handle falls behind, once the channel is full, newly
1483
    /// sent values will overwrite old values. At this point, a call to [`recv`]
1484
    /// will return with `Err(TryRecvError::Lagged)` and the [`Receiver`]'s
1485
    /// internal cursor is updated to point to the oldest value still held by
1486
    /// the channel. A subsequent call to [`try_recv`] will return this value
1487
    /// **unless** it has been since overwritten. If there are no values to
1488
    /// receive, `Err(TryRecvError::Empty)` is returned.
1489
    ///
1490
    /// [`recv`]: crate::sync::broadcast::Receiver::recv
1491
    /// [`try_recv`]: crate::sync::broadcast::Receiver::try_recv
1492
    /// [`Receiver`]: crate::sync::broadcast::Receiver
1493
    ///
1494
    /// # Examples
1495
    ///
1496
    /// ```
1497
    /// use tokio::sync::broadcast;
1498
    ///
1499
    /// # #[tokio::main(flavor = "current_thread")]
1500
    /// # async fn main() {
1501
    /// let (tx, mut rx) = broadcast::channel(16);
1502
    ///
1503
    /// assert!(rx.try_recv().is_err());
1504
    ///
1505
    /// tx.send(10).unwrap();
1506
    ///
1507
    /// let value = rx.try_recv().unwrap();
1508
    /// assert_eq!(10, value);
1509
    /// # }
1510
    /// ```
1511
0
    pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
1512
0
        let guard = self.recv_ref(None)?;
1513
0
        guard.clone_value().ok_or(TryRecvError::Closed)
1514
0
    }
1515
1516
    /// Blocking receive to call outside of asynchronous contexts.
1517
    ///
1518
    /// # Panics
1519
    ///
1520
    /// This function panics if called within an asynchronous execution
1521
    /// context.
1522
    ///
1523
    /// # Examples
1524
    /// ```
1525
    /// # #[cfg(not(target_family = "wasm"))]
1526
    /// # {
1527
    /// use std::thread;
1528
    /// use tokio::sync::broadcast;
1529
    ///
1530
    /// #[tokio::main]
1531
    /// async fn main() {
1532
    ///     let (tx, mut rx) = broadcast::channel(16);
1533
    ///
1534
    ///     let sync_code = thread::spawn(move || {
1535
    ///         assert_eq!(rx.blocking_recv(), Ok(10));
1536
    ///     });
1537
    ///
1538
    ///     let _ = tx.send(10);
1539
    ///     sync_code.join().unwrap();
1540
    /// }
1541
    /// # }
1542
    /// ```
1543
0
    pub fn blocking_recv(&mut self) -> Result<T, RecvError> {
1544
0
        crate::future::block_on(self.recv())
1545
0
    }
1546
}
1547
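Not part of the upstream documentation: since `blocking_recv` panics when called from async code, one way to combine it with a running runtime is to hand the receiver to a blocking task. A sketch under that assumption (multi-threaded `#[tokio::main]` runtime; the value 42 is illustrative):

```
use tokio::sync::broadcast;

#[tokio::main]
async fn main() {
    let (tx, mut rx) = broadcast::channel(16);

    // Run the blocking receive on the blocking thread pool rather than
    // calling it directly from async code, which would panic.
    let handle = tokio::task::spawn_blocking(move || rx.blocking_recv());

    tx.send(42).unwrap();
    assert_eq!(handle.await.unwrap(), Ok(42));
}
```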
1548
impl<T> Drop for Receiver<T> {
1549
0
    fn drop(&mut self) {
1550
0
        let mut tail = self.shared.tail.lock();
1551
1552
0
        tail.rx_cnt -= 1;
1553
0
        let until = tail.pos;
1554
0
        let remaining_rx = tail.rx_cnt;
1555
1556
0
        if remaining_rx == 0 {
1557
0
            self.shared.notify_last_rx_drop.notify_waiters();
1558
0
            tail.closed = true;
1559
0
        }
1560
1561
0
        drop(tail);
1562
1563
0
        while self.next < until {
1564
0
            match self.recv_ref(None) {
1565
0
                Ok(_) => {}
1566
                // The channel is closed
1567
0
                Err(TryRecvError::Closed) => break,
1568
                // Ignore lagging; we will catch up
1569
0
                Err(TryRecvError::Lagged(..)) => {}
1570
                // Can't be empty
1571
0
                Err(TryRecvError::Empty) => panic!("unexpected empty broadcast channel"),
1572
            }
1573
        }
1574
0
    }
1575
}
1576
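The `Drop` implementation above walks the receiver's unread slots so their remaining-receiver counts are decremented, and it closes the channel once the last receiver is gone. Not part of the upstream documentation: a minimal sketch of the externally visible effect, namely that `send` fails once no receiver is left (values illustrative).

```
use tokio::sync::broadcast;

fn main() {
    let (tx, rx) = broadcast::channel::<i32>(16);

    // With a live receiver, sends succeed.
    assert!(tx.send(1).is_ok());

    // Dropping the last receiver releases its unread messages and
    // closes the channel; further sends return the value in `SendError`.
    drop(rx);
    assert!(tx.send(2).is_err());
}
```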
1577
impl<'a, T> Recv<'a, T> {
1578
0
    fn new(receiver: &'a mut Receiver<T>) -> Recv<'a, T> {
1579
0
        Recv {
1580
0
            receiver,
1581
0
            waiter: WaiterCell(UnsafeCell::new(Waiter {
1582
0
                queued: AtomicBool::new(false),
1583
0
                waker: None,
1584
0
                pointers: linked_list::Pointers::new(),
1585
0
                _p: PhantomPinned,
1586
0
            })),
1587
0
        }
1588
0
    }
1589
1590
    /// A custom `project` implementation is used in place of `pin-project-lite`
1591
    /// because a custom `Drop` implementation is needed.
1592
0
    fn project(self: Pin<&mut Self>) -> (&mut Receiver<T>, &UnsafeCell<Waiter>) {
1593
        unsafe {
1594
            // Safety: Receiver is Unpin
1595
0
            is_unpin::<&mut Receiver<T>>();
1596
1597
0
            let me = self.get_unchecked_mut();
1598
0
            (me.receiver, &me.waiter.0)
1599
        }
1600
0
    }
1601
}
1602
1603
impl<'a, T> Future for Recv<'a, T>
1604
where
1605
    T: Clone,
1606
{
1607
    type Output = Result<T, RecvError>;
1608
1609
0
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<T, RecvError>> {
1610
0
        ready!(crate::trace::trace_leaf(cx));
1611
1612
0
        let (receiver, waiter) = self.project();
1613
1614
0
        let guard = match receiver.recv_ref(Some((waiter, cx.waker()))) {
1615
0
            Ok(value) => value,
1616
0
            Err(TryRecvError::Empty) => return Poll::Pending,
1617
0
            Err(TryRecvError::Lagged(n)) => return Poll::Ready(Err(RecvError::Lagged(n))),
1618
0
            Err(TryRecvError::Closed) => return Poll::Ready(Err(RecvError::Closed)),
1619
        };
1620
1621
0
        Poll::Ready(guard.clone_value().ok_or(RecvError::Closed))
1622
0
    }
1623
}
1624
1625
impl<'a, T> Drop for Recv<'a, T> {
1626
0
    fn drop(&mut self) {
1627
        // Safety: `waiter.queued` is atomic.
1628
        // Acquire ordering is required to synchronize with
1629
        // `Shared::notify_rx` before we drop the object.
1630
0
        let queued = self
1631
0
            .waiter
1632
0
            .0
1633
0
            .with(|ptr| unsafe { (*ptr).queued.load(Acquire) });
1634
1635
        // If the waiter is queued, we need to unlink it from the waiters list.
1636
        // If not, no further synchronization is required, since the waiter
1637
        // is not in the list and, as such, is not shared with any other threads.
1638
0
        if queued {
1639
            // Acquire the tail lock. This is required for safety before accessing
1640
            // the waiter node.
1641
0
            let mut tail = self.receiver.shared.tail.lock();
1642
1643
            // Safety: tail lock is held.
1644
            // `Relaxed` order suffices because we hold the tail lock.
1645
0
            let queued = self
1646
0
                .waiter
1647
0
                .0
1648
0
                .with_mut(|ptr| unsafe { (*ptr).queued.load(Relaxed) });
1649
1650
0
            if queued {
1651
                // Remove the node
1652
                //
1653
                // safety: tail lock is held and the wait node is verified to be in
1654
                // the list.
1655
                unsafe {
1656
0
                    self.waiter.0.with_mut(|ptr| {
1657
0
                        tail.waiters.remove((&mut *ptr).into());
1658
0
                    });
1659
                }
1660
0
            }
1661
0
        }
1662
0
    }
1663
}
1664
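The `Drop` implementation above is what allows an in-flight `recv` future to be cancelled: if the future was parked, its waiter node is unlinked from the channel's waiter list under the tail lock before the future's memory is released. Not part of the upstream documentation: a sketch of that cancellation path, racing `recv` against a timeout (duration and value illustrative).

```
use std::time::Duration;
use tokio::sync::broadcast;

#[tokio::main(flavor = "current_thread")]
async fn main() {
    let (tx, mut rx) = broadcast::channel::<i32>(16);

    // The `recv` future registers a waiter, times out, and is dropped;
    // its drop handler removes the waiter from the channel's list.
    let timed_out = tokio::time::timeout(Duration::from_millis(10), rx.recv()).await;
    assert!(timed_out.is_err());

    // The receiver is still usable afterwards.
    tx.send(7).unwrap();
    assert_eq!(rx.recv().await.unwrap(), 7);
}
```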
1665
/// # Safety
1666
///
1667
/// `Waiter` is forced to be `!Unpin`.
1668
unsafe impl linked_list::Link for Waiter {
1669
    type Handle = NonNull<Waiter>;
1670
    type Target = Waiter;
1671
1672
0
    fn as_raw(handle: &NonNull<Waiter>) -> NonNull<Waiter> {
1673
0
        *handle
1674
0
    }
1675
1676
0
    unsafe fn from_raw(ptr: NonNull<Waiter>) -> NonNull<Waiter> {
1677
0
        ptr
1678
0
    }
1679
1680
0
    unsafe fn pointers(target: NonNull<Waiter>) -> NonNull<linked_list::Pointers<Waiter>> {
1681
0
        Waiter::addr_of_pointers(target)
1682
0
    }
1683
}
1684
1685
impl<T> fmt::Debug for Sender<T> {
1686
0
    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
1687
0
        write!(fmt, "broadcast::Sender")
1688
0
    }
1689
}
1690
1691
impl<T> fmt::Debug for WeakSender<T> {
1692
0
    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
1693
0
        write!(fmt, "broadcast::WeakSender")
1694
0
    }
1695
}
1696
1697
impl<T> fmt::Debug for Receiver<T> {
1698
0
    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
1699
0
        write!(fmt, "broadcast::Receiver")
1700
0
    }
1701
}
1702
1703
impl<'a, T> RecvGuard<'a, T> {
1704
0
    fn clone_value(&self) -> Option<T>
1705
0
    where
1706
0
        T: Clone,
1707
    {
1708
0
        self.slot.val.clone()
1709
0
    }
1710
}
1711
1712
impl<'a, T> Drop for RecvGuard<'a, T> {
1713
0
    fn drop(&mut self) {
1714
        // Decrement the remaining counter; the last receiver to release this slot also drops the stored value.
1715
0
        if 1 == self.slot.rem.fetch_sub(1, SeqCst) {
1716
0
            self.slot.val = None;
1717
0
        }
1718
0
    }
1719
}
1720
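The guard above decrements the slot's `rem` counter when a receiver is done with it, and the receiver that brings the counter to zero clears the stored value. Not part of the upstream code: a standalone, single-threaded sketch of the same release-on-last-reader pattern with hypothetical names (`Slot`, `take_for_one_receiver`).

```
use std::sync::atomic::{AtomicUsize, Ordering::SeqCst};

// Hypothetical slot: a value plus the number of receivers that still
// need to observe it (atomic to mirror the real, shared slot).
struct Slot<T> {
    rem: AtomicUsize,
    val: Option<T>,
}

impl<T: Clone> Slot<T> {
    // Clone the value for one receiver; the last receiver to take its
    // copy releases the stored value.
    fn take_for_one_receiver(&mut self) -> Option<T> {
        let value = self.val.clone();
        if self.rem.fetch_sub(1, SeqCst) == 1 {
            self.val = None;
        }
        value
    }
}

fn main() {
    let mut slot = Slot {
        rem: AtomicUsize::new(2),
        val: Some(String::from("hello")),
    };
    assert_eq!(slot.take_for_one_receiver().as_deref(), Some("hello"));
    assert_eq!(slot.take_for_one_receiver().as_deref(), Some("hello"));
    assert!(slot.val.is_none()); // released after the last receiver
}
```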
1721
0
fn is_unpin<T: Unpin>() {}
1722
1723
#[cfg(not(loom))]
1724
#[cfg(test)]
1725
mod tests {
1726
    use super::*;
1727
1728
    #[test]
1729
    fn receiver_count_on_sender_constructor() {
1730
        let sender = Sender::<i32>::new(16);
1731
        assert_eq!(sender.receiver_count(), 0);
1732
1733
        let rx_1 = sender.subscribe();
1734
        assert_eq!(sender.receiver_count(), 1);
1735
1736
        let rx_2 = rx_1.resubscribe();
1737
        assert_eq!(sender.receiver_count(), 2);
1738
1739
        let rx_3 = sender.subscribe();
1740
        assert_eq!(sender.receiver_count(), 3);
1741
1742
        drop(rx_3);
1743
        drop(rx_1);
1744
        assert_eq!(sender.receiver_count(), 1);
1745
1746
        drop(rx_2);
1747
        assert_eq!(sender.receiver_count(), 0);
1748
    }
1749
1750
    #[cfg(not(loom))]
1751
    #[test]
1752
    fn receiver_count_on_channel_constructor() {
1753
        let (sender, rx) = channel::<i32>(16);
1754
        assert_eq!(sender.receiver_count(), 1);
1755
1756
        let _rx_2 = rx.resubscribe();
1757
        assert_eq!(sender.receiver_count(), 2);
1758
    }
1759
}