Coverage Report

Created: 2025-08-26 07:09

/rust/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.47.1/src/sync/broadcast.rs
Line
Count
Source (jump to first uncovered line)
1
//! A multi-producer, multi-consumer broadcast queue. Each sent value is seen by
2
//! all consumers.
3
//!
4
//! A [`Sender`] is used to broadcast values to **all** connected [`Receiver`]
5
//! values. [`Sender`] handles are clone-able, allowing concurrent send and
6
//! receive actions. [`Sender`] and [`Receiver`] are both `Send` and `Sync` as
7
//! long as `T` is `Send`.
8
//!
9
//! When a value is sent, **all** [`Receiver`] handles are notified and will
10
//! receive the value. The value is stored once inside the channel and cloned on
11
//! demand for each receiver. Once all receivers have received a clone of the
12
//! value, the value is released from the channel.
13
//!
14
//! A channel is created by calling [`channel`], specifying the maximum number
15
//! of messages the channel can retain at any given time.
16
//!
17
//! New [`Receiver`] handles are created by calling [`Sender::subscribe`]. The
18
//! returned [`Receiver`] will receive values sent **after** the call to
19
//! `subscribe`.
20
//!
21
//! This channel is also suitable for the single-producer multi-consumer
22
//! use-case, where a single sender broadcasts values to many receivers.
23
//!
24
//! ## Lagging
25
//!
26
//! As sent messages must be retained until **all** [`Receiver`] handles receive
27
//! a clone, broadcast channels are susceptible to the "slow receiver" problem.
28
//! In this case, all but one receiver are able to receive values at the rate
29
//! they are sent. Because one receiver is stalled, the channel starts to fill
30
//! up.
31
//!
32
//! This broadcast channel implementation handles this case by setting a hard
33
//! upper bound on the number of values the channel may retain at any given
34
//! time. This upper bound is passed to the [`channel`] function as an argument.
35
//!
36
//! If a value is sent when the channel is at capacity, the oldest value
37
//! currently held by the channel is released. This frees up space for the new
38
//! value. Any receiver that has not yet seen the released value will return
39
//! [`RecvError::Lagged`] the next time [`recv`] is called.
40
//!
41
//! Once [`RecvError::Lagged`] is returned, the lagging receiver's position is
42
//! updated to the oldest value contained by the channel. The next call to
43
//! [`recv`] will return this value.
44
//!
45
//! This behavior enables a receiver to detect when it has lagged so far behind
46
//! that data has been dropped. The caller may decide how to respond to this:
47
//! either by aborting its task or by tolerating lost messages and resuming
48
//! consumption of the channel.
49
//!
50
//! ## Closing
51
//!
52
//! When **all** [`Sender`] handles have been dropped, no new values may be
53
//! sent. At this point, the channel is "closed". Once a receiver has received
54
//! all values retained by the channel, the next call to [`recv`] will return
55
//! with [`RecvError::Closed`].
56
//!
57
//! When a [`Receiver`] handle is dropped, any messages not read by the receiver
58
//! will be marked as read. If this receiver was the only one not to have read
59
//! that message, the message will be dropped at this point.
60
//!
61
//! [`Sender`]: crate::sync::broadcast::Sender
62
//! [`Sender::subscribe`]: crate::sync::broadcast::Sender::subscribe
63
//! [`Receiver`]: crate::sync::broadcast::Receiver
64
//! [`channel`]: crate::sync::broadcast::channel
65
//! [`RecvError::Lagged`]: crate::sync::broadcast::error::RecvError::Lagged
66
//! [`RecvError::Closed`]: crate::sync::broadcast::error::RecvError::Closed
67
//! [`recv`]: crate::sync::broadcast::Receiver::recv
68
//!
69
//! # Examples
70
//!
71
//! Basic usage
72
//!
73
//! ```
74
//! use tokio::sync::broadcast;
75
//!
76
//! #[tokio::main]
77
//! async fn main() {
78
//!     let (tx, mut rx1) = broadcast::channel(16);
79
//!     let mut rx2 = tx.subscribe();
80
//!
81
//!     tokio::spawn(async move {
82
//!         assert_eq!(rx1.recv().await.unwrap(), 10);
83
//!         assert_eq!(rx1.recv().await.unwrap(), 20);
84
//!     });
85
//!
86
//!     tokio::spawn(async move {
87
//!         assert_eq!(rx2.recv().await.unwrap(), 10);
88
//!         assert_eq!(rx2.recv().await.unwrap(), 20);
89
//!     });
90
//!
91
//!     tx.send(10).unwrap();
92
//!     tx.send(20).unwrap();
93
//! }
94
//! ```
95
//!
96
//! Handling lag
97
//!
98
//! ```
99
//! use tokio::sync::broadcast;
100
//!
101
//! #[tokio::main]
102
//! async fn main() {
103
//!     let (tx, mut rx) = broadcast::channel(2);
104
//!
105
//!     tx.send(10).unwrap();
106
//!     tx.send(20).unwrap();
107
//!     tx.send(30).unwrap();
108
//!
109
//!     // The receiver lagged behind
110
//!     assert!(rx.recv().await.is_err());
111
//!
112
//!     // At this point, we can abort or continue with lost messages
113
//!
114
//!     assert_eq!(20, rx.recv().await.unwrap());
115
//!     assert_eq!(30, rx.recv().await.unwrap());
116
//! }
117
//! ```
118
119
use crate::loom::cell::UnsafeCell;
120
use crate::loom::sync::atomic::{AtomicBool, AtomicUsize};
121
use crate::loom::sync::{Arc, Mutex, MutexGuard};
122
use crate::task::coop::cooperative;
123
use crate::util::linked_list::{self, GuardedLinkedList, LinkedList};
124
use crate::util::WakeList;
125
126
use std::fmt;
127
use std::future::Future;
128
use std::marker::PhantomPinned;
129
use std::pin::Pin;
130
use std::ptr::NonNull;
131
use std::sync::atomic::Ordering::{AcqRel, Acquire, Relaxed, Release, SeqCst};
132
use std::task::{ready, Context, Poll, Waker};
133
134
/// Sending-half of the [`broadcast`] channel.
135
///
136
/// May be used from many threads. Messages can be sent with
137
/// [`send`][Sender::send].
138
///
139
/// # Examples
140
///
141
/// ```
142
/// use tokio::sync::broadcast;
143
///
144
/// #[tokio::main]
145
/// async fn main() {
146
///     let (tx, mut rx1) = broadcast::channel(16);
147
///     let mut rx2 = tx.subscribe();
148
///
149
///     tokio::spawn(async move {
150
///         assert_eq!(rx1.recv().await.unwrap(), 10);
151
///         assert_eq!(rx1.recv().await.unwrap(), 20);
152
///     });
153
///
154
///     tokio::spawn(async move {
155
///         assert_eq!(rx2.recv().await.unwrap(), 10);
156
///         assert_eq!(rx2.recv().await.unwrap(), 20);
157
///     });
158
///
159
///     tx.send(10).unwrap();
160
///     tx.send(20).unwrap();
161
/// }
162
/// ```
163
///
164
/// [`broadcast`]: crate::sync::broadcast
165
pub struct Sender<T> {
166
    shared: Arc<Shared<T>>,
167
}
168
169
/// A sender that does not prevent the channel from being closed.
170
///
171
/// If all [`Sender`] instances of a channel were dropped and only `WeakSender`
172
/// instances remain, the channel is closed.
173
///
174
/// In order to send messages, the `WeakSender` needs to be upgraded using
175
/// [`WeakSender::upgrade`], which returns `Option<Sender>`. It returns `None`
176
/// if all `Sender`s have been dropped, and otherwise it returns a `Sender`.
177
///
178
/// [`Sender`]: Sender
179
/// [`WeakSender::upgrade`]: WeakSender::upgrade
180
///
181
/// # Examples
182
///
183
/// ```
184
/// use tokio::sync::broadcast::channel;
185
///
186
/// #[tokio::main]
187
/// async fn main() {
188
///     let (tx, _rx) = channel::<i32>(15);
189
///     let tx_weak = tx.downgrade();
190
///
191
///     // Upgrading will succeed because `tx` still exists.
192
///     assert!(tx_weak.upgrade().is_some());
193
///
194
///     // If we drop `tx`, then it will fail.
195
///     drop(tx);
196
///     assert!(tx_weak.clone().upgrade().is_none());
197
/// }
198
/// ```
199
pub struct WeakSender<T> {
200
    shared: Arc<Shared<T>>,
201
}
202
203
/// Receiving-half of the [`broadcast`] channel.
204
///
205
/// Must not be used concurrently. Messages may be retrieved using
206
/// [`recv`][Receiver::recv].
207
///
208
/// To turn this receiver into a `Stream`, you can use the [`BroadcastStream`]
209
/// wrapper.
210
///
211
/// [`BroadcastStream`]: https://docs.rs/tokio-stream/0.1/tokio_stream/wrappers/struct.BroadcastStream.html
212
///
213
/// # Examples
214
///
215
/// ```
216
/// use tokio::sync::broadcast;
217
///
218
/// #[tokio::main]
219
/// async fn main() {
220
///     let (tx, mut rx1) = broadcast::channel(16);
221
///     let mut rx2 = tx.subscribe();
222
///
223
///     tokio::spawn(async move {
224
///         assert_eq!(rx1.recv().await.unwrap(), 10);
225
///         assert_eq!(rx1.recv().await.unwrap(), 20);
226
///     });
227
///
228
///     tokio::spawn(async move {
229
///         assert_eq!(rx2.recv().await.unwrap(), 10);
230
///         assert_eq!(rx2.recv().await.unwrap(), 20);
231
///     });
232
///
233
///     tx.send(10).unwrap();
234
///     tx.send(20).unwrap();
235
/// }
236
/// ```
237
///
238
/// [`broadcast`]: crate::sync::broadcast
239
pub struct Receiver<T> {
240
    /// State shared with all receivers and senders.
241
    shared: Arc<Shared<T>>,
242
243
    /// Next position to read from
244
    next: u64,
245
}
246
247
pub mod error {
    //! Broadcast error types

