Coverage Report

Created: 2025-10-29 06:49

/rust/registry/src/index.crates.io-1949cf8c6b5b557f/futures-channel-0.3.28/src/mpsc/mod.rs
Line
Count
Source
1
//! A multi-producer, single-consumer queue for sending values across
2
//! asynchronous tasks.
3
//!
4
//! Similarly to `std`'s channels, channel creation provides [`Receiver`] and
5
//! [`Sender`] handles. [`Receiver`] implements [`Stream`] and allows a task to
6
//! read values out of the channel. If there is no message to read from the
7
//! channel, the current task will be notified when a new value is sent.
8
//! [`Sender`] implements the `Sink` trait and allows a task to send messages into
9
//! the channel. If the channel is at capacity, the send will be rejected and
10
//! the task will be notified when additional capacity is available. In other
11
//! words, the channel provides backpressure.
12
//!
13
//! Unbounded channels are also available using the `unbounded` constructor.
14
//!
15
//! # Disconnection
16
//!
17
//! When all [`Sender`] handles have been dropped, it is no longer
18
//! possible to send values into the channel. This is considered the termination
19
//! event of the stream. As such, [`Receiver::poll_next`]
20
//! will return `Poll::Ready(None)`.
21
//!
22
//! If the [`Receiver`] handle is dropped, then messages can no longer
23
//! be read out of the channel. In this case, all further attempts to send will
24
//! result in an error.
25
//!
26
//! # Clean Shutdown
27
//!
28
//! If the [`Receiver`] is simply dropped, then it is possible for
29
//! there to be messages still in the channel that will not be processed. As
30
//! such, it is usually desirable to perform a "clean" shutdown. To do this, the
31
//! receiver will first call `close`, which will prevent any further messages from
32
//! being sent into the channel. Then, the receiver consumes the channel to
33
//! completion, at which point the receiver can be dropped.
34
//!
35
//! [`Sender`]: struct.Sender.html
36
//! [`Receiver`]: struct.Receiver.html
37
//! [`Stream`]: ../../futures_core/stream/trait.Stream.html
38
//! [`Receiver::poll_next`]:
39
//!     ../../futures_core/stream/trait.Stream.html#tymethod.poll_next
40
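A minimal, executor-free sketch of the behaviour described in the doc comment above, assuming the crate is consumed through the `futures` facade as `futures::channel::mpsc`: backpressure via `try_send`, and disconnection once the receiver is dropped.

use futures::channel::mpsc;

fn main() {
    // buffer = 2 with a single sender: buffer + 1 messages fit before `Full`.
    let (mut tx, mut rx) = mpsc::channel::<u32>(2);
    tx.try_send(1).unwrap();
    tx.try_send(2).unwrap();
    tx.try_send(3).unwrap(); // the sender's guaranteed slot
    assert!(tx.try_send(4).unwrap_err().is_full());

    // Reading a message frees capacity and unparks the sender.
    assert_eq!(rx.try_next().unwrap(), Some(1));
    tx.try_send(4).unwrap();

    // Dropping the receiver terminates the channel for every sender.
    drop(rx);
    assert!(tx.try_send(5).unwrap_err().is_disconnected());
}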
41
// At the core, the channel uses an atomic FIFO queue for message passing. This
42
// queue is used as the primary coordination primitive. In order to enforce
43
// capacity limits and handle back pressure, a secondary FIFO queue is used to
44
// send parked task handles.
45
//
46
// The general idea is that the channel is created with a `buffer` size of `n`.
47
// The channel capacity is `n + num-senders`. Each sender gets one "guaranteed"
48
// slot to hold a message. This allows `Sender` to know for a fact that a send
49
// will succeed *before* starting to do the actual work of sending the value.
50
// Since most of this work is lock-free, once the work starts, it is impossible
51
// to safely revert.
52
//
53
// If the sender is unable to process a send operation, then the current
54
// task is parked and the handle is sent on the parked task queue.
55
//
56
// Note that the implementation guarantees that the channel capacity will never
57
// exceed the configured limit, however there is no *strict* guarantee that the
58
// receiver will wake up a parked task *immediately* when a slot becomes
59
// available. However, it will almost always unpark a task when a slot becomes
60
// available and it is *guaranteed* that a sender will be unparked when the
61
// message that caused the sender to become parked is read out of the channel.
62
//
63
// The steps for sending a message are roughly:
64
//
65
// 1) Increment the channel message count
66
// 2) If the channel is at capacity, push the task handle onto the wait queue
67
// 3) Push the message onto the message queue.
68
//
69
// The steps for receiving a message are roughly:
70
//
71
// 1) Pop a message from the message queue
72
// 2) Pop a task handle from the wait queue
73
// 3) Decrement the channel message count.
74
//
75
// It's important for the order of operations on lock-free structures to happen
76
// in reverse order between the sender and receiver. This makes the message
77
// queue the primary coordination structure and establishes the necessary
78
// happens-before semantics required for the acquire / release semantics used
79
// by the queue structure.
80
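An ordering-only sketch of the two step lists above, using `Mutex`-guarded queues in place of the lock-free ones; the names and types are illustrative, not the module's, and only the order of operations is meant to mirror the real implementation.

use std::collections::VecDeque;
use std::sync::atomic::{AtomicUsize, Ordering::SeqCst};
use std::sync::Mutex;
use std::task::Waker;

struct Toy<T> {
    num_messages: AtomicUsize,
    messages: Mutex<VecDeque<T>>,
    parked: Mutex<VecDeque<Waker>>,
    capacity: usize,
}

impl<T> Toy<T> {
    fn send(&self, msg: T, waker: Waker) {
        // 1) Increment the channel message count.
        let n = self.num_messages.fetch_add(1, SeqCst) + 1;
        // 2) If the channel is at capacity, push the task handle onto the wait queue.
        if n > self.capacity {
            self.parked.lock().unwrap().push_back(waker);
        }
        // 3) Push the message onto the message queue.
        self.messages.lock().unwrap().push_back(msg);
    }

    fn recv(&self) -> Option<T> {
        // 1) Pop a message from the message queue.
        let msg = self.messages.lock().unwrap().pop_front()?;
        // 2) Pop a task handle from the wait queue and wake it.
        if let Some(waker) = self.parked.lock().unwrap().pop_front() {
            waker.wake();
        }
        // 3) Decrement the channel message count.
        self.num_messages.fetch_sub(1, SeqCst);
        Some(msg)
    }
}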
81
use futures_core::stream::{FusedStream, Stream};
82
use futures_core::task::__internal::AtomicWaker;
83
use futures_core::task::{Context, Poll, Waker};
84
use std::fmt;
85
use std::pin::Pin;
86
use std::sync::atomic::AtomicUsize;
87
use std::sync::atomic::Ordering::SeqCst;
88
use std::sync::{Arc, Mutex};
89
use std::thread;
90
91
use crate::mpsc::queue::Queue;
92
93
mod queue;
94
#[cfg(feature = "sink")]
95
mod sink_impl;
96
97
struct UnboundedSenderInner<T> {
98
    // Channel state shared between the sender and receiver.
99
    inner: Arc<UnboundedInner<T>>,
100
}
101
102
struct BoundedSenderInner<T> {
103
    // Channel state shared between the sender and receiver.
104
    inner: Arc<BoundedInner<T>>,
105
106
    // Handle to the task that is blocked on this sender. This handle is sent
107
    // to the receiver half in order to be notified when the sender becomes
108
    // unblocked.
109
    sender_task: Arc<Mutex<SenderTask>>,
110
111
    // `true` if the sender might be blocked. This is an optimization to avoid
112
    // having to lock the mutex most of the time.
113
    maybe_parked: bool,
114
}
115
116
// We never project Pin<&mut SenderInner> to `Pin<&mut T>`
117
impl<T> Unpin for UnboundedSenderInner<T> {}
118
impl<T> Unpin for BoundedSenderInner<T> {}
119
120
/// The transmission end of a bounded mpsc channel.
121
///
122
/// This value is created by the [`channel`](channel) function.
123
pub struct Sender<T>(Option<BoundedSenderInner<T>>);
124
125
/// The transmission end of an unbounded mpsc channel.
126
///
127
/// This value is created by the [`unbounded`](unbounded) function.
128
pub struct UnboundedSender<T>(Option<UnboundedSenderInner<T>>);
129
130
trait AssertKinds: Send + Sync + Clone {}
131
impl AssertKinds for UnboundedSender<u32> {}
132
133
/// The receiving end of a bounded mpsc channel.
134
///
135
/// This value is created by the [`channel`](channel) function.
136
pub struct Receiver<T> {
137
    inner: Option<Arc<BoundedInner<T>>>,
138
}
139
140
/// The receiving end of an unbounded mpsc channel.
141
///
142
/// This value is created by the [`unbounded`](unbounded) function.
143
pub struct UnboundedReceiver<T> {
144
    inner: Option<Arc<UnboundedInner<T>>>,
145
}
146
147
// `Pin<&mut UnboundedReceiver<T>>` is never projected to `Pin<&mut T>`
148
impl<T> Unpin for UnboundedReceiver<T> {}
149
150
/// The error type for [`Sender`s](Sender) used as `Sink`s.
151
#[derive(Clone, Debug, PartialEq, Eq)]
152
pub struct SendError {
153
    kind: SendErrorKind,
154
}
155
156
/// The error type returned from [`try_send`](Sender::try_send).
157
#[derive(Clone, PartialEq, Eq)]
158
pub struct TrySendError<T> {
159
    err: SendError,
160
    val: T,
161
}
162
163
#[derive(Clone, Debug, PartialEq, Eq)]
164
enum SendErrorKind {
165
    Full,
166
    Disconnected,
167
}
168
169
/// The error type returned from [`try_next`](Receiver::try_next).
170
pub struct TryRecvError {
171
    _priv: (),
172
}
173
174
impl fmt::Display for SendError {
175
0
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
176
0
        if self.is_full() {
177
0
            write!(f, "send failed because channel is full")
178
        } else {
179
0
            write!(f, "send failed because receiver is gone")
180
        }
181
0
    }
182
}
183
184
impl std::error::Error for SendError {}
185
186
impl SendError {
187
    /// Returns `true` if this error is a result of the channel being full.
188
0
    pub fn is_full(&self) -> bool {
189
0
        match self.kind {
190
0
            SendErrorKind::Full => true,
191
0
            _ => false,
192
        }
193
0
    }
194
195
    /// Returns `true` if this error is a result of the receiver being dropped.
196
0
    pub fn is_disconnected(&self) -> bool {
197
0
        match self.kind {
198
0
            SendErrorKind::Disconnected => true,
199
0
            _ => false,
200
        }
201
0
    }
202
}
203
204
impl<T> fmt::Debug for TrySendError<T> {
205
0
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
206
0
        f.debug_struct("TrySendError").field("kind", &self.err.kind).finish()
207
0
    }
208
}
209
210
impl<T> fmt::Display for TrySendError<T> {
211
0
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
212
0
        if self.is_full() {
213
0
            write!(f, "send failed because channel is full")
214
        } else {
215
0
            write!(f, "send failed because receiver is gone")
216
        }
217
0
    }
218
}
219
220
impl<T: core::any::Any> std::error::Error for TrySendError<T> {}
221
222
impl<T> TrySendError<T> {
223
    /// Returns `true` if this error is a result of the channel being full.
224
0
    pub fn is_full(&self) -> bool {
225
0
        self.err.is_full()
226
0
    }
227
228
    /// Returns `true` if this error is a result of the receiver being dropped.
229
0
    pub fn is_disconnected(&self) -> bool {
230
0
        self.err.is_disconnected()
231
0
    }
232
233
    /// Returns the message that was attempted to be sent but failed.
234
0
    pub fn into_inner(self) -> T {
235
0
        self.val
236
0
    }
237
238
    /// Drops the message and converts into a `SendError`.
239
0
    pub fn into_send_error(self) -> SendError {
240
0
        self.err
241
0
    }
242
}
243
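A small sketch of recovering the rejected value through `TrySendError::into_inner`, assuming the crate is used through the `futures` facade.

use futures::channel::mpsc;

fn main() {
    let (mut tx, rx) = mpsc::channel::<String>(0);
    drop(rx); // the receiver is gone, so any send is rejected
    let err = tx.try_send(String::from("hello")).unwrap_err();
    assert!(err.is_disconnected());
    assert_eq!(err.into_inner(), "hello"); // the value is handed back intact
}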
244
impl fmt::Debug for TryRecvError {
245
0
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
246
0
        f.debug_tuple("TryRecvError").finish()
247
0
    }
248
}
249
250
impl fmt::Display for TryRecvError {
251
0
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
252
0
        write!(f, "receiver channel is empty")
253
0
    }
254
}
255
256
impl std::error::Error for TryRecvError {}
257
258
struct UnboundedInner<T> {
259
    // Internal channel state. Consists of the number of messages stored in the
260
    // channel as well as a flag signalling that the channel is closed.
261
    state: AtomicUsize,
262
263
    // Atomic, FIFO queue used to send messages to the receiver
264
    message_queue: Queue<T>,
265
266
    // Number of senders in existence
267
    num_senders: AtomicUsize,
268
269
    // Handle to the receiver's task.
270
    recv_task: AtomicWaker,
271
}
272
273
struct BoundedInner<T> {
274
    // Max buffer size of the channel.
275
    buffer: usize,
276
277
    // Internal channel state. Consists of the number of messages stored in the
278
    // channel as well as a flag signalling that the channel is closed.
279
    state: AtomicUsize,
280
281
    // Atomic, FIFO queue used to send messages to the receiver
282
    message_queue: Queue<T>,
283
284
    // Atomic, FIFO queue used to send parked task handles to the receiver.
285
    parked_queue: Queue<Arc<Mutex<SenderTask>>>,
286
287
    // Number of senders in existence
288
    num_senders: AtomicUsize,
289
290
    // Handle to the receiver's task.
291
    recv_task: AtomicWaker,
292
}
293
294
// Struct representation of `Inner::state`.
295
#[derive(Clone, Copy)]
296
struct State {
297
    // `true` when the channel is open
298
    is_open: bool,
299
300
    // Number of messages in the channel
301
    num_messages: usize,
302
}
303
304
// The `is_open` flag is stored in the left-most bit of `Inner::state`
305
const OPEN_MASK: usize = usize::max_value() - (usize::max_value() >> 1);
306
307
// When a new channel is created, it is created in the open state with no
308
// pending messages.
309
const INIT_STATE: usize = OPEN_MASK;
310
311
// The maximum number of messages that a channel can track is `usize::max_value() >> 1`
312
const MAX_CAPACITY: usize = !(OPEN_MASK);
313
314
// The maximum requested buffer size must be less than the maximum capacity of
315
// a channel. This is because each sender gets a guaranteed slot.
316
const MAX_BUFFER: usize = MAX_CAPACITY >> 1;
317
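A quick check of the state layout described by these constants: the open flag is the highest bit and the message count occupies the remaining bits (all names below are local to the example).

fn main() {
    let open_mask: usize = usize::MAX - (usize::MAX >> 1);
    assert_eq!(open_mask, 1usize << (usize::BITS - 1)); // highest bit only
    let max_capacity = !open_mask;
    assert_eq!(max_capacity, usize::MAX >> 1);

    // A state word with the open bit set and 5 queued messages:
    let state = open_mask | 5;
    assert_eq!(state & max_capacity, 5);     // num_messages
    assert!(state & open_mask == open_mask); // is_open
}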
318
// Sent to the consumer to wake up blocked producers
319
struct SenderTask {
320
    task: Option<Waker>,
321
    is_parked: bool,
322
}
323
324
impl SenderTask {
325
0
    fn new() -> Self {
326
0
        Self { task: None, is_parked: false }
327
0
    }
328
329
0
    fn notify(&mut self) {
330
0
        self.is_parked = false;
331
332
0
        if let Some(task) = self.task.take() {
333
0
            task.wake();
334
0
        }
335
0
    }
336
}
337
338
/// Creates a bounded mpsc channel for communicating between asynchronous tasks.
339
///
340
/// Being bounded, this channel provides backpressure to ensure that the sender
341
/// outpaces the receiver by only a limited amount. The channel's capacity is
342
/// equal to `buffer + num-senders`. In other words, each sender gets a
343
/// guaranteed slot in the channel capacity, and on top of that there are
344
/// `buffer` "first come, first serve" slots available to all senders.
345
///
346
/// The [`Receiver`](Receiver) returned implements the
347
/// [`Stream`](futures_core::stream::Stream) trait, while [`Sender`](Sender) implements
348
/// `Sink`.
349
0
pub fn channel<T>(buffer: usize) -> (Sender<T>, Receiver<T>) {
350
    // Check that the requested buffer size does not exceed the maximum buffer
351
    // size permitted by the system.
352
0
    assert!(buffer < MAX_BUFFER, "requested buffer size too large");
353
354
0
    let inner = Arc::new(BoundedInner {
355
0
        buffer,
356
0
        state: AtomicUsize::new(INIT_STATE),
357
0
        message_queue: Queue::new(),
358
0
        parked_queue: Queue::new(),
359
0
        num_senders: AtomicUsize::new(1),
360
0
        recv_task: AtomicWaker::new(),
361
0
    });
362
363
0
    let tx = BoundedSenderInner {
364
0
        inner: inner.clone(),
365
0
        sender_task: Arc::new(Mutex::new(SenderTask::new())),
366
0
        maybe_parked: false,
367
0
    };
368
369
0
    let rx = Receiver { inner: Some(inner) };
370
371
0
    (Sender(Some(tx)), rx)
372
0
}
373
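A sketch of the `buffer + num-senders` capacity rule: even with a zero-sized buffer, each sender keeps one guaranteed slot (assumes the `futures` facade).

use futures::channel::mpsc;

fn main() {
    let (mut tx1, mut rx) = mpsc::channel::<&str>(0);
    let mut tx2 = tx1.clone();

    tx1.try_send("from tx1").unwrap(); // tx1's guaranteed slot
    assert!(tx1.try_send("again").unwrap_err().is_full());
    tx2.try_send("from tx2").unwrap(); // tx2's guaranteed slot

    assert_eq!(rx.try_next().unwrap(), Some("from tx1"));
    assert_eq!(rx.try_next().unwrap(), Some("from tx2"));
}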
374
/// Creates an unbounded mpsc channel for communicating between asynchronous
375
/// tasks.
376
///
377
/// A `send` on this channel will always succeed as long as the receive half has
378
/// not been closed. If the receiver falls behind, messages will be arbitrarily
379
/// buffered.
380
///
381
/// **Note** that the amount of available system memory is an implicit bound to
382
/// the channel. Using an `unbounded` channel has the ability of causing the
383
/// process to run out of memory. In this case, the process will be aborted.
384
0
pub fn unbounded<T>() -> (UnboundedSender<T>, UnboundedReceiver<T>) {
385
0
    let inner = Arc::new(UnboundedInner {
386
0
        state: AtomicUsize::new(INIT_STATE),
387
0
        message_queue: Queue::new(),
388
0
        num_senders: AtomicUsize::new(1),
389
0
        recv_task: AtomicWaker::new(),
390
0
    });
391
392
0
    let tx = UnboundedSenderInner { inner: inner.clone() };
393
394
0
    let rx = UnboundedReceiver { inner: Some(inner) };
395
396
0
    (UnboundedSender(Some(tx)), rx)
397
0
}
Unexecuted instantiation: futures_channel::mpsc::unbounded::<dbus::message::Message>
Unexecuted instantiation: futures_channel::mpsc::unbounded::<_>
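A minimal sketch of the unbounded side: a send never reports `Full` and only fails once the receiver is gone (assumes the `futures` facade).

use futures::channel::mpsc;

fn main() {
    let (tx, mut rx) = mpsc::unbounded::<u64>();
    for i in 0..1_000 {
        tx.unbounded_send(i).unwrap(); // never Full, only bounded by memory
    }
    assert_eq!(rx.try_next().unwrap(), Some(0));
    drop(rx);
    assert!(tx.unbounded_send(1_000).unwrap_err().is_disconnected());
}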
398
399
/*
400
 *
401
 * ===== impl Sender =====
402
 *
403
 */
404
405
impl<T> UnboundedSenderInner<T> {
406
0
    fn poll_ready_nb(&self) -> Poll<Result<(), SendError>> {
407
0
        let state = decode_state(self.inner.state.load(SeqCst));
408
0
        if state.is_open {
409
0
            Poll::Ready(Ok(()))
410
        } else {
411
0
            Poll::Ready(Err(SendError { kind: SendErrorKind::Disconnected }))
412
        }
413
0
    }
414
415
    // Push message to the queue and signal to the receiver
416
0
    fn queue_push_and_signal(&self, msg: T) {
417
        // Push the message onto the message queue
418
0
        self.inner.message_queue.push(msg);
419
420
        // Signal to the receiver that a message has been enqueued. If the
421
        // receiver is parked, this will unpark the task.
422
0
        self.inner.recv_task.wake();
423
0
    }
Unexecuted instantiation: <futures_channel::mpsc::UnboundedSenderInner<dbus::message::Message>>::queue_push_and_signal
Unexecuted instantiation: <futures_channel::mpsc::UnboundedSenderInner<_>>::queue_push_and_signal
424
425
    // Increment the number of queued messages. Returns the resulting number.
426
0
    fn inc_num_messages(&self) -> Option<usize> {
427
0
        let mut curr = self.inner.state.load(SeqCst);
428
429
        loop {
430
0
            let mut state = decode_state(curr);
431
432
            // The receiver end closed the channel.
433
0
            if !state.is_open {
434
0
                return None;
435
0
            }
436
437
            // This probably is never hit? Odds are the process will run out of
438
            // memory first. It may be worth returning something else in this
439
            // case?
440
0
            assert!(
441
0
                state.num_messages < MAX_CAPACITY,
442
                "buffer space \
443
                    exhausted; sending this message would overflow the state"
444
            );
445
446
0
            state.num_messages += 1;
447
448
0
            let next = encode_state(&state);
449
0
            match self.inner.state.compare_exchange(curr, next, SeqCst, SeqCst) {
450
0
                Ok(_) => return Some(state.num_messages),
451
0
                Err(actual) => curr = actual,
452
            }
453
        }
454
0
    }
Unexecuted instantiation: <futures_channel::mpsc::UnboundedSenderInner<dbus::message::Message>>::inc_num_messages
Unexecuted instantiation: <futures_channel::mpsc::UnboundedSenderInner<_>>::inc_num_messages
455
456
    /// Returns whether the senders send to the same receiver.
457
0
    fn same_receiver(&self, other: &Self) -> bool {
458
0
        Arc::ptr_eq(&self.inner, &other.inner)
459
0
    }
460
461
    /// Returns whether the sender sends to this receiver.
462
0
    fn is_connected_to(&self, inner: &Arc<UnboundedInner<T>>) -> bool {
463
0
        Arc::ptr_eq(&self.inner, inner)
464
0
    }
465
466
    /// Returns a pointer to the `Arc` containing the sender
467
    ///
468
    /// The returned pointer is never dereferenced and should only be used for hashing!
469
0
    fn ptr(&self) -> *const UnboundedInner<T> {
470
0
        &*self.inner
471
0
    }
472
473
    /// Returns whether this channel is closed without needing a context.
474
0
    fn is_closed(&self) -> bool {
475
0
        !decode_state(self.inner.state.load(SeqCst)).is_open
476
0
    }
477
478
    /// Closes this channel from the sender side, preventing any new messages.
479
0
    fn close_channel(&self) {
480
        // There's no need to park this sender, it's dropping,
481
        // and we don't want to check for capacity, so skip
482
        // that stuff from `do_send`.
483
484
0
        self.inner.set_closed();
485
0
        self.inner.recv_task.wake();
486
0
    }
Unexecuted instantiation: <futures_channel::mpsc::UnboundedSenderInner<dbus::message::Message>>::close_channel
Unexecuted instantiation: <futures_channel::mpsc::UnboundedSenderInner<_>>::close_channel
487
}
488
489
impl<T> BoundedSenderInner<T> {
490
    /// Attempts to send a message on this `Sender`, returning the message
491
    /// if there was an error.
492
0
    fn try_send(&mut self, msg: T) -> Result<(), TrySendError<T>> {
493
        // If the sender is currently blocked, reject the message
494
0
        if !self.poll_unparked(None).is_ready() {
495
0
            return Err(TrySendError { err: SendError { kind: SendErrorKind::Full }, val: msg });
496
0
        }
497
498
        // The channel has capacity to accept the message, so send it
499
0
        self.do_send_b(msg)
500
0
    }
501
502
    // Do the send without failing.
503
    // Can be called only by bounded sender.
504
0
    fn do_send_b(&mut self, msg: T) -> Result<(), TrySendError<T>> {
505
        // Anyone calling do_send *should* make sure there is room first,
506
        // but assert here for tests as a sanity check.
507
0
        debug_assert!(self.poll_unparked(None).is_ready());
508
509
        // First, increment the number of messages contained by the channel.
510
        // This operation will also atomically determine if the sender task
511
        // should be parked.
512
        //
513
        // `None` is returned in the case that the channel has been closed by the
514
        // receiver. This happens when `Receiver::close` is called or the
515
        // receiver is dropped.
516
0
        let park_self = match self.inc_num_messages() {
517
0
            Some(num_messages) => {
518
                // Block if the current number of pending messages has exceeded
519
                // the configured buffer size
520
0
                num_messages > self.inner.buffer
521
            }
522
            None => {
523
0
                return Err(TrySendError {
524
0
                    err: SendError { kind: SendErrorKind::Disconnected },
525
0
                    val: msg,
526
0
                })
527
            }
528
        };
529
530
        // If the channel has reached capacity, then the sender task needs to
531
        // be parked. This will send the task handle on the parked task queue.
532
        //
533
        // However, when `do_send` is called while dropping the `Sender`,
534
        // `task::current()` can't be called safely. In this case, in order to
535
        // maintain internal consistency, a blank message is pushed onto the
536
        // parked task queue.
537
0
        if park_self {
538
0
            self.park();
539
0
        }
540
541
0
        self.queue_push_and_signal(msg);
542
543
0
        Ok(())
544
0
    }
545
546
    // Push message to the queue and signal to the receiver
547
0
    fn queue_push_and_signal(&self, msg: T) {
548
        // Push the message onto the message queue
549
0
        self.inner.message_queue.push(msg);
550
551
        // Signal to the receiver that a message has been enqueued. If the
552
        // receiver is parked, this will unpark the task.
553
0
        self.inner.recv_task.wake();
554
0
    }
555
556
    // Increment the number of queued messages. Returns the resulting number.
557
0
    fn inc_num_messages(&self) -> Option<usize> {
558
0
        let mut curr = self.inner.state.load(SeqCst);
559
560
        loop {
561
0
            let mut state = decode_state(curr);
562
563
            // The receiver end closed the channel.
564
0
            if !state.is_open {
565
0
                return None;
566
0
            }
567
568
            // This probably is never hit? Odds are the process will run out of
569
            // memory first. It may be worth returning something else in this
570
            // case?
571
0
            assert!(
572
0
                state.num_messages < MAX_CAPACITY,
573
                "buffer space \
574
                    exhausted; sending this message would overflow the state"
575
            );
576
577
0
            state.num_messages += 1;
578
579
0
            let next = encode_state(&state);
580
0
            match self.inner.state.compare_exchange(curr, next, SeqCst, SeqCst) {
581
0
                Ok(_) => return Some(state.num_messages),
582
0
                Err(actual) => curr = actual,
583
            }
584
        }
585
0
    }
586
587
0
    fn park(&mut self) {
588
0
        {
589
0
            let mut sender = self.sender_task.lock().unwrap();
590
0
            sender.task = None;
591
0
            sender.is_parked = true;
592
0
        }
593
594
        // Send handle over queue
595
0
        let t = self.sender_task.clone();
596
0
        self.inner.parked_queue.push(t);
597
598
        // Check to make sure we weren't closed after we sent our task on the
599
        // queue
600
0
        let state = decode_state(self.inner.state.load(SeqCst));
601
0
        self.maybe_parked = state.is_open;
602
0
    }
603
604
    /// Polls the channel to determine if there is guaranteed capacity to send
605
    /// at least one item without waiting.
606
    ///
607
    /// # Return value
608
    ///
609
    /// This method returns:
610
    ///
611
    /// - `Poll::Ready(Ok(_))` if there is sufficient capacity;
612
    /// - `Poll::Pending` if the channel may not have
613
    ///   capacity, in which case the current task is queued to be notified once
614
    ///   capacity is available;
615
    /// - `Poll::Ready(Err(SendError))` if the receiver has been dropped.
616
0
    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), SendError>> {
617
0
        let state = decode_state(self.inner.state.load(SeqCst));
618
0
        if !state.is_open {
619
0
            return Poll::Ready(Err(SendError { kind: SendErrorKind::Disconnected }));
620
0
        }
621
622
0
        self.poll_unparked(Some(cx)).map(Ok)
623
0
    }
624
625
    /// Returns whether the senders send to the same receiver.
626
0
    fn same_receiver(&self, other: &Self) -> bool {
627
0
        Arc::ptr_eq(&self.inner, &other.inner)
628
0
    }
629
630
    /// Returns whether the sender sends to this receiver.
631
0
    fn is_connected_to(&self, receiver: &Arc<BoundedInner<T>>) -> bool {
632
0
        Arc::ptr_eq(&self.inner, receiver)
633
0
    }
634
635
    /// Returns a pointer to the `Arc` containing the sender
636
    ///
637
    /// The returned pointer is never dereferenced and should only be used for hashing!
638
0
    fn ptr(&self) -> *const BoundedInner<T> {
639
0
        &*self.inner
640
0
    }
641
642
    /// Returns whether this channel is closed without needing a context.
643
0
    fn is_closed(&self) -> bool {
644
0
        !decode_state(self.inner.state.load(SeqCst)).is_open
645
0
    }
646
647
    /// Closes this channel from the sender side, preventing any new messages.
648
0
    fn close_channel(&self) {
649
        // There's no need to park this sender, it's dropping,
650
        // and we don't want to check for capacity, so skip
651
        // that stuff from `do_send`.
652
653
0
        self.inner.set_closed();
654
0
        self.inner.recv_task.wake();
655
0
    }
656
657
0
    fn poll_unparked(&mut self, cx: Option<&mut Context<'_>>) -> Poll<()> {
658
        // First check the `maybe_parked` variable. This avoids acquiring the
659
        // lock in most cases
660
0
        if self.maybe_parked {
661
            // Get a lock on the task handle
662
0
            let mut task = self.sender_task.lock().unwrap();
663
664
0
            if !task.is_parked {
665
0
                self.maybe_parked = false;
666
0
                return Poll::Ready(());
667
0
            }
668
669
            // At this point, an unpark request is pending, so there will be an
670
            // unpark sometime in the future. We just need to make sure that
671
            // the correct task will be notified.
672
            //
673
            // Update the task in case the `Sender` has been moved to another
674
            // task
675
0
            task.task = cx.map(|cx| cx.waker().clone());
676
677
0
            Poll::Pending
678
        } else {
679
0
            Poll::Ready(())
680
        }
681
0
    }
682
}
683
684
impl<T> Sender<T> {
685
    /// Attempts to send a message on this `Sender`, returning the message
686
    /// if there was an error.
687
0
    pub fn try_send(&mut self, msg: T) -> Result<(), TrySendError<T>> {
688
0
        if let Some(inner) = &mut self.0 {
689
0
            inner.try_send(msg)
690
        } else {
691
0
            Err(TrySendError { err: SendError { kind: SendErrorKind::Disconnected }, val: msg })
692
        }
693
0
    }
694
695
    /// Send a message on the channel.
696
    ///
697
    /// This function should only be called after
698
    /// [`poll_ready`](Sender::poll_ready) has reported that the channel is
699
    /// ready to receive a message.
700
0
    pub fn start_send(&mut self, msg: T) -> Result<(), SendError> {
701
0
        self.try_send(msg).map_err(|e| e.err)
702
0
    }
703
704
    /// Polls the channel to determine if there is guaranteed capacity to send
705
    /// at least one item without waiting.
706
    ///
707
    /// # Return value
708
    ///
709
    /// This method returns:
710
    ///
711
    /// - `Poll::Ready(Ok(_))` if there is sufficient capacity;
712
    /// - `Poll::Pending` if the channel may not have
713
    ///   capacity, in which case the current task is queued to be notified once
714
    ///   capacity is available;
715
    /// - `Poll::Ready(Err(SendError))` if the receiver has been dropped.
716
0
    pub fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), SendError>> {
717
0
        let inner = self.0.as_mut().ok_or(SendError { kind: SendErrorKind::Disconnected })?;
718
0
        inner.poll_ready(cx)
719
0
    }
720
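A sketch of the `poll_ready` / `start_send` protocol described above, driven by a small `poll_fn` future (assumes the `futures` facade and its bundled executor).

use futures::channel::mpsc;
use futures::executor::block_on;
use futures::future::poll_fn;

fn main() {
    let (mut tx, mut rx) = mpsc::channel::<u8>(1);
    block_on(async {
        // Wait until a slot is guaranteed, then hand the message over.
        poll_fn(|cx| tx.poll_ready(cx)).await.unwrap();
        tx.start_send(7).unwrap();
        assert_eq!(rx.try_next().unwrap(), Some(7));
    });
}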
721
    /// Returns whether this channel is closed without needing a context.
722
0
    pub fn is_closed(&self) -> bool {
723
0
        self.0.as_ref().map(BoundedSenderInner::is_closed).unwrap_or(true)
724
0
    }
725
726
    /// Closes this channel from the sender side, preventing any new messages.
727
0
    pub fn close_channel(&mut self) {
728
0
        if let Some(inner) = &mut self.0 {
729
0
            inner.close_channel();
730
0
        }
731
0
    }
732
733
    /// Disconnects this sender from the channel, closing it if there are no more senders left.
734
0
    pub fn disconnect(&mut self) {
735
0
        self.0 = None;
736
0
    }
737
738
    /// Returns whether the senders send to the same receiver.
739
0
    pub fn same_receiver(&self, other: &Self) -> bool {
740
0
        match (&self.0, &other.0) {
741
0
            (Some(inner), Some(other)) => inner.same_receiver(other),
742
0
            _ => false,
743
        }
744
0
    }
745
746
    /// Returns whether the sender sends to this receiver.
747
0
    pub fn is_connected_to(&self, receiver: &Receiver<T>) -> bool {
748
0
        match (&self.0, &receiver.inner) {
749
0
            (Some(inner), Some(receiver)) => inner.is_connected_to(receiver),
750
0
            _ => false,
751
        }
752
0
    }
753
754
    /// Hashes the receiver into the provided hasher
755
0
    pub fn hash_receiver<H>(&self, hasher: &mut H)
756
0
    where
757
0
        H: std::hash::Hasher,
758
    {
759
        use std::hash::Hash;
760
761
0
        let ptr = self.0.as_ref().map(|inner| inner.ptr());
762
0
        ptr.hash(hasher);
763
0
    }
764
}
765
766
impl<T> UnboundedSender<T> {
767
    /// Check if the channel is ready to receive a message.
768
0
    pub fn poll_ready(&self, _: &mut Context<'_>) -> Poll<Result<(), SendError>> {
769
0
        let inner = self.0.as_ref().ok_or(SendError { kind: SendErrorKind::Disconnected })?;
770
0
        inner.poll_ready_nb()
771
0
    }
772
773
    /// Returns whether this channel is closed without needing a context.
774
0
    pub fn is_closed(&self) -> bool {
775
0
        self.0.as_ref().map(UnboundedSenderInner::is_closed).unwrap_or(true)
776
0
    }
777
778
    /// Closes this channel from the sender side, preventing any new messages.
779
0
    pub fn close_channel(&self) {
780
0
        if let Some(inner) = &self.0 {
781
0
            inner.close_channel();
782
0
        }
783
0
    }
784
785
    /// Disconnects this sender from the channel, closing it if there are no more senders left.
786
0
    pub fn disconnect(&mut self) {
787
0
        self.0 = None;
788
0
    }
789
790
    // Do the send without parking current task.
791
0
    fn do_send_nb(&self, msg: T) -> Result<(), TrySendError<T>> {
792
0
        if let Some(inner) = &self.0 {
793
0
            if inner.inc_num_messages().is_some() {
794
0
                inner.queue_push_and_signal(msg);
795
0
                return Ok(());
796
0
            }
797
0
        }
798
799
0
        Err(TrySendError { err: SendError { kind: SendErrorKind::Disconnected }, val: msg })
800
0
    }
Unexecuted instantiation: <futures_channel::mpsc::UnboundedSender<dbus::message::Message>>::do_send_nb
Unexecuted instantiation: <futures_channel::mpsc::UnboundedSender<_>>::do_send_nb
801
802
    /// Send a message on the channel.
803
    ///
804
    /// This method should only be called after `poll_ready` has been used to
805
    /// verify that the channel is ready to receive a message.
806
0
    pub fn start_send(&mut self, msg: T) -> Result<(), SendError> {
807
0
        self.do_send_nb(msg).map_err(|e| e.err)
808
0
    }
809
810
    /// Sends a message along this channel.
811
    ///
812
    /// This is an unbounded sender, so this function differs from `Sink::send`
813
    /// by ensuring the return type reflects that the channel is always ready to
814
    /// receive messages.
815
0
    pub fn unbounded_send(&self, msg: T) -> Result<(), TrySendError<T>> {
816
0
        self.do_send_nb(msg)
817
0
    }
Unexecuted instantiation: <futures_channel::mpsc::UnboundedSender<dbus::message::Message>>::unbounded_send
Unexecuted instantiation: <futures_channel::mpsc::UnboundedSender<_>>::unbounded_send
818
819
    /// Returns whether the senders send to the same receiver.
820
0
    pub fn same_receiver(&self, other: &Self) -> bool {
821
0
        match (&self.0, &other.0) {
822
0
            (Some(inner), Some(other)) => inner.same_receiver(other),
823
0
            _ => false,
824
        }
825
0
    }
826
827
    /// Returns whether the sender sends to this receiver.
828
0
    pub fn is_connected_to(&self, receiver: &UnboundedReceiver<T>) -> bool {
829
0
        match (&self.0, &receiver.inner) {
830
0
            (Some(inner), Some(receiver)) => inner.is_connected_to(receiver),
831
0
            _ => false,
832
        }
833
0
    }
834
835
    /// Hashes the receiver into the provided hasher
836
0
    pub fn hash_receiver<H>(&self, hasher: &mut H)
837
0
    where
838
0
        H: std::hash::Hasher,
839
    {
840
        use std::hash::Hash;
841
842
0
        let ptr = self.0.as_ref().map(|inner| inner.ptr());
843
0
        ptr.hash(hasher);
844
0
    }
845
}
846
847
impl<T> Clone for Sender<T> {
848
0
    fn clone(&self) -> Self {
849
0
        Self(self.0.clone())
850
0
    }
851
}
852
853
impl<T> Clone for UnboundedSender<T> {
854
0
    fn clone(&self) -> Self {
855
0
        Self(self.0.clone())
856
0
    }
857
}
858
859
impl<T> Clone for UnboundedSenderInner<T> {
860
0
    fn clone(&self) -> Self {
861
        // Since this atomic op isn't actually guarding any memory and we don't
862
        // care about any orderings besides the ordering on the single atomic
863
        // variable, a relaxed ordering is acceptable.
864
0
        let mut curr = self.inner.num_senders.load(SeqCst);
865
866
        loop {
867
            // If the maximum number of senders has been reached, then fail
868
0
            if curr == MAX_BUFFER {
869
0
                panic!("cannot clone `Sender` -- too many outstanding senders");
870
0
            }
871
872
0
            debug_assert!(curr < MAX_BUFFER);
873
874
0
            let next = curr + 1;
875
0
            match self.inner.num_senders.compare_exchange(curr, next, SeqCst, SeqCst) {
876
                Ok(_) => {
877
                    // The ABA problem doesn't matter here. We only care that the
878
                    // number of senders never exceeds the maximum.
879
0
                    return Self { inner: self.inner.clone() };
880
                }
881
0
                Err(actual) => curr = actual,
882
            }
883
        }
884
0
    }
885
}
886
887
impl<T> Clone for BoundedSenderInner<T> {
888
0
    fn clone(&self) -> Self {
889
        // Since this atomic op isn't actually guarding any memory and we don't
890
        // care about any orderings besides the ordering on the single atomic
891
        // variable, a relaxed ordering is acceptable.
892
0
        let mut curr = self.inner.num_senders.load(SeqCst);
893
894
        loop {
895
            // If the maximum number of senders has been reached, then fail
896
0
            if curr == self.inner.max_senders() {
897
0
                panic!("cannot clone `Sender` -- too many outstanding senders");
898
0
            }
899
900
0
            debug_assert!(curr < self.inner.max_senders());
901
902
0
            let next = curr + 1;
903
0
            match self.inner.num_senders.compare_exchange(curr, next, SeqCst, SeqCst) {
904
                Ok(_) => {
905
                    // The ABA problem doesn't matter here. We only care that the
906
                    // number of senders never exceeds the maximum.
907
0
                    return Self {
908
0
                        inner: self.inner.clone(),
909
0
                        sender_task: Arc::new(Mutex::new(SenderTask::new())),
910
0
                        maybe_parked: false,
911
0
                    };
912
                }
913
0
                Err(actual) => curr = actual,
914
            }
915
        }
916
0
    }
917
}
918
919
impl<T> Drop for UnboundedSenderInner<T> {
920
0
    fn drop(&mut self) {
921
        // Ordering between variables doesn't matter here
922
0
        let prev = self.inner.num_senders.fetch_sub(1, SeqCst);
923
924
0
        if prev == 1 {
925
0
            self.close_channel();
926
0
        }
927
0
    }
Unexecuted instantiation: <futures_channel::mpsc::UnboundedSenderInner<dbus::message::Message> as core::ops::drop::Drop>::drop
Unexecuted instantiation: <futures_channel::mpsc::UnboundedSenderInner<_> as core::ops::drop::Drop>::drop
928
}
929
930
impl<T> Drop for BoundedSenderInner<T> {
931
0
    fn drop(&mut self) {
932
        // Ordering between variables doesn't matter here
933
0
        let prev = self.inner.num_senders.fetch_sub(1, SeqCst);
934
935
0
        if prev == 1 {
936
0
            self.close_channel();
937
0
        }
938
0
    }
939
}
940
941
impl<T> fmt::Debug for Sender<T> {
942
0
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
943
0
        f.debug_struct("Sender").field("closed", &self.is_closed()).finish()
944
0
    }
945
}
946
947
impl<T> fmt::Debug for UnboundedSender<T> {
948
0
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
949
0
        f.debug_struct("UnboundedSender").field("closed", &self.is_closed()).finish()
950
0
    }
951
}
952
953
/*
954
 *
955
 * ===== impl Receiver =====
956
 *
957
 */
958
959
impl<T> Receiver<T> {
960
    /// Closes the receiving half of a channel, without dropping it.
961
    ///
962
    /// This prevents any further messages from being sent on the channel while
963
    /// still enabling the receiver to drain messages that are buffered.
964
0
    pub fn close(&mut self) {
965
0
        if let Some(inner) = &mut self.inner {
966
0
            inner.set_closed();
967
968
            // Wake up any threads waiting as they'll see that we've closed the
969
            // channel and will continue on their merry way.
970
0
            while let Some(task) = unsafe { inner.parked_queue.pop_spin() } {
971
0
                task.lock().unwrap().notify();
972
0
            }
973
0
        }
974
0
    }
975
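A sketch of the clean-shutdown pattern from the module docs: `close` the receiver, then drain it as a `Stream` (assumes the `futures` facade and its executor).

use futures::channel::mpsc;
use futures::executor::block_on;
use futures::stream::StreamExt;

fn main() {
    let (mut tx, mut rx) = mpsc::channel::<i32>(4);
    tx.try_send(1).unwrap();
    tx.try_send(2).unwrap();

    rx.close(); // no new messages can be sent...
    assert!(tx.try_send(3).unwrap_err().is_disconnected());

    block_on(async {
        while let Some(v) = rx.next().await {
            // ...but everything already buffered is still delivered.
            println!("drained {}", v);
        }
    });
}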
976
    /// Tries to receive the next message without notifying a context if empty.
977
    ///
978
    /// It is not recommended to call this function from inside of a future,
979
    /// only when you've otherwise arranged to be notified when the channel is
980
    /// no longer empty.
981
    ///
982
    /// This function returns:
983
    /// * `Ok(Some(t))` when a message is fetched
984
    /// * `Ok(None)` when the channel is closed and no messages are left in the queue
985
    /// * `Err(e)` when there are no messages available, but the channel is not yet closed
986
0
    pub fn try_next(&mut self) -> Result<Option<T>, TryRecvError> {
987
0
        match self.next_message() {
988
0
            Poll::Ready(msg) => Ok(msg),
989
0
            Poll::Pending => Err(TryRecvError { _priv: () }),
990
        }
991
0
    }
992
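A sketch of the three `try_next` outcomes listed above (assumes the `futures` facade).

use futures::channel::mpsc;

fn main() {
    let (mut tx, mut rx) = mpsc::channel::<char>(1);

    assert!(rx.try_next().is_err());               // empty, but still open
    tx.try_send('a').unwrap();
    assert_eq!(rx.try_next().unwrap(), Some('a')); // a message was fetched
    drop(tx);                                      // last sender closes the channel
    assert_eq!(rx.try_next().unwrap(), None);      // closed and fully drained
}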
993
0
    fn next_message(&mut self) -> Poll<Option<T>> {
994
0
        let inner = match self.inner.as_mut() {
995
0
            None => return Poll::Ready(None),
996
0
            Some(inner) => inner,
997
        };
998
        // Pop off a message
999
0
        match unsafe { inner.message_queue.pop_spin() } {
1000
0
            Some(msg) => {
1001
                // If there are any parked task handles in the parked queue,
1002
                // pop one and unpark it.
1003
0
                self.unpark_one();
1004
1005
                // Decrement number of messages
1006
0
                self.dec_num_messages();
1007
1008
0
                Poll::Ready(Some(msg))
1009
            }
1010
            None => {
1011
0
                let state = decode_state(inner.state.load(SeqCst));
1012
0
                if state.is_closed() {
1013
                    // If closed flag is set AND there are no pending messages
1014
                    // it means end of stream
1015
0
                    self.inner = None;
1016
0
                    Poll::Ready(None)
1017
                } else {
1018
                    // If queue is open, we need to return Pending
1019
                    // to be woken up when new messages arrive.
1020
                    // If queue is closed but num_messages is non-zero,
1021
                    // it means that senders updated the state,
1022
                    // but didn't put message to queue yet,
1023
                    // so we need to park until sender unparks the task
1024
                    // after queueing the message.
1025
0
                    Poll::Pending
1026
                }
1027
            }
1028
        }
1029
0
    }
1030
1031
    // Unpark a single task handle if there is one pending in the parked queue
1032
0
    fn unpark_one(&mut self) {
1033
0
        if let Some(inner) = &mut self.inner {
1034
0
            if let Some(task) = unsafe { inner.parked_queue.pop_spin() } {
1035
0
                task.lock().unwrap().notify();
1036
0
            }
1037
0
        }
1038
0
    }
1039
1040
0
    fn dec_num_messages(&self) {
1041
0
        if let Some(inner) = &self.inner {
1042
0
            // OPEN_MASK is highest bit, so it's unaffected by subtraction
1043
0
            // unless there's underflow, and we know there's no underflow
1044
0
            // because number of messages at this point is always > 0.
1045
0
            inner.state.fetch_sub(1, SeqCst);
1046
0
        }
1047
0
    }
1048
}
1049
1050
// The receiver does not ever take a Pin to the inner T
1051
impl<T> Unpin for Receiver<T> {}
1052
1053
impl<T> FusedStream for Receiver<T> {
1054
0
    fn is_terminated(&self) -> bool {
1055
0
        self.inner.is_none()
1056
0
    }
1057
}
1058
1059
impl<T> Stream for Receiver<T> {
1060
    type Item = T;
1061
1062
0
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
1063
        // Try to read a message off of the message queue.
1064
0
        match self.next_message() {
1065
0
            Poll::Ready(msg) => {
1066
0
                if msg.is_none() {
1067
0
                    self.inner = None;
1068
0
                }
1069
0
                Poll::Ready(msg)
1070
            }
1071
            Poll::Pending => {
1072
                // There are no messages to read; in this case, park.
1073
0
                self.inner.as_ref().unwrap().recv_task.register(cx.waker());
1074
                // Check queue again after parking to prevent race condition:
1075
                // a message could be added to the queue after previous `next_message`
1076
                // before `register` call.
1077
0
                self.next_message()
1078
            }
1079
        }
1080
0
    }
1081
1082
0
    fn size_hint(&self) -> (usize, Option<usize>) {
1083
0
        if let Some(inner) = &self.inner {
1084
0
            decode_state(inner.state.load(SeqCst)).size_hint()
1085
        } else {
1086
0
            (0, Some(0))
1087
        }
1088
0
    }
1089
}
1090
1091
impl<T> Drop for Receiver<T> {
1092
0
    fn drop(&mut self) {
1093
        // Drain the channel of all pending messages
1094
0
        self.close();
1095
0
        if self.inner.is_some() {
1096
            loop {
1097
0
                match self.next_message() {
1098
0
                    Poll::Ready(Some(_)) => {}
1099
0
                    Poll::Ready(None) => break,
1100
                    Poll::Pending => {
1101
0
                        let state = decode_state(self.inner.as_ref().unwrap().state.load(SeqCst));
1102
1103
                        // If the channel is closed, then there is no need to park.
1104
0
                        if state.is_closed() {
1105
0
                            break;
1106
0
                        }
1107
1108
                        // TODO: Spinning isn't ideal, it might be worth
1109
                        // investigating using a condvar or some other strategy
1110
                        // here. That said, if this case is hit, then another thread
1111
                        // is about to push the value into the queue and this isn't
1112
                        // the only spinlock in the impl right now.
1113
0
                        thread::yield_now();
1114
                    }
1115
                }
1116
            }
1117
0
        }
1118
0
    }
1119
}
1120
1121
impl<T> fmt::Debug for Receiver<T> {
1122
0
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1123
0
        let closed = if let Some(ref inner) = self.inner {
1124
0
            decode_state(inner.state.load(SeqCst)).is_closed()
1125
        } else {
1126
0
            false
1127
        };
1128
1129
0
        f.debug_struct("Receiver").field("closed", &closed).finish()
1130
0
    }
1131
}
1132
1133
impl<T> UnboundedReceiver<T> {
1134
    /// Closes the receiving half of a channel, without dropping it.
1135
    ///
1136
    /// This prevents any further messages from being sent on the channel while
1137
    /// still enabling the receiver to drain messages that are buffered.
1138
0
    pub fn close(&mut self) {
1139
0
        if let Some(inner) = &mut self.inner {
1140
0
            inner.set_closed();
1141
0
        }
1142
0
    }
1143
1144
    /// Tries to receive the next message without notifying a context if empty.
1145
    ///
1146
    /// It is not recommended to call this function from inside of a future,
1147
    /// only when you've otherwise arranged to be notified when the channel is
1148
    /// no longer empty.
1149
    ///
1150
    /// This function returns:
1151
    /// * `Ok(Some(t))` when a message is fetched
1152
    /// * `Ok(None)` when the channel is closed and no messages are left in the queue
1153
    /// * `Err(e)` when there are no messages available, but the channel is not yet closed
1154
0
    pub fn try_next(&mut self) -> Result<Option<T>, TryRecvError> {
1155
0
        match self.next_message() {
1156
0
            Poll::Ready(msg) => Ok(msg),
1157
0
            Poll::Pending => Err(TryRecvError { _priv: () }),
1158
        }
1159
0
    }
1160
1161
0
    fn next_message(&mut self) -> Poll<Option<T>> {
1162
0
        let inner = match self.inner.as_mut() {
1163
0
            None => return Poll::Ready(None),
1164
0
            Some(inner) => inner,
1165
        };
1166
        // Pop off a message
1167
0
        match unsafe { inner.message_queue.pop_spin() } {
1168
0
            Some(msg) => {
1169
                // Decrement number of messages
1170
0
                self.dec_num_messages();
1171
1172
0
                Poll::Ready(Some(msg))
1173
            }
1174
            None => {
1175
0
                let state = decode_state(inner.state.load(SeqCst));
1176
0
                if state.is_closed() {
1177
                    // If closed flag is set AND there are no pending messages
1178
                    // it means end of stream
1179
0
                    self.inner = None;
1180
0
                    Poll::Ready(None)
1181
                } else {
1182
                    // If queue is open, we need to return Pending
1183
                    // to be woken up when new messages arrive.
1184
                    // If queue is closed but num_messages is non-zero,
1185
                    // it means that senders updated the state,
1186
                    // but didn't put message to queue yet,
1187
                    // so we need to park until sender unparks the task
1188
                    // after queueing the message.
1189
0
                    Poll::Pending
1190
                }
1191
            }
1192
        }
1193
0
    }
1194
1195
0
    fn dec_num_messages(&self) {
1196
0
        if let Some(inner) = &self.inner {
1197
0
            // OPEN_MASK is highest bit, so it's unaffected by subtraction
1198
0
            // unless there's underflow, and we know there's no underflow
1199
0
            // because number of messages at this point is always > 0.
1200
0
            inner.state.fetch_sub(1, SeqCst);
1201
0
        }
1202
0
    }
1203
}
1204
1205
impl<T> FusedStream for UnboundedReceiver<T> {
1206
0
    fn is_terminated(&self) -> bool {
1207
0
        self.inner.is_none()
1208
0
    }
1209
}
1210
1211
impl<T> Stream for UnboundedReceiver<T> {
1212
    type Item = T;
1213
1214
0
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
1215
        // Try to read a message off of the message queue.
1216
0
        match self.next_message() {
1217
0
            Poll::Ready(msg) => {
1218
0
                if msg.is_none() {
1219
0
                    self.inner = None;
1220
0
                }
1221
0
                Poll::Ready(msg)
1222
            }
1223
            Poll::Pending => {
1224
                // There are no messages to read; in this case, park.
1225
0
                self.inner.as_ref().unwrap().recv_task.register(cx.waker());
1226
                // Check queue again after parking to prevent race condition:
1227
                // a message could be added to the queue after previous `next_message`
1228
                // before `register` call.
1229
0
                self.next_message()
1230
            }
1231
        }
1232
0
    }
1233
1234
0
    fn size_hint(&self) -> (usize, Option<usize>) {
1235
0
        if let Some(inner) = &self.inner {
1236
0
            decode_state(inner.state.load(SeqCst)).size_hint()
1237
        } else {
1238
0
            (0, Some(0))
1239
        }
1240
0
    }
1241
}
1242
1243
impl<T> Drop for UnboundedReceiver<T> {
1244
0
    fn drop(&mut self) {
1245
        // Drain the channel of all pending messages
1246
0
        self.close();
1247
0
        if self.inner.is_some() {
1248
            loop {
1249
0
                match self.next_message() {
1250
0
                    Poll::Ready(Some(_)) => {}
1251
0
                    Poll::Ready(None) => break,
1252
                    Poll::Pending => {
1253
0
                        let state = decode_state(self.inner.as_ref().unwrap().state.load(SeqCst));
1254
1255
                        // If the channel is closed, then there is no need to park.
1256
0
                        if state.is_closed() {
1257
0
                            break;
1258
0
                        }
1259
1260
                        // TODO: Spinning isn't ideal, it might be worth
1261
                        // investigating using a condvar or some other strategy
1262
                        // here. That said, if this case is hit, then another thread
1263
                        // is about to push the value into the queue and this isn't
1264
                        // the only spinlock in the impl right now.
1265
0
                        thread::yield_now();
1266
                    }
1267
                }
1268
            }
1269
0
        }
1270
0
    }
1271
}
1272
1273
impl<T> fmt::Debug for UnboundedReceiver<T> {
1274
0
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1275
0
        let closed = if let Some(ref inner) = self.inner {
1276
0
            decode_state(inner.state.load(SeqCst)).is_closed()
1277
        } else {
1278
0
            false
1279
        };
1280
1281
0
        f.debug_struct("Receiver").field("closed", &closed).finish()
1282
0
    }
1283
}
1284
1285
/*
1286
 *
1287
 * ===== impl Inner =====
1288
 *
1289
 */
1290
1291
impl<T> UnboundedInner<T> {
1292
    // Clear `open` flag in the state, keep `num_messages` intact.
1293
0
    fn set_closed(&self) {
1294
0
        let curr = self.state.load(SeqCst);
1295
0
        if !decode_state(curr).is_open {
1296
0
            return;
1297
0
        }
1298
1299
0
        self.state.fetch_and(!OPEN_MASK, SeqCst);
1300
0
    }
Unexecuted instantiation: <futures_channel::mpsc::UnboundedInner<dbus::message::Message>>::set_closed
Unexecuted instantiation: <futures_channel::mpsc::UnboundedInner<_>>::set_closed
1301
}
1302
1303
impl<T> BoundedInner<T> {
1304
    // The return value is such that the total number of messages that can be
1305
    // enqueued into the channel will never exceed MAX_CAPACITY
1306
0
    fn max_senders(&self) -> usize {
1307
0
        MAX_CAPACITY - self.buffer
1308
0
    }
1309
1310
    // Clear `open` flag in the state, keep `num_messages` intact.
1311
0
    fn set_closed(&self) {
1312
0
        let curr = self.state.load(SeqCst);
1313
0
        if !decode_state(curr).is_open {
1314
0
            return;
1315
0
        }
1316
1317
0
        self.state.fetch_and(!OPEN_MASK, SeqCst);
1318
0
    }
1319
}
1320
1321
unsafe impl<T: Send> Send for UnboundedInner<T> {}
1322
unsafe impl<T: Send> Sync for UnboundedInner<T> {}
1323
1324
unsafe impl<T: Send> Send for BoundedInner<T> {}
1325
unsafe impl<T: Send> Sync for BoundedInner<T> {}
1326
1327
impl State {
1328
0
    fn is_closed(&self) -> bool {
1329
0
        !self.is_open && self.num_messages == 0
1330
0
    }
1331
1332
0
    fn size_hint(&self) -> (usize, Option<usize>) {
1333
0
        if self.is_open {
1334
0
            (self.num_messages, None)
1335
        } else {
1336
0
            (self.num_messages, Some(self.num_messages))
1337
        }
1338
0
    }
1339
}
1340
1341
/*
1342
 *
1343
 * ===== Helpers =====
1344
 *
1345
 */
1346
1347
0
fn decode_state(num: usize) -> State {
1348
0
    State { is_open: num & OPEN_MASK == OPEN_MASK, num_messages: num & MAX_CAPACITY }
1349
0
}
1350
1351
0
fn encode_state(state: &State) -> usize {
1352
0
    let mut num = state.num_messages;
1353
1354
0
    if state.is_open {
1355
0
        num |= OPEN_MASK;
1356
0
    }
1357
1358
0
    num
1359
0
}