    use std::fmt;

    /// Error produced by [`send`] on a [`Sender`].
    ///
    /// Sending fails only when no receiver is active, which means nobody could
    /// ever observe the message. The rejected message is carried in the error
    /// so the caller can take it back.
    ///
    /// [`send`]: crate::sync::broadcast::Sender::send
    /// [`Sender`]: crate::sync::broadcast::Sender
    #[derive(Debug)]
    pub struct SendError<T>(pub T);

    impl<T> fmt::Display for SendError<T> {
        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
            f.write_str("channel closed")
        }
    }

    impl<T: fmt::Debug> std::error::Error for SendError<T> {}

    /// Error produced by [`recv`] on a [`Receiver`].
    ///
    /// [`recv`]: crate::sync::broadcast::Receiver::recv
    /// [`Receiver`]: crate::sync::broadcast::Receiver
    #[derive(Debug, PartialEq, Eq, Clone)]
    pub enum RecvError {
        /// Every sender is gone, so no further message will ever arrive.
        Closed,

        /// The receiver fell too far behind. The next receive attempt yields
        /// the oldest message the channel still retains.
        ///
        /// The payload is the number of messages that were skipped.
        Lagged(u64),
    }

    impl fmt::Display for RecvError {
        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
            match *self {
                Self::Closed => f.write_str("channel closed"),
                Self::Lagged(skipped) => write!(f, "channel lagged by {skipped}"),
            }
        }
    }

    impl std::error::Error for RecvError {}

    /// Error produced by [`try_recv`] on a [`Receiver`].
    ///
    /// [`try_recv`]: crate::sync::broadcast::Receiver::try_recv
    /// [`Receiver`]: crate::sync::broadcast::Receiver
    #[derive(Debug, PartialEq, Eq, Clone)]
    pub enum TryRecvError {
        /// Nothing is available right now, but at least one [`Sender`] is
        /// still alive, so a value may show up later.
        ///
        /// [`Sender`]: crate::sync::broadcast::Sender
        Empty,

        /// Every sender is gone, so no further message will ever arrive.
        Closed,

        /// The receiver fell too far behind. The next receive attempt yields
        /// the oldest message the channel still retains.
        ///
        /// The payload is the number of messages that were skipped.
        Lagged(u64),
    }

    impl fmt::Display for TryRecvError {
        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
            match *self {
                Self::Empty => f.write_str("channel empty"),
                Self::Closed => f.write_str("channel closed"),
                Self::Lagged(skipped) => write!(f, "channel lagged by {skipped}"),
            }
        }
    }

    impl std::error::Error for TryRecvError {}
}
335
336
use self::error::{RecvError, SendError, TryRecvError};
337
338
use super::Notify;
339
340
/// Data shared between senders and receivers.
341
struct Shared<T> {
342
    /// slots in the channel.
343
    buffer: Box<[Mutex<Slot<T>>]>,
344
345
    /// Mask a position -> index.
346
    mask: usize,
347
348
    /// Tail of the queue. Includes the rx wait list.
349
    tail: Mutex<Tail>,
350
351
    /// Number of outstanding Sender handles.
352
    num_tx: AtomicUsize,
353
354
    /// Number of outstanding weak Sender handles.
355
    num_weak_tx: AtomicUsize,
356
357
    /// Notify when the last subscribed [`Receiver`] drops.
358
    notify_last_rx_drop: Notify,
359
}
360
361
/// Next position to write a value.
362
struct Tail {
363
    /// Next position to write to.
364
    pos: u64,
365
366
    /// Number of active receivers.
367
    rx_cnt: usize,
368
369
    /// True if the channel is closed.
370
    closed: bool,
371
372
    /// Receivers waiting for a value.
373
    waiters: LinkedList<Waiter, <Waiter as linked_list::Link>::Target>,
374
}
375
376
/// Slot in the buffer.
377
struct Slot<T> {
378
    /// Remaining number of receivers that are expected to see this value.
379
    ///
380
    /// When this goes to zero, the value is released.
381
    ///
382
    /// An atomic is used as it is mutated concurrently with the slot read lock
383
    /// acquired.
384
    rem: AtomicUsize,
385
386
    /// Uniquely identifies the `send` stored in the slot.
387
    pos: u64,
388
389
    /// The value being broadcast.
390
    ///
391
    /// The value is set by `send` when the write lock is held. When a reader
392
    /// drops, `rem` is decremented. When it hits zero, the value is dropped.
393
    val: Option<T>,
394
}
395
396
/// An entry in the wait queue.
397
struct Waiter {
398
    /// True if queued.
399
    queued: AtomicBool,
400
401
    /// Task waiting on the broadcast channel.
402
    waker: Option<Waker>,
403
404
    /// Intrusive linked-list pointers.
405
    pointers: linked_list::Pointers<Waiter>,
406
407
    /// Should not be `Unpin`.
408
    _p: PhantomPinned,
409
}
410
411
impl Waiter {
412
0
    fn new() -> Self {
413
0
        Self {
414
0
            queued: AtomicBool::new(false),
415
0
            waker: None,
416
0
            pointers: linked_list::Pointers::new(),
417
0
            _p: PhantomPinned,
418
0
        }
419
0
    }
420
}
421
422
// Declares `Waiter::addr_of_pointers` through the crate's `generate_addr_of_methods!`
// macro (defined outside this file): an unsafe accessor going from a raw
// `NonNull<Waiter>` to a `NonNull` of its `pointers` field.
// NOTE(review): presumably the macro projects the field without materializing a
// `&Waiter` reference — confirm against the macro definition.
generate_addr_of_methods! {
423
    impl<> Waiter {
424
        unsafe fn addr_of_pointers(self: NonNull<Self>) -> NonNull<linked_list::Pointers<Waiter>> {
425
            &self.pointers
426
        }
427
    }
428
}
429
430
struct RecvGuard<'a, T> {
431
    slot: MutexGuard<'a, Slot<T>>,
432
}
433
434
/// Receive a value future.
435
struct Recv<'a, T> {
436
    /// Receiver being waited on.
437
    receiver: &'a mut Receiver<T>,
438
439
    /// Entry in the waiter `LinkedList`.
440
    waiter: WaiterCell,
441
}
442
443
// The wrapper around `UnsafeCell` isolates the unsafe impl `Send` and `Sync`
444
// from `Recv`.
445
struct WaiterCell(UnsafeCell<Waiter>);
446
447
unsafe impl Send for WaiterCell {}
448
unsafe impl Sync for WaiterCell {}
449
450
/// Max number of receivers. Reserve space to lock.
451
const MAX_RECEIVERS: usize = usize::MAX / 4;
452
453
/// Create a bounded, multi-producer, multi-consumer channel where each sent
454
/// value is broadcast to all active receivers.
455
///
456
/// **Note:** The actual capacity may be greater than the provided `capacity`.
457
///
458
/// All data sent on [`Sender`] will become available on every active
459
/// [`Receiver`] in the same order as it was sent.
460
///
461
/// The `Sender` can be cloned to `send` to the same channel from multiple
462
/// points in the process or it can be used concurrently from an `Arc`. New
463
/// `Receiver` handles are created by calling [`Sender::subscribe`].
464
///
465
/// If all [`Receiver`] handles are dropped, the `send` method will return a
466
/// [`SendError`]. Similarly, if all [`Sender`] handles are dropped, the [`recv`]
467
/// method will return a [`RecvError`].
468
///
469
/// [`Sender`]: crate::sync::broadcast::Sender
470
/// [`Sender::subscribe`]: crate::sync::broadcast::Sender::subscribe
471
/// [`Receiver`]: crate::sync::broadcast::Receiver
472
/// [`recv`]: crate::sync::broadcast::Receiver::recv
473
/// [`SendError`]: crate::sync::broadcast::error::SendError
474
/// [`RecvError`]: crate::sync::broadcast::error::RecvError
475
///
476
/// # Examples
477
///
478
/// ```
479
/// use tokio::sync::broadcast;
480
///
481
/// #[tokio::main]
482
/// async fn main() {
483
///     let (tx, mut rx1) = broadcast::channel(16);
484
///     let mut rx2 = tx.subscribe();
485
///
486
///     tokio::spawn(async move {
487
///         assert_eq!(rx1.recv().await.unwrap(), 10);
488
///         assert_eq!(rx1.recv().await.unwrap(), 20);
489
///     });
490
///
491
///     tokio::spawn(async move {
492
///         assert_eq!(rx2.recv().await.unwrap(), 10);
493
///         assert_eq!(rx2.recv().await.unwrap(), 20);
494
///     });
495
///
496
///     tx.send(10).unwrap();
497
///     tx.send(20).unwrap();
498
/// }
499
/// ```
500
///
501
/// # Panics
502
///
503
/// This will panic if `capacity` is equal to `0`.
504
///
505
/// This pre-allocates space for `capacity` messages. Allocation failure may result in a panic or
506
/// [an allocation failure](std::alloc::handle_alloc_error).
507
#[track_caller]
508
0
pub fn channel<T: Clone>(capacity: usize) -> (Sender<T>, Receiver<T>) {
    // SAFETY: the sender is told about exactly one receiver, and exactly one
    // `Receiver` is constructed right below.
    let sender = unsafe { Sender::new_with_receiver_count(1, capacity) };

    // The initial receiver starts reading at position 0, the first position
    // the sender will write to.
    let shared = sender.shared.clone();
    let receiver = Receiver { shared, next: 0 };

    (sender, receiver)
}
517
518
impl<T> Sender<T> {
519
    /// Creates the sending-half of the [`broadcast`] channel.
520
    ///
521
    /// See the documentation of [`broadcast::channel`] for more information on this method.
522
    ///
523
    /// [`broadcast`]: crate::sync::broadcast
524
    /// [`broadcast::channel`]: crate::sync::broadcast::channel
525
    #[track_caller]
526
0
    pub fn new(capacity: usize) -> Self {
        // No `Receiver` is built here, so the recorded receiver count is zero.
        let receiver_count = 0;

        // SAFETY: the count passed matches reality, since this function creates
        // no receivers.
        unsafe { Self::new_with_receiver_count(receiver_count, capacity) }
    }
530
531
    /// Creates the sending-half of the [`broadcast`](self) channel, and provide the receiver
532
    /// count.
533
    ///
534
    /// See the documentation of [`broadcast::channel`](self::channel) for more errors when
535
    /// calling this function.
536
    ///
537
    /// # Safety
538
    ///
539
    /// The caller must ensure that the number of receivers recorded for this `Sender` is
540
    /// correct before the channel is used: this function only stores `receiver_count`
541
    /// (zero when called from `Sender::new`) and does not create any receivers by itself.
542
    #[track_caller]
543
0
    unsafe fn new_with_receiver_count(receiver_count: usize, mut capacity: usize) -> Self {
544
0
        assert!(capacity > 0, "broadcast channel capacity cannot be zero");
545
0
        assert!(
546
0
            capacity <= usize::MAX >> 1,
547
0
            "broadcast channel capacity exceeded `usize::MAX / 2`"
548
        );
549
550
        // Round to a power of two
551
0
        capacity = capacity.next_power_of_two();
552
0
553
0
        let mut buffer = Vec::with_capacity(capacity);
554
555
0
        for i in 0..capacity {
556
0
            buffer.push(Mutex::new(Slot {
557
0
                rem: AtomicUsize::new(0),
558
0
                pos: (i as u64).wrapping_sub(capacity as u64),
559
0
                val: None,
560
0
            }));
561
0
        }
562
563
0
        let shared = Arc::new(Shared {
564
0
            buffer: buffer.into_boxed_slice(),
565
0
            mask: capacity - 1,
566
0
            tail: Mutex::new(Tail {
567
0
                pos: 0,
568
0
                rx_cnt: receiver_count,
569
0
                closed: false,
570
0
                waiters: LinkedList::new(),
571
0
            }),
572
0
            num_tx: AtomicUsize::new(1),
573
0
            num_weak_tx: AtomicUsize::new(0),
574
0
            notify_last_rx_drop: Notify::new(),
575
0
        });
576
0
577
0
        Sender { shared }
578
0
    }
579
580
    /// Attempts to send a value to all active [`Receiver`] handles, returning
581
    /// it back if it could not be sent.
582
    ///
583
    /// A successful send occurs when there is at least one active [`Receiver`]
584
    /// handle. An unsuccessful send would be one where all associated
585
    /// [`Receiver`] handles have already been dropped.
586
    ///
587
    /// # Return
588
    ///
589
    /// On success, the number of subscribed [`Receiver`] handles is returned.
590
    /// This does not mean that this number of receivers will see the message as
591
    /// a receiver may drop or lag ([see lagging](self#lagging)) before receiving
592
    /// the message.
593
    ///
594
    /// # Note
595
    ///
596
    /// A return value of `Ok` **does not** mean that the sent value will be
597
    /// observed by all or any of the active [`Receiver`] handles. [`Receiver`]
598
    /// handles may be dropped before receiving the sent message.
599
    ///
600
    /// A return value of `Err` **does not** mean that future calls to `send`
601
    /// will fail. New [`Receiver`] handles may be created by calling
602
    /// [`subscribe`].
603
    ///
604
    /// [`Receiver`]: crate::sync::broadcast::Receiver
605
    /// [`subscribe`]: crate::sync::broadcast::Sender::subscribe
606
    ///
607
    /// # Examples
608
    ///
609
    /// ```
610
    /// use tokio::sync::broadcast;
611
    ///
612
    /// #[tokio::main]
613
    /// async fn main() {
614
    ///     let (tx, mut rx1) = broadcast::channel(16);
615
    ///     let mut rx2 = tx.subscribe();
616
    ///
617
    ///     tokio::spawn(async move {
618
    ///         assert_eq!(rx1.recv().await.unwrap(), 10);
619
    ///         assert_eq!(rx1.recv().await.unwrap(), 20);
620
    ///     });
621
    ///
622
    ///     tokio::spawn(async move {
623
    ///         assert_eq!(rx2.recv().await.unwrap(), 10);
624
    ///         assert_eq!(rx2.recv().await.unwrap(), 20);
625
    ///     });
626
    ///
627
    ///     tx.send(10).unwrap();
628
    ///     tx.send(20).unwrap();
629
    /// }
630
    /// ```
631
0
    pub fn send(&self, value: T) -> Result<usize, SendError<T>> {
632
0
        let mut tail = self.shared.tail.lock();
633
0
634
0
        if tail.rx_cnt == 0 {
635
0
            return Err(SendError(value));
636
0
        }
637
0
638
0
        // Position to write into
639
0
        let pos = tail.pos;
640
0
        let rem = tail.rx_cnt;
641
0
        let idx = (pos & self.shared.mask as u64) as usize;
642
0
643
0
        // Update the tail position
644
0
        tail.pos = tail.pos.wrapping_add(1);
645
0
646
0
        // Get the slot
647
0
        let mut slot = self.shared.buffer[idx].lock();
648
0
649
0
        // Track the position
650
0
        slot.pos = pos;
651
0
652
0
        // Set remaining receivers
653
0
        slot.rem.with_mut(|v| *v = rem);
654
0
655
0
        // Write the value
656
0
        slot.val = Some(value);
657
0
658
0
        // Release the slot lock before notifying the receivers.
659
0
        drop(slot);
660
0
661
0
        // Notify and release the mutex. This must happen after the slot lock is
662
0
        // released, otherwise the writer lock bit could be cleared while another
663
0
        // thread is in the critical section.
664
0
        self.shared.notify_rx(tail);
665
0
666
0
        Ok(rem)
667
0
    }
668
669
    /// Creates a new [`Receiver`] handle that will receive values sent **after**
670
    /// this call to `subscribe`.
671
    ///
672
    /// # Examples
673
    ///
674
    /// ```
675
    /// use tokio::sync::broadcast;
676
    ///
677
    /// #[tokio::main]
678
    /// async fn main() {
679
    ///     let (tx, _rx) = broadcast::channel(16);
680
    ///
681
    ///     // Will not be seen
682
    ///     tx.send(10).unwrap();
683
    ///
684
    ///     let mut rx = tx.subscribe();
685
    ///
686
    ///     tx.send(20).unwrap();
687
    ///
688
    ///     let value = rx.recv().await.unwrap();
689
    ///     assert_eq!(20, value);
690
    /// }
691
    /// ```
692
0
    pub fn subscribe(&self) -> Receiver<T> {
693
0
        let shared = self.shared.clone();
694
0
        new_receiver(shared)
695
0
    }
696
697
    /// Converts the `Sender` to a [`WeakSender`] that does not count
698
    /// towards RAII semantics, i.e. if all `Sender` instances of the
699
    /// channel were dropped and only `WeakSender` instances remain,
700
    /// the channel is closed.
701
    #[must_use = "Downgrade creates a WeakSender without destroying the original non-weak sender."]
702
0
    pub fn downgrade(&self) -> WeakSender<T> {
703
0
        self.shared.num_weak_tx.fetch_add(1, Relaxed);
704
0
        WeakSender {
705
0
            shared: self.shared.clone(),
706
0
        }
707
0
    }
708
709
    /// Returns the number of queued values.
710
    ///
711
    /// A value is queued until it has either been seen by all receivers that were alive at the time
712
    /// it was sent, or has been evicted from the queue by subsequent sends that exceeded the
713
    /// queue's capacity.
714
    ///
715
    /// # Note
716
    ///
717
    /// In contrast to [`Receiver::len`], this method only reports queued values and not values that
718
    /// have been evicted from the queue before being seen by all receivers.
719
    ///
720
    /// # Examples
721
    ///
722
    /// ```
723
    /// use tokio::sync::broadcast;
724
    ///
725
    /// #[tokio::main]
726
    /// async fn main() {
727
    ///     let (tx, mut rx1) = broadcast::channel(16);
728
    ///     let mut rx2 = tx.subscribe();
729
    ///
730
    ///     tx.send(10).unwrap();
731
    ///     tx.send(20).unwrap();
732
    ///     tx.send(30).unwrap();
733
    ///
734
    ///     assert_eq!(tx.len(), 3);
735
    ///
736
    ///     rx1.recv().await.unwrap();
737
    ///
738
    ///     // The len is still 3 since rx2 hasn't seen the first value yet.
739
    ///     assert_eq!(tx.len(), 3);
740
    ///
741
    ///     rx2.recv().await.unwrap();
742
    ///
743
    ///     assert_eq!(tx.len(), 2);
744
    /// }
745
    /// ```
746
0
    pub fn len(&self) -> usize {
747
0
        let tail = self.shared.tail.lock();
748
0
749
0
        let base_idx = (tail.pos & self.shared.mask as u64) as usize;
750
0
        let mut low = 0;
751
0
        let mut high = self.shared.buffer.len();
752
0
        while low < high {
753
0
            let mid = low + (high - low) / 2;
754
0
            let idx = base_idx.wrapping_add(mid) & self.shared.mask;
755
0
            if self.shared.buffer[idx].lock().rem.load(SeqCst) == 0 {
756
0
                low = mid + 1;
757
0
            } else {
758
0
                high = mid;
759
0
            }
760
        }
761
762
0
        self.shared.buffer.len() - low
763
0
    }
764
765
    /// Returns true if there are no queued values.
766
    ///
767
    /// # Examples
768
    ///
769
    /// ```
770
    /// use tokio::sync::broadcast;
771
    ///
772
    /// #[tokio::main]
773
    /// async fn main() {
774
    ///     let (tx, mut rx1) = broadcast::channel(16);
775
    ///     let mut rx2 = tx.subscribe();
776
    ///
777
    ///     assert!(tx.is_empty());
778
    ///
779
    ///     tx.send(10).unwrap();
780
    ///
781
    ///     assert!(!tx.is_empty());
782
    ///
783
    ///     rx1.recv().await.unwrap();
784
    ///
785
    ///     // The queue is still not empty since rx2 hasn't seen the value.
786
    ///     assert!(!tx.is_empty());
787
    ///
788
    ///     rx2.recv().await.unwrap();
789
    ///
790
    ///     assert!(tx.is_empty());
791
    /// }
792
    /// ```
793
0
    pub fn is_empty(&self) -> bool {
794
0
        let tail = self.shared.tail.lock();
795
0
796
0
        let idx = (tail.pos.wrapping_sub(1) & self.shared.mask as u64) as usize;
797
0
        self.shared.buffer[idx].lock().rem.load(SeqCst) == 0
798
0
    }
799
800
    /// Returns the number of active receivers.
801
    ///
802
    /// An active receiver is a [`Receiver`] handle returned from [`channel`] or
803
    /// [`subscribe`]. These are the handles that will receive values sent on
804
    /// this [`Sender`].
805
    ///
806
    /// # Note
807
    ///
808
    /// It is not guaranteed that a sent message will reach this number of
809
    /// receivers. Active receivers may never call [`recv`] again before
810
    /// dropping.
811
    ///
812
    /// [`recv`]: crate::sync::broadcast::Receiver::recv
813
    /// [`Receiver`]: crate::sync::broadcast::Receiver
814
    /// [`Sender`]: crate::sync::broadcast::Sender
815
    /// [`subscribe`]: crate::sync::broadcast::Sender::subscribe
816
    /// [`channel`]: crate::sync::broadcast::channel
817
    ///
818
    /// # Examples
819
    ///
820
    /// ```
821
    /// use tokio::sync::broadcast;
822
    ///
823
    /// #[tokio::main]
824
    /// async fn main() {
825
    ///     let (tx, _rx1) = broadcast::channel(16);
826
    ///
827
    ///     assert_eq!(1, tx.receiver_count());
828
    ///
829
    ///     let mut _rx2 = tx.subscribe();
830
    ///
831
    ///     assert_eq!(2, tx.receiver_count());
832
    ///
833
    ///     tx.send(10).unwrap();
834
    /// }
835
    /// ```
836
0
    pub fn receiver_count(&self) -> usize {
837
0
        let tail = self.shared.tail.lock();
838
0
        tail.rx_cnt
839
0
    }
840
841
    /// Returns `true` if senders belong to the same channel.
842
    ///
843
    /// # Examples
844
    ///
845
    /// ```
846
    /// use tokio::sync::broadcast;
847
    ///
848
    /// #[tokio::main]
849
    /// async fn main() {
850
    ///     let (tx, _rx) = broadcast::channel::<()>(16);
851
    ///     let tx2 = tx.clone();
852
    ///
853
    ///     assert!(tx.same_channel(&tx2));
854
    ///
855
    ///     let (tx3, _rx3) = broadcast::channel::<()>(16);
856
    ///
857
    ///     assert!(!tx3.same_channel(&tx2));
858
    /// }
859
    /// ```
860
0
    pub fn same_channel(&self, other: &Self) -> bool {
861
0
        Arc::ptr_eq(&self.shared, &other.shared)
862
0
    }
863
864
    /// A future which completes when the number of [Receiver]s subscribed to this `Sender` reaches
865
    /// zero.
866
    ///
867
    /// # Examples
868
    ///
869
    /// ```
870
    /// use futures::FutureExt;
871
    /// use tokio::sync::broadcast;
872
    ///
873
    /// #[tokio::main]
874
    /// async fn main() {
875
    ///     let (tx, mut rx1) = broadcast::channel::<u32>(16);
876
    ///     let mut rx2 = tx.subscribe();
877
    ///
878
    ///     let _ = tx.send(10);
879
    ///
880
    ///     assert_eq!(rx1.recv().await.unwrap(), 10);
881
    ///     drop(rx1);
882
    ///     assert!(tx.closed().now_or_never().is_none());
883
    ///
884
    ///     assert_eq!(rx2.recv().await.unwrap(), 10);
885
    ///     drop(rx2);
886
    ///     assert!(tx.closed().now_or_never().is_some());
887
    /// }
888
    /// ```
889
0
    pub async fn closed(&self) {
890
        loop {
891
0
            let notified = self.shared.notify_last_rx_drop.notified();
892
0
893
0
            {
894
0
                // Ensure the lock drops if the channel isn't closed
895
0
                let tail = self.shared.tail.lock();
896
0
                if tail.closed {
897
0
                    return;
898
0
                }
899
0
            }
900
0
901
0
            notified.await;
902
        }
903
0
    }
904
905
0
    fn close_channel(&self) {
906
0
        let mut tail = self.shared.tail.lock();
907
0
        tail.closed = true;
908
0
909
0
        self.shared.notify_rx(tail);
910
0
    }
911
912
    /// Returns the number of [`Sender`] handles.
913
0
    pub fn strong_count(&self) -> usize {
914
0
        self.shared.num_tx.load(Acquire)
915
0
    }
916
917
    /// Returns the number of [`WeakSender`] handles.
918
0
    pub fn weak_count(&self) -> usize {
919
0
        self.shared.num_weak_tx.load(Acquire)
920
0
    }
921
}
922
923
/// Create a new `Receiver` which reads starting from the tail.
924
0
fn new_receiver<T>(shared: Arc<Shared<T>>) -> Receiver<T> {
925
0
    let mut tail = shared.tail.lock();
926
0
927
0
    assert!(tail.rx_cnt != MAX_RECEIVERS, "max receivers");
928
929
0
    if tail.rx_cnt == 0 {
930
0
        // Potentially need to re-open the channel, if a new receiver has been added between calls
931
0
        // to poll(). Note that we use rx_cnt == 0 instead of is_closed since is_closed also
932
0
        // applies if the sender has been dropped
933
0
        tail.closed = false;
934
0
    }
935
936
0
    tail.rx_cnt = tail.rx_cnt.checked_add(1).expect("overflow");
937
0
    let next = tail.pos;
938
0
939
0
    drop(tail);
940
0
941
0
    Receiver { shared, next }
942
0
}
943
944
/// List used in `Shared::notify_rx`. It wraps a guarded linked list
945
/// and gates the access to it on the `Shared.tail` mutex. It also empties
946
/// the list on drop.
947
struct WaitersList<'a, T> {
948
    list: GuardedLinkedList<Waiter, <Waiter as linked_list::Link>::Target>,
949
    is_empty: bool,
950
    shared: &'a Shared<T>,
951
}
952
953
impl<'a, T> Drop for WaitersList<'a, T> {
954
0
    fn drop(&mut self) {
955
0
        // If the list is not empty, we unlink all waiters from it.
956
0
        // We do not wake the waiters to avoid double panics.
957
0
        if !self.is_empty {
958
0
            let _lock_guard = self.shared.tail.lock();
959
0
            while self.list.pop_back().is_some() {}
960
0
        }
961
0
    }
962
}
963
964
impl<'a, T> WaitersList<'a, T> {
965
0
    fn new(
966
0
        unguarded_list: LinkedList<Waiter, <Waiter as linked_list::Link>::Target>,
967
0
        guard: Pin<&'a Waiter>,
968
0
        shared: &'a Shared<T>,
969
0
    ) -> Self {
970
0
        let guard_ptr = NonNull::from(guard.get_ref());
971
0
        let list = unguarded_list.into_guarded(guard_ptr);
972
0
        WaitersList {
973
0
            list,
974
0
            is_empty: false,
975
0
            shared,
976
0
        }
977
0
    }
978
979
    /// Removes the last element from the guarded list. Modifying this list
980
    /// requires an exclusive access to the main list in `Notify`.
981
0
    fn pop_back_locked(&mut self, _tail: &mut Tail) -> Option<NonNull<Waiter>> {
982
0
        let result = self.list.pop_back();
983
0
        if result.is_none() {
984
0
            // Save information about emptiness to avoid waiting for lock
985
0
            // in the destructor.
986
0
            self.is_empty = true;
987
0
        }
988
0
        result
989
0
    }
990
}
991
992
impl<T> Shared<T> {
993
0
    fn notify_rx<'a, 'b: 'a>(&'b self, mut tail: MutexGuard<'a, Tail>) {
994
0
        // It is critical for `GuardedLinkedList` safety that the guard node is
995
0
        // pinned in memory and is not dropped until the guarded list is dropped.
996
0
        let guard = Waiter::new();
997
0
        pin!(guard);
998
0
999
0
        // We move all waiters to a secondary list. It uses a `GuardedLinkedList`
1000
0
        // underneath to allow every waiter to safely remove itself from it.
1001
0
        //
1002
0
        // * This list will be still guarded by the `waiters` lock.
1003
0
        //   `NotifyWaitersList` wrapper makes sure we hold the lock to modify it.
1004
0
        // * This wrapper will empty the list on drop. It is critical for safety
1005
0
        //   that we will not leave any list entry with a pointer to the local
1006
0
        //   guard node after this function returns / panics.
1007
0
        let mut list = WaitersList::new(std::mem::take(&mut tail.waiters), guard.as_ref(), self);
1008
0
1009
0
        let mut wakers = WakeList::new();
1010
        'outer: loop {
1011
0
            while wakers.can_push() {
1012
0
                match list.pop_back_locked(&mut tail) {
1013
0
                    Some(waiter) => {
1014
                        unsafe {
1015
                            // Safety: accessing `waker` is safe because
1016
                            // the tail lock is held.
1017
0
                            if let Some(waker) = (*waiter.as_ptr()).waker.take() {
1018
0
                                wakers.push(waker);
1019
0
                            }
1020
1021
                            // Safety: `queued` is atomic.
1022
0
                            let queued = &(*waiter.as_ptr()).queued;
1023
0
                            // `Relaxed` suffices because the tail lock is held.
1024
0
                            assert!(queued.load(Relaxed));
1025
                            // `Release` is needed to synchronize with `Recv::drop`.
1026
                            // It is critical to set this variable **after** waker
1027
                            // is extracted, otherwise we may data race with `Recv::drop`.
1028
0
                            queued.store(false, Release);
1029
                        }
1030
                    }
1031
                    None => {
1032
0
                        break 'outer;
1033
                    }
1034
                }
1035
            }
1036
1037
            // Release the lock before waking.
1038
0
            drop(tail);
1039
0
1040
0
            // Before we acquire the lock again all sorts of things can happen:
1041
0
            // some waiters may remove themselves from the list and new waiters
1042
0
            // may be added. This is fine since at worst we will unnecessarily
1043
0
            // wake up waiters which will then queue themselves again.
1044
0
1045
0
            wakers.wake_all();
1046
0
1047
0
            // Acquire the lock again.
1048
0
            tail = self.tail.lock();
1049
        }
1050
1051
        // Release the lock before waking.
1052
0
        drop(tail);
1053
0
1054
0
        wakers.wake_all();
1055
0
    }
1056
}
1057
1058
impl<T> Clone for Sender<T> {
1059
0
    fn clone(&self) -> Sender<T> {
1060
0
        let shared = self.shared.clone();
1061
0
        shared.num_tx.fetch_add(1, Relaxed);
1062
0
1063
0
        Sender { shared }
1064
0
    }
1065
}
1066
1067
impl<T> Drop for Sender<T> {
1068
0
    fn drop(&mut self) {
1069
0
        if 1 == self.shared.num_tx.fetch_sub(1, AcqRel) {
1070
0
            self.close_channel();
1071
0
        }
1072
0
    }
1073
}
1074
1075
impl<T> WeakSender<T> {
1076
    /// Tries to convert a `WeakSender` into a [`Sender`].
1077
    ///
1078
    /// This will return `Some` if there are other `Sender` instances alive and
1079
    /// the channel wasn't previously dropped, otherwise `None` is returned.
1080
    #[must_use]
1081
0
    pub fn upgrade(&self) -> Option<Sender<T>> {
1082
0
        let mut tx_count = self.shared.num_tx.load(Acquire);
1083
1084
        loop {
1085
0
            if tx_count == 0 {
1086
                // channel is closed so this WeakSender can not be upgraded
1087
0
                return None;
1088
0
            }
1089
0
1090
0
            match self
1091
0
                .shared
1092
0
                .num_tx
1093
0
                .compare_exchange_weak(tx_count, tx_count + 1, Relaxed, Acquire)
1094
            {
1095
                Ok(_) => {
1096
0
                    return Some(Sender {
1097
0
                        shared: self.shared.clone(),
1098
0
                    })
1099
                }
1100
0
                Err(prev_count) => tx_count = prev_count,
1101
            }
1102
        }
1103
0
    }
1104
1105
    /// Returns the number of [`Sender`] handles.
1106
0
    pub fn strong_count(&self) -> usize {
1107
0
        self.shared.num_tx.load(Acquire)
1108
0
    }
1109
1110
    /// Returns the number of [`WeakSender`] handles.
1111
0
    pub fn weak_count(&self) -> usize {
1112
0
        self.shared.num_weak_tx.load(Acquire)
1113
0
    }
1114
}
1115
1116
impl<T> Clone for WeakSender<T> {
1117
0
    fn clone(&self) -> WeakSender<T> {
1118
0
        let shared = self.shared.clone();
1119
0
        shared.num_weak_tx.fetch_add(1, Relaxed);
1120
0
1121
0
        Self { shared }
1122
0
    }
1123
}
1124
1125
impl<T> Drop for WeakSender<T> {
1126
0
    fn drop(&mut self) {
1127
0
        self.shared.num_weak_tx.fetch_sub(1, AcqRel);
1128
0
    }
1129
}
1130
1131
impl<T> Receiver<T> {
1132
    /// Returns the number of messages that were sent into the channel and that
1133
    /// this [`Receiver`] has yet to receive.
1134
    ///
1135
    /// If the returned value from `len` is larger than the next largest power of 2
1136
    /// of the capacity of the channel any call to [`recv`] will return an
1137
    /// `Err(RecvError::Lagged)` and any call to [`try_recv`] will return an
1138
    /// `Err(TryRecvError::Lagged)`, e.g. if the capacity of the channel is 10,
1139
    /// [`recv`] will start to return `Err(RecvError::Lagged)` once `len` returns
1140
    /// values larger than 16.
1141
    ///
1142
    /// [`Receiver`]: crate::sync::broadcast::Receiver
1143
    /// [`recv`]: crate::sync::broadcast::Receiver::recv
1144
    /// [`try_recv`]: crate::sync::broadcast::Receiver::try_recv
1145
    ///
1146
    /// # Examples
1147
    ///
1148
    /// ```
1149
    /// use tokio::sync::broadcast;
1150
    ///
1151
    /// #[tokio::main]
1152
    /// async fn main() {
1153
    ///     let (tx, mut rx1) = broadcast::channel(16);
1154
    ///
1155
    ///     tx.send(10).unwrap();
1156
    ///     tx.send(20).unwrap();
1157
    ///
1158
    ///     assert_eq!(rx1.len(), 2);
1159
    ///     assert_eq!(rx1.recv().await.unwrap(), 10);
1160
    ///     assert_eq!(rx1.len(), 1);
1161
    ///     assert_eq!(rx1.recv().await.unwrap(), 20);
1162
    ///     assert_eq!(rx1.len(), 0);
1163
    /// }
1164
    /// ```
1165
0
    pub fn len(&self) -> usize {
1166
0
        let next_send_pos = self.shared.tail.lock().pos;
1167
0
        (next_send_pos - self.next) as usize
1168
0
    }
1169
1170
    /// Returns true if there aren't any messages in the channel that the [`Receiver`]
1171
    /// has yet to receive.
1172
    ///
1173
    /// [`Receiver]: create::sync::broadcast::Receiver
1174
    ///
1175
    /// # Examples
1176
    ///
1177
    /// ```
1178
    /// use tokio::sync::broadcast;
1179
    ///
1180
    /// #[tokio::main]
1181
    /// async fn main() {
1182
    ///     let (tx, mut rx1) = broadcast::channel(16);
1183
    ///
1184
    ///     assert!(rx1.is_empty());
1185
    ///
1186
    ///     tx.send(10).unwrap();
1187
    ///     tx.send(20).unwrap();
1188
    ///
1189
    ///     assert!(!rx1.is_empty());
1190
    ///     assert_eq!(rx1.recv().await.unwrap(), 10);
1191
    ///     assert_eq!(rx1.recv().await.unwrap(), 20);
1192
    ///     assert!(rx1.is_empty());
1193
    /// }
1194
    /// ```
1195
0
    pub fn is_empty(&self) -> bool {
1196
0
        self.len() == 0
1197
0
    }
1198
1199
    /// Returns `true` if receivers belong to the same channel.
1200
    ///
1201
    /// # Examples
1202
    ///
1203
    /// ```
1204
    /// use tokio::sync::broadcast;
1205
    ///
1206
    /// #[tokio::main]
1207
    /// async fn main() {
1208
    ///     let (tx, rx) = broadcast::channel::<()>(16);
1209
    ///     let rx2 = tx.subscribe();
1210
    ///
1211
    ///     assert!(rx.same_channel(&rx2));
1212
    ///
1213
    ///     let (_tx3, rx3) = broadcast::channel::<()>(16);
1214
    ///
1215
    ///     assert!(!rx3.same_channel(&rx2));
1216
    /// }
1217
    /// ```
1218
0
    pub fn same_channel(&self, other: &Self) -> bool {
1219
0
        Arc::ptr_eq(&self.shared, &other.shared)
1220
0
    }
1221
1222
    /// Locks the next value if there is one.
1223
0
    fn recv_ref(
1224
0
        &mut self,
1225
0
        waiter: Option<(&UnsafeCell<Waiter>, &Waker)>,
1226
0
    ) -> Result<RecvGuard<'_, T>, TryRecvError> {
1227
0
        let idx = (self.next & self.shared.mask as u64) as usize;
1228
0
1229
0
        // The slot holding the next value to read
1230
0
        let mut slot = self.shared.buffer[idx].lock();
1231
0
1232
0
        if slot.pos != self.next {
1233
            // Release the `slot` lock before attempting to acquire the `tail`
1234
            // lock. This is required because `send2` acquires the tail lock
1235
            // first followed by the slot lock. Acquiring the locks in reverse
1236
            // order here would result in a potential deadlock: `recv_ref`
1237
            // acquires the `slot` lock and attempts to acquire the `tail` lock
1238
            // while `send2` acquired the `tail` lock and attempts to acquire
1239
            // the slot lock.
1240
0
            drop(slot);
1241
0
1242
0
            let mut old_waker = None;
1243
0
1244
0
            let mut tail = self.shared.tail.lock();
1245
0
1246
0
            // Acquire slot lock again
1247
0
            slot = self.shared.buffer[idx].lock();
1248
0
1249
0
            // Make sure the position did not change. This could happen in the
1250
0
            // unlikely event that the buffer is wrapped between dropping the
1251
0
            // read lock and acquiring the tail lock.
1252
0
            if slot.pos != self.next {
1253
0
                let next_pos = slot.pos.wrapping_add(self.shared.buffer.len() as u64);
1254
0
1255
0
                if next_pos == self.next {
1256
                    // At this point the channel is empty for *this* receiver. If
1257
                    // it's been closed, then that's what we return, otherwise we
1258
                    // set a waker and return empty.
1259
0
                    if tail.closed {
1260
0
                        return Err(TryRecvError::Closed);
1261
0
                    }
1262
1263
                    // Store the waker
1264
0
                    if let Some((waiter, waker)) = waiter {
1265
                        // Safety: called while locked.
1266
0
                        unsafe {
1267
0
                            // Only queue if not already queued
1268
0
                            waiter.with_mut(|ptr| {
1269
                                // If there is no waker **or** if the currently
1270
                                // stored waker references a **different** task,
1271
                                // track the tasks' waker to be notified on
1272
                                // receipt of a new value.
1273
0
                                match (*ptr).waker {
1274
0
                                    Some(ref w) if w.will_wake(waker) => {}
1275
0
                                    _ => {
1276
0
                                        old_waker = (*ptr).waker.replace(waker.clone());
1277
0
                                    }
1278
                                }
1279
1280
                                // If the waiter is not already queued, enqueue it.
1281
                                // `Relaxed` order suffices: we have synchronized with
1282
                                // all writers through the tail lock that we hold.
1283
0
                                if !(*ptr).queued.load(Relaxed) {
1284
0
                                    // `Relaxed` order suffices: all the readers will
1285
0
                                    // synchronize with this write through the tail lock.
1286
0
                                    (*ptr).queued.store(true, Relaxed);
1287
0
                                    tail.waiters.push_front(NonNull::new_unchecked(&mut *ptr));
1288
0
                                }
1289
0
                            });
1290
0
                        }
1291
0
                    }
1292
1293
                    // Drop the old waker after releasing the locks.
1294
0
                    drop(slot);
1295
0
                    drop(tail);
1296
0
                    drop(old_waker);
1297
0
1298
0
                    return Err(TryRecvError::Empty);
1299
0
                }
1300
0
1301
0
                // At this point, the receiver has lagged behind the sender by
1302
0
                // more than the channel capacity. The receiver will attempt to
1303
0
                // catch up by skipping dropped messages and setting the
1304
0
                // internal cursor to the **oldest** message stored by the
1305
0
                // channel.
1306
0
                let next = tail.pos.wrapping_sub(self.shared.buffer.len() as u64);
1307
0
1308
0
                let missed = next.wrapping_sub(self.next);
1309
0
1310
0
                drop(tail);
1311
0
1312
0
                // The receiver is slow but no values have been missed
1313
0
                if missed == 0 {
1314
0
                    self.next = self.next.wrapping_add(1);
1315
0
1316
0
                    return Ok(RecvGuard { slot });
1317
0
                }
1318
0
1319
0
                self.next = next;
1320
0
1321
0
                return Err(TryRecvError::Lagged(missed));
1322
0
            }
1323
0
        }
1324
1325
0
        self.next = self.next.wrapping_add(1);
1326
0
1327
0
        Ok(RecvGuard { slot })
1328
0
    }
1329
1330
    /// Returns the number of [`Sender`] handles.
1331
0
    pub fn sender_strong_count(&self) -> usize {
1332
0
        self.shared.num_tx.load(Acquire)
1333
0
    }
1334
1335
    /// Returns the number of [`WeakSender`] handles.
1336
0
    pub fn sender_weak_count(&self) -> usize {
1337
0
        self.shared.num_weak_tx.load(Acquire)
1338
0
    }
1339
1340
    /// Checks if a channel is closed.
1341
    ///
1342
    /// This method returns `true` if the channel has been closed. The channel is closed
1343
    /// when all [`Sender`] have been dropped.
1344
    ///
1345
    /// [`Sender`]: crate::sync::broadcast::Sender
1346
    ///
1347
    /// # Examples
1348
    /// ```
1349
    /// use tokio::sync::broadcast;
1350
    ///
1351
    /// #[tokio::main]
1352
    /// async fn main() {
1353
    ///     let (tx, rx) = broadcast::channel::<()>(10);
1354
    ///     assert!(!rx.is_closed());
1355
    ///
1356
    ///     drop(tx);
1357
    ///
1358
    ///     assert!(rx.is_closed());
1359
    /// }
1360
    /// ```
1361
0
    pub fn is_closed(&self) -> bool {
1362
0
        // Channel is closed when there are no strong senders left active
1363
0
        self.shared.num_tx.load(Acquire) == 0
1364
0
    }
1365
}
1366
1367
impl<T: Clone> Receiver<T> {
1368
    /// Re-subscribes to the channel starting from the current tail element.
1369
    ///
1370
    /// This [`Receiver`] handle will receive a clone of all values sent
1371
    /// **after** it has resubscribed. This will not include elements that are
1372
    /// in the queue of the current receiver. Consider the following example.
1373
    ///
1374
    /// # Examples
1375
    ///
1376
    /// ```
1377
    /// use tokio::sync::broadcast;
1378
    ///
1379
    /// #[tokio::main]
1380
    /// async fn main() {
1381
    ///   let (tx, mut rx) = broadcast::channel(2);
1382
    ///
1383
    ///   tx.send(1).unwrap();
1384
    ///   let mut rx2 = rx.resubscribe();
1385
    ///   tx.send(2).unwrap();
1386
    ///
1387
    ///   assert_eq!(rx2.recv().await.unwrap(), 2);
1388
    ///   assert_eq!(rx.recv().await.unwrap(), 1);
1389
    /// }
1390
    /// ```
1391
0
    pub fn resubscribe(&self) -> Self {
1392
0
        let shared = self.shared.clone();
1393
0
        new_receiver(shared)
1394
0
    }
1395
    /// Receives the next value for this receiver.
1396
    ///
1397
    /// Each [`Receiver`] handle will receive a clone of all values sent
1398
    /// **after** it has subscribed.
1399
    ///
1400
    /// `Err(RecvError::Closed)` is returned when all `Sender` halves have
1401
    /// dropped, indicating that no further values can be sent on the channel.
1402
    ///
1403
    /// If the [`Receiver`] handle falls behind, once the channel is full, newly
1404
    /// sent values will overwrite old values. At this point, a call to [`recv`]
1405
    /// will return with `Err(RecvError::Lagged)` and the [`Receiver`]'s
1406
    /// internal cursor is updated to point to the oldest value still held by
1407
    /// the channel. A subsequent call to [`recv`] will return this value
1408
    /// **unless** it has been since overwritten.
1409
    ///
1410
    /// # Cancel safety
1411
    ///
1412
    /// This method is cancel safe. If `recv` is used as the event in a
1413
    /// [`tokio::select!`](crate::select) statement and some other branch
1414
    /// completes first, it is guaranteed that no messages were received on this
1415
    /// channel.
1416
    ///
1417
    /// [`Receiver`]: crate::sync::broadcast::Receiver
1418
    /// [`recv`]: crate::sync::broadcast::Receiver::recv
1419
    ///
1420
    /// # Examples
1421
    ///
1422
    /// ```
1423
    /// use tokio::sync::broadcast;
1424
    ///
1425
    /// #[tokio::main]
1426
    /// async fn main() {
1427
    ///     let (tx, mut rx1) = broadcast::channel(16);
1428
    ///     let mut rx2 = tx.subscribe();
1429
    ///
1430
    ///     tokio::spawn(async move {
1431
    ///         assert_eq!(rx1.recv().await.unwrap(), 10);
1432
    ///         assert_eq!(rx1.recv().await.unwrap(), 20);
1433
    ///     });
1434
    ///
1435
    ///     tokio::spawn(async move {
1436
    ///         assert_eq!(rx2.recv().await.unwrap(), 10);
1437
    ///         assert_eq!(rx2.recv().await.unwrap(), 20);
1438
    ///     });
1439
    ///
1440
    ///     tx.send(10).unwrap();
1441
    ///     tx.send(20).unwrap();
1442
    /// }
1443
    /// ```
1444
    ///
1445
    /// Handling lag
1446
    ///
1447
    /// ```
1448
    /// use tokio::sync::broadcast;
1449
    ///
1450
    /// #[tokio::main]
1451
    /// async fn main() {
1452
    ///     let (tx, mut rx) = broadcast::channel(2);
1453
    ///
1454
    ///     tx.send(10).unwrap();
1455
    ///     tx.send(20).unwrap();
1456
    ///     tx.send(30).unwrap();
1457
    ///
1458
    ///     // The receiver lagged behind
1459
    ///     assert!(rx.recv().await.is_err());
1460
    ///
1461
    ///     // At this point, we can abort or continue with lost messages
1462
    ///
1463
    ///     assert_eq!(20, rx.recv().await.unwrap());
1464
    ///     assert_eq!(30, rx.recv().await.unwrap());
1465
    /// }
1466
    /// ```
1467
0
    pub async fn recv(&mut self) -> Result<T, RecvError> {
1468
0
        cooperative(Recv::new(self)).await
1469
0
    }
1470
1471
    /// Attempts to return a pending value on this receiver without awaiting.
1472
    ///
1473
    /// This is useful for a flavor of "optimistic check" before deciding to
1474
    /// await on a receiver.
1475
    ///
1476
    /// Compared with [`recv`], this function has three failure cases instead of two
1477
    /// (one for closed, one for an empty buffer, one for a lagging receiver).
1478
    ///
1479
    /// `Err(TryRecvError::Closed)` is returned when all `Sender` halves have
1480
    /// dropped, indicating that no further values can be sent on the channel.
1481
    ///
1482
    /// If the [`Receiver`] handle falls behind, once the channel is full, newly
1483
    /// sent values will overwrite old values. At this point, a call to [`recv`]
1484
    /// will return with `Err(TryRecvError::Lagged)` and the [`Receiver`]'s
1485
    /// internal cursor is updated to point to the oldest value still held by
1486
    /// the channel. A subsequent call to [`try_recv`] will return this value
1487
    /// **unless** it has been since overwritten. If there are no values to
1488
    /// receive, `Err(TryRecvError::Empty)` is returned.
1489
    ///
1490
    /// [`recv`]: crate::sync::broadcast::Receiver::recv
1491
    /// [`try_recv`]: crate::sync::broadcast::Receiver::try_recv
1492
    /// [`Receiver`]: crate::sync::broadcast::Receiver
1493
    ///
1494
    /// # Examples
1495
    ///
1496
    /// ```
1497
    /// use tokio::sync::broadcast;
1498
    ///
1499
    /// #[tokio::main]
1500
    /// async fn main() {
1501
    ///     let (tx, mut rx) = broadcast::channel(16);
1502
    ///
1503
    ///     assert!(rx.try_recv().is_err());
1504
    ///
1505
    ///     tx.send(10).unwrap();
1506
    ///
1507
    ///     let value = rx.try_recv().unwrap();
1508
    ///     assert_eq!(10, value);
1509
    /// }
1510
    /// ```
1511
0
    pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
1512
0
        let guard = self.recv_ref(None)?;
1513
0
        guard.clone_value().ok_or(TryRecvError::Closed)
1514
0
    }
1515
1516
    /// Blocking receive to call outside of asynchronous contexts.
1517
    ///
1518
    /// # Panics
1519
    ///
1520
    /// This function panics if called within an asynchronous execution
1521
    /// context.
1522
    ///
1523
    /// # Examples
1524
    /// ```
1525
    /// use std::thread;
1526
    /// use tokio::sync::broadcast;
1527
    ///
1528
    /// #[tokio::main]
1529
    /// async fn main() {
1530
    ///     let (tx, mut rx) = broadcast::channel(16);
1531
    ///
1532
    ///     let sync_code = thread::spawn(move || {
1533
    ///         assert_eq!(rx.blocking_recv(), Ok(10));
1534
    ///     });
1535
    ///
1536
    ///     let _ = tx.send(10);
1537
    ///     sync_code.join().unwrap();
1538
    /// }
1539
    /// ```
1540
0
    pub fn blocking_recv(&mut self) -> Result<T, RecvError> {
1541
0
        crate::future::block_on(self.recv())
1542
0
    }
1543
}
1544
1545
impl<T> Drop for Receiver<T> {
1546
0
    fn drop(&mut self) {
1547
0
        let mut tail = self.shared.tail.lock();
1548
0
1549
0
        tail.rx_cnt -= 1;
1550
0
        let until = tail.pos;
1551
0
        let remaining_rx = tail.rx_cnt;
1552
0
1553
0
        if remaining_rx == 0 {
1554
0
            self.shared.notify_last_rx_drop.notify_waiters();
1555
0
            tail.closed = true;
1556
0
        }
1557
1558
0
        drop(tail);
1559
1560
0
        while self.next < until {
1561
0
            match self.recv_ref(None) {
1562
0
                Ok(_) => {}
1563
                // The channel is closed
1564
0
                Err(TryRecvError::Closed) => break,
1565
                // Ignore lagging, we will catch up
1566
0
                Err(TryRecvError::Lagged(..)) => {}
1567
                // Can't be empty
1568
0
                Err(TryRecvError::Empty) => panic!("unexpected empty broadcast channel"),
1569
            }
1570
        }
1571
0
    }
1572
}
1573
1574
impl<'a, T> Recv<'a, T> {
1575
0
    fn new(receiver: &'a mut Receiver<T>) -> Recv<'a, T> {
1576
0
        Recv {
1577
0
            receiver,
1578
0
            waiter: WaiterCell(UnsafeCell::new(Waiter {
1579
0
                queued: AtomicBool::new(false),
1580
0
                waker: None,
1581
0
                pointers: linked_list::Pointers::new(),
1582
0
                _p: PhantomPinned,
1583
0
            })),
1584
0
        }
1585
0
    }
1586
1587
    /// A custom `project` implementation is used in place of `pin-project-lite`
1588
    /// as a custom drop implementation is needed.
1589
0
    fn project(self: Pin<&mut Self>) -> (&mut Receiver<T>, &UnsafeCell<Waiter>) {
1590
0
        unsafe {
1591
0
            // Safety: Receiver is Unpin
1592
0
            is_unpin::<&mut Receiver<T>>();
1593
0
1594
0
            let me = self.get_unchecked_mut();
1595
0
            (me.receiver, &me.waiter.0)
1596
0
        }
1597
0
    }
1598
}
1599
1600
impl<'a, T> Future for Recv<'a, T>
1601
where
1602
    T: Clone,
1603
{
1604
    type Output = Result<T, RecvError>;
1605
1606
0
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<T, RecvError>> {
1607
0
        ready!(crate::trace::trace_leaf(cx));
1608
1609
0
        let (receiver, waiter) = self.project();
1610
1611
0
        let guard = match receiver.recv_ref(Some((waiter, cx.waker()))) {
1612
0
            Ok(value) => value,
1613
0
            Err(TryRecvError::Empty) => return Poll::Pending,
1614
0
            Err(TryRecvError::Lagged(n)) => return Poll::Ready(Err(RecvError::Lagged(n))),
1615
0
            Err(TryRecvError::Closed) => return Poll::Ready(Err(RecvError::Closed)),
1616
        };
1617
1618
0
        Poll::Ready(guard.clone_value().ok_or(RecvError::Closed))
1619
0
    }
1620
}
1621
1622
impl<'a, T> Drop for Recv<'a, T> {
1623
0
    fn drop(&mut self) {
1624
0
        // Safety: `waiter.queued` is atomic.
1625
0
        // Acquire ordering is required to synchronize with
1626
0
        // `Shared::notify_rx` before we drop the object.
1627
0
        let queued = self
1628
0
            .waiter
1629
0
            .0
1630
0
            .with(|ptr| unsafe { (*ptr).queued.load(Acquire) });
1631
0
1632
0
        // If the waiter is queued, we need to unlink it from the waiters list.
1633
0
        // If not, no further synchronization is required, since the waiter
1634
0
        // is not in the list and, as such, is not shared with any other threads.
1635
0
        if queued {
1636
            // Acquire the tail lock. This is required for safety before accessing
1637
            // the waiter node.
1638
0
            let mut tail = self.receiver.shared.tail.lock();
1639
0
1640
0
            // Safety: tail lock is held.
1641
0
            // `Relaxed` order suffices because we hold the tail lock.
1642
0
            let queued = self
1643
0
                .waiter
1644
0
                .0
1645
0
                .with_mut(|ptr| unsafe { (*ptr).queued.load(Relaxed) });
1646
0
1647
0
            if queued {
1648
                // Remove the node
1649
                //
1650
                // safety: tail lock is held and the wait node is verified to be in
1651
                // the list.
1652
0
                unsafe {
1653
0
                    self.waiter.0.with_mut(|ptr| {
1654
0
                        tail.waiters.remove((&mut *ptr).into());
1655
0
                    });
1656
0
                }
1657
0
            }
1658
0
        }
1659
0
    }
1660
}
1661
1662
/// # Safety
1663
///
1664
/// `Waiter` is forced to be !Unpin.
1665
unsafe impl linked_list::Link for Waiter {
1666
    type Handle = NonNull<Waiter>;
1667
    type Target = Waiter;
1668
1669
0
    fn as_raw(handle: &NonNull<Waiter>) -> NonNull<Waiter> {
1670
0
        *handle
1671
0
    }
1672
1673
0
    unsafe fn from_raw(ptr: NonNull<Waiter>) -> NonNull<Waiter> {
1674
0
        ptr
1675
0
    }
1676
1677
0
    unsafe fn pointers(target: NonNull<Waiter>) -> NonNull<linked_list::Pointers<Waiter>> {
1678
0
        Waiter::addr_of_pointers(target)
1679
0
    }
1680
}
1681
1682
impl<T> fmt::Debug for Sender<T> {
1683
0
    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
1684
0
        write!(fmt, "broadcast::Sender")
1685
0
    }
1686
}
1687
1688
impl<T> fmt::Debug for WeakSender<T> {
1689
0
    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
1690
0
        write!(fmt, "broadcast::WeakSender")
1691
0
    }
1692
}
1693
1694
impl<T> fmt::Debug for Receiver<T> {
1695
0
    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
1696
0
        write!(fmt, "broadcast::Receiver")
1697
0
    }
1698
}
1699
1700
impl<'a, T> RecvGuard<'a, T> {
1701
0
    fn clone_value(&self) -> Option<T>
1702
0
    where
1703
0
        T: Clone,
1704
0
    {
1705
0
        self.slot.val.clone()
1706
0
    }
1707
}
1708
1709
impl<'a, T> Drop for RecvGuard<'a, T> {
1710
0
    fn drop(&mut self) {
1711
0
        // Decrement the remaining counter
1712
0
        if 1 == self.slot.rem.fetch_sub(1, SeqCst) {
1713
0
            self.slot.val = None;
1714
0
        }
1715
0
    }
1716
}
1717
1718
0
/// Compile-time assertion that `T` is `Unpin`; does nothing at runtime.
fn is_unpin<T>()
where
    T: Unpin,
{
}
1719
1720
#[cfg(not(loom))]
#[cfg(test)]
mod tests {
    use super::*;

    /// A channel built through `Sender::new` starts without receivers; the
    /// count then follows every subscribe, resubscribe and drop.
    #[test]
    fn receiver_count_on_sender_constructor() {
        let tx = Sender::<i32>::new(16);
        assert_eq!(tx.receiver_count(), 0);

        let first = tx.subscribe();
        assert_eq!(tx.receiver_count(), 1);

        let second = first.resubscribe();
        assert_eq!(tx.receiver_count(), 2);

        let third = tx.subscribe();
        assert_eq!(tx.receiver_count(), 3);

        drop(third);
        drop(first);
        assert_eq!(tx.receiver_count(), 1);

        drop(second);
        assert_eq!(tx.receiver_count(), 0);
    }

    /// A channel built through `channel` starts with one receiver, and
    /// `resubscribe` adds another. (The module is already gated on
    /// `not(loom)`, so no per-test gate is needed.)
    #[test]
    fn receiver_count_on_channel_constructor() {
        let (tx, first) = channel::<i32>(16);
        assert_eq!(tx.receiver_count(), 1);

        let _second = first.resubscribe();
        assert_eq!(tx.receiver_count(), 2);
    }
}