Coverage Report

Created: 2025-12-31 06:16

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/rust/registry/src/index.crates.io-1949cf8c6b5b557f/futures-channel-0.3.21/src/mpsc/mod.rs
Line
Count
Source
1
//! A multi-producer, single-consumer queue for sending values across
2
//! asynchronous tasks.
3
//!
4
//! Similarly to `std::sync::mpsc`, channel creation provides [`Receiver`] and
5
//! [`Sender`] handles. [`Receiver`] implements [`Stream`] and allows a task to
6
//! read values out of the channel. If there is no message to read from the
7
//! channel, the current task will be notified when a new value is sent.
8
//! [`Sender`] implements the `Sink` trait and allows a task to send messages into
9
//! the channel. If the channel is at capacity, the send will be rejected and
10
//! the task will be notified when additional capacity is available. In other
11
//! words, the channel provides backpressure.
12
//!
13
//! Unbounded channels are also available using the `unbounded` constructor.
14
//!
15
//! # Disconnection
16
//!
17
//! When all [`Sender`] handles have been dropped, it is no longer
18
//! possible to send values into the channel. This is considered the termination
19
//! event of the stream. As such, [`Receiver::poll_next`]
20
//! will return `Poll::Ready(None)`.
21
//!
22
//! If the [`Receiver`] handle is dropped, then messages can no longer
23
//! be read out of the channel. In this case, all further attempts to send will
24
//! result in an error.
25
//!
26
//! # Clean Shutdown
27
//!
28
//! If the [`Receiver`] is simply dropped, then it is possible for
29
//! there to be messages still in the channel that will not be processed. As
30
//! such, it is usually desirable to perform a "clean" shutdown. To do this, the
31
//! receiver will first call `close`, which will prevent any further messages
32
//! from being sent into the channel. Then, the receiver consumes the channel to
33
//! completion, at which point the receiver can be dropped.
34
//!
35
//! [`Sender`]: struct.Sender.html
36
//! [`Receiver`]: struct.Receiver.html
37
//! [`Stream`]: ../../futures_core/stream/trait.Stream.html
38
//! [`Receiver::poll_next`]:
39
//!     ../../futures_core/stream/trait.Stream.html#tymethod.poll_next
40
41
// At the core, the channel uses an atomic FIFO queue for message passing. This
42
// queue is used as the primary coordination primitive. In order to enforce
43
// capacity limits and handle back pressure, a secondary FIFO queue is used to
44
// send parked task handles.
45
//
46
// The general idea is that the channel is created with a `buffer` size of `n`.
47
// The channel capacity is `n + num-senders`. Each sender gets one "guaranteed"
48
// slot to hold a message. This allows `Sender` to know for a fact that a send
49
// will succeed *before* starting to do the actual work of sending the value.
50
// Since most of this work is lock-free, once the work starts, it is impossible
51
// to safely revert.
52
//
53
// If the sender is unable to process a send operation, then the current
54
// task is parked and the handle is sent on the parked task queue.
55
//
56
// Note that the implementation guarantees that the channel capacity will never
57
// exceed the configured limit, however there is no *strict* guarantee that the
58
// receiver will wake up a parked task *immediately* when a slot becomes
59
// available. However, it will almost always unpark a task when a slot becomes
60
// available and it is *guaranteed* that a sender will be unparked when the
61
// message that caused the sender to become parked is read out of the channel.
62
//
63
// The steps for sending a message are roughly:
64
//
65
// 1) Increment the channel message count
66
// 2) If the channel is at capacity, push the task handle onto the wait queue
67
// 3) Push the message onto the message queue.
68
//
69
// The steps for receiving a message are roughly:
70
//
71
// 1) Pop a message from the message queue
72
// 2) Pop a task handle from the wait queue
73
// 3) Decrement the channel message count.
74
//
75
// It's important for the order of operations on lock-free structures to happen
76
// in reverse order between the sender and receiver. This makes the message
77
// queue the primary coordination structure and establishes the necessary
78
// happens-before semantics required for the acquire / release semantics used
79
// by the queue structure.
80
81
use futures_core::stream::{FusedStream, Stream};
82
use futures_core::task::__internal::AtomicWaker;
83
use futures_core::task::{Context, Poll, Waker};
84
use std::fmt;
85
use std::pin::Pin;
86
use std::sync::atomic::AtomicUsize;
87
use std::sync::atomic::Ordering::SeqCst;
88
use std::sync::{Arc, Mutex};
89
use std::thread;
90
91
use crate::mpsc::queue::Queue;
92
93
mod queue;
94
#[cfg(feature = "sink")]
95
mod sink_impl;
96
97
#[derive(Debug)]
98
struct UnboundedSenderInner<T> {
99
    // Channel state shared between the sender and receiver.
100
    inner: Arc<UnboundedInner<T>>,
101
}
102
103
#[derive(Debug)]
104
struct BoundedSenderInner<T> {
105
    // Channel state shared between the sender and receiver.
106
    inner: Arc<BoundedInner<T>>,
107
108
    // Handle to the task that is blocked on this sender. This handle is sent
109
    // to the receiver half in order to be notified when the sender becomes
110
    // unblocked.
111
    sender_task: Arc<Mutex<SenderTask>>,
112
113
    // `true` if the sender might be blocked. This is an optimization to avoid
114
    // having to lock the mutex most of the time.
115
    maybe_parked: bool,
116
}
117
118
// We never project Pin<&mut SenderInner> to `Pin<&mut T>`
119
impl<T> Unpin for UnboundedSenderInner<T> {}
120
impl<T> Unpin for BoundedSenderInner<T> {}
121
122
/// The transmission end of a bounded mpsc channel.
123
///
124
/// This value is created by the [`channel`](channel) function.
125
#[derive(Debug)]
126
pub struct Sender<T>(Option<BoundedSenderInner<T>>);
127
128
/// The transmission end of an unbounded mpsc channel.
129
///
130
/// This value is created by the [`unbounded`](unbounded) function.
131
#[derive(Debug)]
132
pub struct UnboundedSender<T>(Option<UnboundedSenderInner<T>>);
133
134
trait AssertKinds: Send + Sync + Clone {}
135
impl AssertKinds for UnboundedSender<u32> {}
136
137
/// The receiving end of a bounded mpsc channel.
138
///
139
/// This value is created by the [`channel`](channel) function.
140
#[derive(Debug)]
141
pub struct Receiver<T> {
142
    inner: Option<Arc<BoundedInner<T>>>,
143
}
144
145
/// The receiving end of an unbounded mpsc channel.
146
///
147
/// This value is created by the [`unbounded`](unbounded) function.
148
#[derive(Debug)]
149
pub struct UnboundedReceiver<T> {
150
    inner: Option<Arc<UnboundedInner<T>>>,
151
}
152
153
// `Pin<&mut UnboundedReceiver<T>>` is never projected to `Pin<&mut T>`
154
impl<T> Unpin for UnboundedReceiver<T> {}
155
156
/// The error type for [`Sender`s](Sender) used as `Sink`s.
157
#[derive(Clone, Debug, PartialEq, Eq)]
158
pub struct SendError {
159
    kind: SendErrorKind,
160
}
161
162
/// The error type returned from [`try_send`](Sender::try_send).
163
#[derive(Clone, PartialEq, Eq)]
164
pub struct TrySendError<T> {
165
    err: SendError,
166
    val: T,
167
}
168
169
#[derive(Clone, Debug, PartialEq, Eq)]
170
enum SendErrorKind {
171
    Full,
172
    Disconnected,
173
}
174
175
/// The error type returned from [`try_next`](Receiver::try_next).
176
pub struct TryRecvError {
177
    _priv: (),
178
}
179
180
impl fmt::Display for SendError {
181
0
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
182
0
        if self.is_full() {
183
0
            write!(f, "send failed because channel is full")
184
        } else {
185
0
            write!(f, "send failed because receiver is gone")
186
        }
187
0
    }
188
}
189
190
impl std::error::Error for SendError {}
191
192
impl SendError {
193
    /// Returns `true` if this error is a result of the channel being full.
194
0
    pub fn is_full(&self) -> bool {
195
0
        match self.kind {
196
0
            SendErrorKind::Full => true,
197
0
            _ => false,
198
        }
199
0
    }
200
201
    /// Returns `true` if this error is a result of the receiver being dropped.
202
0
    pub fn is_disconnected(&self) -> bool {
203
0
        match self.kind {
204
0
            SendErrorKind::Disconnected => true,
205
0
            _ => false,
206
        }
207
0
    }
208
}
209
210
impl<T> fmt::Debug for TrySendError<T> {
211
0
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
212
0
        f.debug_struct("TrySendError").field("kind", &self.err.kind).finish()
213
0
    }
Unexecuted instantiation: <futures_channel::mpsc::TrySendError<devices::virtio::block::asynchronous::WorkerCmd> as core::fmt::Debug>::fmt
Unexecuted instantiation: <futures_channel::mpsc::TrySendError<_> as core::fmt::Debug>::fmt
214
}
215
216
impl<T> fmt::Display for TrySendError<T> {
217
0
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
218
0
        if self.is_full() {
219
0
            write!(f, "send failed because channel is full")
220
        } else {
221
0
            write!(f, "send failed because receiver is gone")
222
        }
223
0
    }
224
}
225
226
impl<T: core::any::Any> std::error::Error for TrySendError<T> {}
227
228
impl<T> TrySendError<T> {
229
    /// Returns `true` if this error is a result of the channel being full.
230
0
    pub fn is_full(&self) -> bool {
231
0
        self.err.is_full()
232
0
    }
233
234
    /// Returns `true` if this error is a result of the receiver being dropped.
235
0
    pub fn is_disconnected(&self) -> bool {
236
0
        self.err.is_disconnected()
237
0
    }
238
239
    /// Returns the message that was attempted to be sent but failed.
240
0
    pub fn into_inner(self) -> T {
241
0
        self.val
242
0
    }
243
244
    /// Drops the message and converts into a `SendError`.
245
0
    pub fn into_send_error(self) -> SendError {
246
0
        self.err
247
0
    }
248
}
249
250
impl fmt::Debug for TryRecvError {
251
0
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
252
0
        f.debug_tuple("TryRecvError").finish()
253
0
    }
254
}
255
256
impl fmt::Display for TryRecvError {
257
0
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
258
0
        write!(f, "receiver channel is empty")
259
0
    }
260
}
261
262
impl std::error::Error for TryRecvError {}
263
264
#[derive(Debug)]
265
struct UnboundedInner<T> {
266
    // Internal channel state. Consists of the number of messages stored in the
267
    // channel as well as a flag signalling that the channel is closed.
268
    state: AtomicUsize,
269
270
    // Atomic, FIFO queue used to send messages to the receiver
271
    message_queue: Queue<T>,
272
273
    // Number of senders in existence
274
    num_senders: AtomicUsize,
275
276
    // Handle to the receiver's task.
277
    recv_task: AtomicWaker,
278
}
279
280
#[derive(Debug)]
281
struct BoundedInner<T> {
282
    // Max buffer size of the channel.
283
    buffer: usize,
284
285
    // Internal channel state. Consists of the number of messages stored in the
286
    // channel as well as a flag signalling that the channel is closed.
287
    state: AtomicUsize,
288
289
    // Atomic, FIFO queue used to send messages to the receiver
290
    message_queue: Queue<T>,
291
292
    // Atomic, FIFO queue used to send parked task handles to the receiver.
293
    parked_queue: Queue<Arc<Mutex<SenderTask>>>,
294
295
    // Number of senders in existence
296
    num_senders: AtomicUsize,
297
298
    // Handle to the receiver's task.
299
    recv_task: AtomicWaker,
300
}
301
302
// Struct representation of `Inner::state`.
303
#[derive(Debug, Clone, Copy)]
304
struct State {
305
    // `true` when the channel is open
306
    is_open: bool,
307
308
    // Number of messages in the channel
309
    num_messages: usize,
310
}
311
312
// The `is_open` flag is stored in the left-most bit of `Inner::state`
313
const OPEN_MASK: usize = usize::max_value() - (usize::max_value() >> 1);
314
315
// When a new channel is created, it is created in the open state with no
316
// pending messages.
317
const INIT_STATE: usize = OPEN_MASK;
318
319
// The maximum number of messages that a channel can track is `usize::max_value() >> 1`
320
const MAX_CAPACITY: usize = !(OPEN_MASK);
321
322
// The maximum requested buffer size must be less than the maximum capacity of
323
// a channel. This is because each sender gets a guaranteed slot.
324
const MAX_BUFFER: usize = MAX_CAPACITY >> 1;
325
326
// Sent to the consumer to wake up blocked producers
327
#[derive(Debug)]
328
struct SenderTask {
329
    task: Option<Waker>,
330
    is_parked: bool,
331
}
332
333
impl SenderTask {
334
0
    fn new() -> Self {
335
0
        Self { task: None, is_parked: false }
336
0
    }
337
338
0
    fn notify(&mut self) {
339
0
        self.is_parked = false;
340
341
0
        if let Some(task) = self.task.take() {
342
0
            task.wake();
343
0
        }
344
0
    }
345
}
346
347
/// Creates a bounded mpsc channel for communicating between asynchronous tasks.
348
///
349
/// Being bounded, this channel provides backpressure to ensure that the sender
350
/// outpaces the receiver by only a limited amount. The channel's capacity is
351
/// equal to `buffer + num-senders`. In other words, each sender gets a
352
/// guaranteed slot in the channel capacity, and on top of that there are
353
/// `buffer` "first come, first serve" slots available to all senders.
354
///
355
/// The [`Receiver`](Receiver) returned implements the
356
/// [`Stream`](futures_core::stream::Stream) trait, while [`Sender`](Sender) implements
357
/// `Sink`.
358
0
pub fn channel<T>(buffer: usize) -> (Sender<T>, Receiver<T>) {
359
    // Check that the requested buffer size does not exceed the maximum buffer
360
    // size permitted by the system.
361
0
    assert!(buffer < MAX_BUFFER, "requested buffer size too large");
362
363
0
    let inner = Arc::new(BoundedInner {
364
0
        buffer,
365
0
        state: AtomicUsize::new(INIT_STATE),
366
0
        message_queue: Queue::new(),
367
0
        parked_queue: Queue::new(),
368
0
        num_senders: AtomicUsize::new(1),
369
0
        recv_task: AtomicWaker::new(),
370
0
    });
371
372
0
    let tx = BoundedSenderInner {
373
0
        inner: inner.clone(),
374
0
        sender_task: Arc::new(Mutex::new(SenderTask::new())),
375
0
        maybe_parked: false,
376
0
    };
377
378
0
    let rx = Receiver { inner: Some(inner) };
379
380
0
    (Sender(Some(tx)), rx)
381
0
}
382
383
/// Creates an unbounded mpsc channel for communicating between asynchronous
384
/// tasks.
385
///
386
/// A `send` on this channel will always succeed as long as the receive half has
387
/// not been closed. If the receiver falls behind, messages will be arbitrarily
388
/// buffered.
389
///
390
/// **Note** that the amount of available system memory is an implicit bound to
391
/// the channel. Using an `unbounded` channel can cause the
392
/// process to run out of memory. In this case, the process will be aborted.
393
778
pub fn unbounded<T>() -> (UnboundedSender<T>, UnboundedReceiver<T>) {
394
778
    let inner = Arc::new(UnboundedInner {
395
778
        state: AtomicUsize::new(INIT_STATE),
396
778
        message_queue: Queue::new(),
397
778
        num_senders: AtomicUsize::new(1),
398
778
        recv_task: AtomicWaker::new(),
399
778
    });
400
401
778
    let tx = UnboundedSenderInner { inner: inner.clone() };
402
403
778
    let rx = UnboundedReceiver { inner: Some(inner) };
404
405
778
    (UnboundedSender(Some(tx)), rx)
406
778
}
futures_channel::mpsc::unbounded::<devices::virtio::block::asynchronous::WorkerCmd>
Line
Count
Source
393
778
pub fn unbounded<T>() -> (UnboundedSender<T>, UnboundedReceiver<T>) {
394
778
    let inner = Arc::new(UnboundedInner {
395
778
        state: AtomicUsize::new(INIT_STATE),
396
778
        message_queue: Queue::new(),
397
778
        num_senders: AtomicUsize::new(1),
398
778
        recv_task: AtomicWaker::new(),
399
778
    });
400
401
778
    let tx = UnboundedSenderInner { inner: inner.clone() };
402
403
778
    let rx = UnboundedReceiver { inner: Some(inner) };
404
405
778
    (UnboundedSender(Some(tx)), rx)
406
778
}
Unexecuted instantiation: futures_channel::mpsc::unbounded::<_>
407
408
/*
409
 *
410
 * ===== impl Sender =====
411
 *
412
 */
413
414
impl<T> UnboundedSenderInner<T> {
415
0
    fn poll_ready_nb(&self) -> Poll<Result<(), SendError>> {
416
0
        let state = decode_state(self.inner.state.load(SeqCst));
417
0
        if state.is_open {
418
0
            Poll::Ready(Ok(()))
419
        } else {
420
0
            Poll::Ready(Err(SendError { kind: SendErrorKind::Disconnected }))
421
        }
422
0
    }
423
424
    // Push message to the queue and signal to the receiver
425
778
    fn queue_push_and_signal(&self, msg: T) {
426
        // Push the message onto the message queue
427
778
        self.inner.message_queue.push(msg);
428
429
        // Signal to the receiver that a message has been enqueued. If the
430
        // receiver is parked, this will unpark the task.
431
778
        self.inner.recv_task.wake();
432
778
    }
<futures_channel::mpsc::UnboundedSenderInner<devices::virtio::block::asynchronous::WorkerCmd>>::queue_push_and_signal
Line
Count
Source
425
778
    fn queue_push_and_signal(&self, msg: T) {
426
        // Push the message onto the message queue
427
778
        self.inner.message_queue.push(msg);
428
429
        // Signal to the receiver that a message has been enqueued. If the
430
        // receiver is parked, this will unpark the task.
431
778
        self.inner.recv_task.wake();
432
778
    }
Unexecuted instantiation: <futures_channel::mpsc::UnboundedSenderInner<_>>::queue_push_and_signal
433
434
    // Increment the number of queued messages. Returns the resulting number.
435
778
    fn inc_num_messages(&self) -> Option<usize> {
436
778
        let mut curr = self.inner.state.load(SeqCst);
437
438
        loop {
439
778
            let mut state = decode_state(curr);
440
441
            // The receiver end closed the channel.
442
778
            if !state.is_open {
443
0
                return None;
444
778
            }
445
446
            // This probably is never hit? Odds are the process will run out of
447
            // memory first. It may be worth to return something else in this
448
            // case?
449
778
            assert!(
450
778
                state.num_messages < MAX_CAPACITY,
451
                "buffer space \
452
                    exhausted; sending this messages would overflow the state"
453
            );
454
455
778
            state.num_messages += 1;
456
457
778
            let next = encode_state(&state);
458
778
            match self.inner.state.compare_exchange(curr, next, SeqCst, SeqCst) {
459
778
                Ok(_) => return Some(state.num_messages),
460
0
                Err(actual) => curr = actual,
461
            }
462
        }
463
778
    }
<futures_channel::mpsc::UnboundedSenderInner<devices::virtio::block::asynchronous::WorkerCmd>>::inc_num_messages
Line
Count
Source
435
778
    fn inc_num_messages(&self) -> Option<usize> {
436
778
        let mut curr = self.inner.state.load(SeqCst);
437
438
        loop {
439
778
            let mut state = decode_state(curr);
440
441
            // The receiver end closed the channel.
442
778
            if !state.is_open {
443
0
                return None;
444
778
            }
445
446
            // This probably is never hit? Odds are the process will run out of
447
            // memory first. It may be worth to return something else in this
448
            // case?
449
778
            assert!(
450
778
                state.num_messages < MAX_CAPACITY,
451
                "buffer space \
452
                    exhausted; sending this messages would overflow the state"
453
            );
454
455
778
            state.num_messages += 1;
456
457
778
            let next = encode_state(&state);
458
778
            match self.inner.state.compare_exchange(curr, next, SeqCst, SeqCst) {
459
778
                Ok(_) => return Some(state.num_messages),
460
0
                Err(actual) => curr = actual,
461
            }
462
        }
463
778
    }
Unexecuted instantiation: <futures_channel::mpsc::UnboundedSenderInner<_>>::inc_num_messages
464
465
    /// Returns whether the senders send to the same receiver.
466
0
    fn same_receiver(&self, other: &Self) -> bool {
467
0
        Arc::ptr_eq(&self.inner, &other.inner)
468
0
    }
469
470
    /// Returns whether the sender sends to this receiver.
471
0
    fn is_connected_to(&self, inner: &Arc<UnboundedInner<T>>) -> bool {
472
0
        Arc::ptr_eq(&self.inner, inner)
473
0
    }
474
475
    /// Returns pointer to the Arc containing sender
476
    ///
477
    /// The returned pointer is not referenced and should be only used for hashing!
478
0
    fn ptr(&self) -> *const UnboundedInner<T> {
479
0
        &*self.inner
480
0
    }
481
482
    /// Returns whether this channel is closed without needing a context.
483
0
    fn is_closed(&self) -> bool {
484
0
        !decode_state(self.inner.state.load(SeqCst)).is_open
485
0
    }
486
487
    /// Closes this channel from the sender side, preventing any new messages.
488
778
    fn close_channel(&self) {
489
        // There's no need to park this sender, it's dropping,
490
        // and we don't want to check for capacity, so skip
491
        // that stuff from `do_send`.
492
493
778
        self.inner.set_closed();
494
778
        self.inner.recv_task.wake();
495
778
    }
<futures_channel::mpsc::UnboundedSenderInner<devices::virtio::block::asynchronous::WorkerCmd>>::close_channel
Line
Count
Source
488
778
    fn close_channel(&self) {
489
        // There's no need to park this sender, it's dropping,
490
        // and we don't want to check for capacity, so skip
491
        // that stuff from `do_send`.
492
493
778
        self.inner.set_closed();
494
778
        self.inner.recv_task.wake();
495
778
    }
Unexecuted instantiation: <futures_channel::mpsc::UnboundedSenderInner<_>>::close_channel
496
}
497
498
impl<T> BoundedSenderInner<T> {
499
    /// Attempts to send a message on this `Sender`, returning the message
500
    /// if there was an error.
501
0
    fn try_send(&mut self, msg: T) -> Result<(), TrySendError<T>> {
502
        // If the sender is currently blocked, reject the message
503
0
        if !self.poll_unparked(None).is_ready() {
504
0
            return Err(TrySendError { err: SendError { kind: SendErrorKind::Full }, val: msg });
505
0
        }
506
507
        // The channel has capacity to accept the message, so send it
508
0
        self.do_send_b(msg)
509
0
    }
510
511
    // Do the send without failing.
512
    // Can be called only by bounded sender.
513
0
    fn do_send_b(&mut self, msg: T) -> Result<(), TrySendError<T>> {
514
        // Anyone calling do_send *should* make sure there is room first,
515
        // but assert here for tests as a sanity check.
516
0
        debug_assert!(self.poll_unparked(None).is_ready());
517
518
        // First, increment the number of messages contained by the channel.
519
        // This operation will also atomically determine if the sender task
520
        // should be parked.
521
        //
522
        // `None` is returned in the case that the channel has been closed by the
523
        // receiver. This happens when `Receiver::close` is called or the
524
        // receiver is dropped.
525
0
        let park_self = match self.inc_num_messages() {
526
0
            Some(num_messages) => {
527
                // Block if the current number of pending messages has exceeded
528
                // the configured buffer size
529
0
                num_messages > self.inner.buffer
530
            }
531
            None => {
532
0
                return Err(TrySendError {
533
0
                    err: SendError { kind: SendErrorKind::Disconnected },
534
0
                    val: msg,
535
0
                })
536
            }
537
        };
538
539
        // If the channel has reached capacity, then the sender task needs to
540
        // be parked. This will send the task handle on the parked task queue.
541
        //
542
        // However, when `do_send` is called while dropping the `Sender`,
543
        // `task::current()` can't be called safely. In this case, in order to
544
        // maintain internal consistency, a blank message is pushed onto the
545
        // parked task queue.
546
0
        if park_self {
547
0
            self.park();
548
0
        }
549
550
0
        self.queue_push_and_signal(msg);
551
552
0
        Ok(())
553
0
    }
554
555
    // Push message to the queue and signal to the receiver
556
0
    fn queue_push_and_signal(&self, msg: T) {
557
        // Push the message onto the message queue
558
0
        self.inner.message_queue.push(msg);
559
560
        // Signal to the receiver that a message has been enqueued. If the
561
        // receiver is parked, this will unpark the task.
562
0
        self.inner.recv_task.wake();
563
0
    }
564
565
    // Increment the number of queued messages. Returns the resulting number.
566
0
    fn inc_num_messages(&self) -> Option<usize> {
567
0
        let mut curr = self.inner.state.load(SeqCst);
568
569
        loop {
570
0
            let mut state = decode_state(curr);
571
572
            // The receiver end closed the channel.
573
0
            if !state.is_open {
574
0
                return None;
575
0
            }
576
577
            // This probably is never hit? Odds are the process will run out of
578
            // memory first. It may be worth to return something else in this
579
            // case?
580
0
            assert!(
581
0
                state.num_messages < MAX_CAPACITY,
582
                "buffer space \
583
                    exhausted; sending this messages would overflow the state"
584
            );
585
586
0
            state.num_messages += 1;
587
588
0
            let next = encode_state(&state);
589
0
            match self.inner.state.compare_exchange(curr, next, SeqCst, SeqCst) {
590
0
                Ok(_) => return Some(state.num_messages),
591
0
                Err(actual) => curr = actual,
592
            }
593
        }
594
0
    }
595
596
0
    fn park(&mut self) {
597
0
        {
598
0
            let mut sender = self.sender_task.lock().unwrap();
599
0
            sender.task = None;
600
0
            sender.is_parked = true;
601
0
        }
602
603
        // Send handle over queue
604
0
        let t = self.sender_task.clone();
605
0
        self.inner.parked_queue.push(t);
606
607
        // Check to make sure we weren't closed after we sent our task on the
608
        // queue
609
0
        let state = decode_state(self.inner.state.load(SeqCst));
610
0
        self.maybe_parked = state.is_open;
611
0
    }
612
613
    /// Polls the channel to determine if there is guaranteed capacity to send
614
    /// at least one item without waiting.
615
    ///
616
    /// # Return value
617
    ///
618
    /// This method returns:
619
    ///
620
    /// - `Poll::Ready(Ok(_))` if there is sufficient capacity;
621
    /// - `Poll::Pending` if the channel may not have
622
    ///   capacity, in which case the current task is queued to be notified once
623
    ///   capacity is available;
624
    /// - `Poll::Ready(Err(SendError))` if the receiver has been dropped.
625
0
    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), SendError>> {
626
0
        let state = decode_state(self.inner.state.load(SeqCst));
627
0
        if !state.is_open {
628
0
            return Poll::Ready(Err(SendError { kind: SendErrorKind::Disconnected }));
629
0
        }
630
631
0
        self.poll_unparked(Some(cx)).map(Ok)
632
0
    }
633
634
    /// Returns whether the senders send to the same receiver.
635
0
    fn same_receiver(&self, other: &Self) -> bool {
636
0
        Arc::ptr_eq(&self.inner, &other.inner)
637
0
    }
638
639
    /// Returns whether the sender sends to this receiver.
640
0
    fn is_connected_to(&self, receiver: &Arc<BoundedInner<T>>) -> bool {
641
0
        Arc::ptr_eq(&self.inner, receiver)
642
0
    }
643
644
    /// Returns pointer to the Arc containing sender
645
    ///
646
    /// The returned pointer is not referenced and should be only used for hashing!
647
0
    fn ptr(&self) -> *const BoundedInner<T> {
648
0
        &*self.inner
649
0
    }
650
651
    /// Returns whether this channel is closed without needing a context.
652
0
    fn is_closed(&self) -> bool {
653
0
        !decode_state(self.inner.state.load(SeqCst)).is_open
654
0
    }
655
656
    /// Closes this channel from the sender side, preventing any new messages.
657
0
    fn close_channel(&self) {
658
        // There's no need to park this sender, it's dropping,
659
        // and we don't want to check for capacity, so skip
660
        // that stuff from `do_send`.
661
662
0
        self.inner.set_closed();
663
0
        self.inner.recv_task.wake();
664
0
    }
665
666
0
    fn poll_unparked(&mut self, cx: Option<&mut Context<'_>>) -> Poll<()> {
667
        // First check the `maybe_parked` variable. This avoids acquiring the
668
        // lock in most cases
669
0
        if self.maybe_parked {
670
            // Get a lock on the task handle
671
0
            let mut task = self.sender_task.lock().unwrap();
672
673
0
            if !task.is_parked {
674
0
                self.maybe_parked = false;
675
0
                return Poll::Ready(());
676
0
            }
677
678
            // At this point, an unpark request is pending, so there will be an
679
            // unpark sometime in the future. We just need to make sure that
680
            // the correct task will be notified.
681
            //
682
            // Update the task in case the `Sender` has been moved to another
683
            // task
684
0
            task.task = cx.map(|cx| cx.waker().clone());
685
686
0
            Poll::Pending
687
        } else {
688
0
            Poll::Ready(())
689
        }
690
0
    }
691
}
692
693
impl<T> Sender<T> {
694
    /// Attempts to send a message on this `Sender`, returning the message
695
    /// if there was an error.
696
0
    pub fn try_send(&mut self, msg: T) -> Result<(), TrySendError<T>> {
697
0
        if let Some(inner) = &mut self.0 {
698
0
            inner.try_send(msg)
699
        } else {
700
0
            Err(TrySendError { err: SendError { kind: SendErrorKind::Disconnected }, val: msg })
701
        }
702
0
    }
703
704
    /// Send a message on the channel.
705
    ///
706
    /// This function should only be called after
707
    /// [`poll_ready`](Sender::poll_ready) has reported that the channel is
708
    /// ready to receive a message.
709
0
    pub fn start_send(&mut self, msg: T) -> Result<(), SendError> {
710
0
        self.try_send(msg).map_err(|e| e.err)
711
0
    }
712
713
    /// Polls the channel to determine if there is guaranteed capacity to send
714
    /// at least one item without waiting.
715
    ///
716
    /// # Return value
717
    ///
718
    /// This method returns:
719
    ///
720
    /// - `Poll::Ready(Ok(_))` if there is sufficient capacity;
721
    /// - `Poll::Pending` if the channel may not have
722
    ///   capacity, in which case the current task is queued to be notified once
723
    ///   capacity is available;
724
    /// - `Poll::Ready(Err(SendError))` if the receiver has been dropped.
725
0
    pub fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), SendError>> {
726
0
        let inner = self.0.as_mut().ok_or(SendError { kind: SendErrorKind::Disconnected })?;
727
0
        inner.poll_ready(cx)
728
0
    }
729
730
    /// Returns whether this channel is closed without needing a context.
731
0
    pub fn is_closed(&self) -> bool {
732
0
        self.0.as_ref().map(BoundedSenderInner::is_closed).unwrap_or(true)
733
0
    }
734
735
    /// Closes this channel from the sender side, preventing any new messages.
736
0
    pub fn close_channel(&mut self) {
737
0
        if let Some(inner) = &mut self.0 {
738
0
            inner.close_channel();
739
0
        }
740
0
    }
741
742
    /// Disconnects this sender from the channel, closing it if there are no more senders left.
743
0
    pub fn disconnect(&mut self) {
744
0
        self.0 = None;
745
0
    }
746
747
    /// Returns whether the senders send to the same receiver.
748
0
    pub fn same_receiver(&self, other: &Self) -> bool {
749
0
        match (&self.0, &other.0) {
750
0
            (Some(inner), Some(other)) => inner.same_receiver(other),
751
0
            _ => false,
752
        }
753
0
    }
754
755
    /// Returns whether the sender sends to this receiver.
756
0
    pub fn is_connected_to(&self, receiver: &Receiver<T>) -> bool {
757
0
        match (&self.0, &receiver.inner) {
758
0
            (Some(inner), Some(receiver)) => inner.is_connected_to(receiver),
759
0
            _ => false,
760
        }
761
0
    }
762
763
    /// Hashes the receiver into the provided hasher
764
0
    pub fn hash_receiver<H>(&self, hasher: &mut H)
765
0
    where
766
0
        H: std::hash::Hasher,
767
    {
768
        use std::hash::Hash;
769
770
0
        let ptr = self.0.as_ref().map(|inner| inner.ptr());
771
0
        ptr.hash(hasher);
772
0
    }
773
}
774
775
impl<T> UnboundedSender<T> {
776
    /// Check if the channel is ready to receive a message.
777
0
    pub fn poll_ready(&self, _: &mut Context<'_>) -> Poll<Result<(), SendError>> {
778
0
        let inner = self.0.as_ref().ok_or(SendError { kind: SendErrorKind::Disconnected })?;
779
0
        inner.poll_ready_nb()
780
0
    }
781
782
    /// Returns whether this channel is closed without needing a context.
783
0
    pub fn is_closed(&self) -> bool {
784
0
        self.0.as_ref().map(UnboundedSenderInner::is_closed).unwrap_or(true)
785
0
    }
786
787
    /// Closes this channel from the sender side, preventing any new messages.
788
0
    pub fn close_channel(&self) {
789
0
        if let Some(inner) = &self.0 {
790
0
            inner.close_channel();
791
0
        }
792
0
    }
793
794
    /// Disconnects this sender from the channel, closing it if there are no more senders left.
795
0
    pub fn disconnect(&mut self) {
796
0
        self.0 = None;
797
0
    }
798
799
    // Do the send without parking the current task.
800
778
    fn do_send_nb(&self, msg: T) -> Result<(), TrySendError<T>> {
801
778
        if let Some(inner) = &self.0 {
802
778
            if inner.inc_num_messages().is_some() {
803
778
                inner.queue_push_and_signal(msg);
804
778
                return Ok(());
805
0
            }
806
0
        }
807
808
0
        Err(TrySendError { err: SendError { kind: SendErrorKind::Disconnected }, val: msg })
809
778
    }
<futures_channel::mpsc::UnboundedSender<devices::virtio::block::asynchronous::WorkerCmd>>::do_send_nb
Line
Count
Source
800
778
    fn do_send_nb(&self, msg: T) -> Result<(), TrySendError<T>> {
801
778
        if let Some(inner) = &self.0 {
802
778
            if inner.inc_num_messages().is_some() {
803
778
                inner.queue_push_and_signal(msg);
804
778
                return Ok(());
805
0
            }
806
0
        }
807
808
0
        Err(TrySendError { err: SendError { kind: SendErrorKind::Disconnected }, val: msg })
809
778
    }
Unexecuted instantiation: <futures_channel::mpsc::UnboundedSender<_>>::do_send_nb
810
811
    /// Send a message on the channel.
812
    ///
813
    /// This method should only be called after `poll_ready` has been used to
814
    /// verify that the channel is ready to receive a message.
815
0
    pub fn start_send(&mut self, msg: T) -> Result<(), SendError> {
816
0
        self.do_send_nb(msg).map_err(|e| e.err)
817
0
    }
818
819
    /// Sends a message along this channel.
820
    ///
821
    /// This is an unbounded sender, so this function differs from `Sink::send`
822
    /// by ensuring the return type reflects that the channel is always ready to
823
    /// receive messages.
824
778
    pub fn unbounded_send(&self, msg: T) -> Result<(), TrySendError<T>> {
825
778
        self.do_send_nb(msg)
826
778
    }
<futures_channel::mpsc::UnboundedSender<devices::virtio::block::asynchronous::WorkerCmd>>::unbounded_send
Line
Count
Source
824
778
    pub fn unbounded_send(&self, msg: T) -> Result<(), TrySendError<T>> {
825
778
        self.do_send_nb(msg)
826
778
    }
Unexecuted instantiation: <futures_channel::mpsc::UnboundedSender<_>>::unbounded_send
827
828
    /// Returns whether the senders send to the same receiver.
829
0
    pub fn same_receiver(&self, other: &Self) -> bool {
830
0
        match (&self.0, &other.0) {
831
0
            (Some(inner), Some(other)) => inner.same_receiver(other),
832
0
            _ => false,
833
        }
834
0
    }
835
836
    /// Returns whether the sender sends to this receiver.
837
0
    pub fn is_connected_to(&self, receiver: &UnboundedReceiver<T>) -> bool {
838
0
        match (&self.0, &receiver.inner) {
839
0
            (Some(inner), Some(receiver)) => inner.is_connected_to(receiver),
840
0
            _ => false,
841
        }
842
0
    }
843
844
    /// Hashes the receiver into the provided hasher
845
0
    pub fn hash_receiver<H>(&self, hasher: &mut H)
846
0
    where
847
0
        H: std::hash::Hasher,
848
    {
849
        use std::hash::Hash;
850
851
0
        let ptr = self.0.as_ref().map(|inner| inner.ptr());
852
0
        ptr.hash(hasher);
853
0
    }
854
}
855
856
impl<T> Clone for Sender<T> {
857
0
    fn clone(&self) -> Self {
858
0
        Self(self.0.clone())
859
0
    }
860
}
861
862
impl<T> Clone for UnboundedSender<T> {
863
0
    fn clone(&self) -> Self {
864
0
        Self(self.0.clone())
865
0
    }
866
}
867
868
impl<T> Clone for UnboundedSenderInner<T> {
869
0
    fn clone(&self) -> Self {
870
        // Since this atomic op isn't actually guarding any memory and we don't
871
        // care about any orderings besides the ordering on the single atomic
872
        // variable, a relaxed ordering would be acceptable (`SeqCst` is used regardless).
873
0
        let mut curr = self.inner.num_senders.load(SeqCst);
874
875
        loop {
876
            // If the maximum number of senders has been reached, then fail
877
0
            if curr == MAX_BUFFER {
878
0
                panic!("cannot clone `Sender` -- too many outstanding senders");
879
0
            }
880
881
0
            debug_assert!(curr < MAX_BUFFER);
882
883
0
            let next = curr + 1;
884
0
            match self.inner.num_senders.compare_exchange(curr, next, SeqCst, SeqCst) {
885
                Ok(_) => {
886
                    // The ABA problem doesn't matter here. We only care that the
887
                    // number of senders never exceeds the maximum.
888
0
                    return Self { inner: self.inner.clone() };
889
                }
890
0
                Err(actual) => curr = actual,
891
            }
892
        }
893
0
    }
894
}
895
896
impl<T> Clone for BoundedSenderInner<T> {
897
0
    fn clone(&self) -> Self {
898
        // Since this atomic op isn't actually guarding any memory and we don't
899
        // care about any orderings besides the ordering on the single atomic
900
        // variable, a relaxed ordering would be acceptable (`SeqCst` is used regardless).
901
0
        let mut curr = self.inner.num_senders.load(SeqCst);
902
903
        loop {
904
            // If the maximum number of senders has been reached, then fail
905
0
            if curr == self.inner.max_senders() {
906
0
                panic!("cannot clone `Sender` -- too many outstanding senders");
907
0
            }
908
909
0
            debug_assert!(curr < self.inner.max_senders());
910
911
0
            let next = curr + 1;
912
0
            match self.inner.num_senders.compare_exchange(curr, next, SeqCst, SeqCst) {
913
                Ok(_) => {
914
                    // The ABA problem doesn't matter here. We only care that the
915
                    // number of senders never exceeds the maximum.
916
0
                    return Self {
917
0
                        inner: self.inner.clone(),
918
0
                        sender_task: Arc::new(Mutex::new(SenderTask::new())),
919
0
                        maybe_parked: false,
920
0
                    };
921
                }
922
0
                Err(actual) => curr = actual,
923
            }
924
        }
925
0
    }
926
}
927
928
impl<T> Drop for UnboundedSenderInner<T> {
929
778
    fn drop(&mut self) {
930
        // Ordering between variables doesn't matter here
931
778
        let prev = self.inner.num_senders.fetch_sub(1, SeqCst);
932
933
778
        if prev == 1 {
934
778
            self.close_channel();
935
778
        }
936
778
    }
<futures_channel::mpsc::UnboundedSenderInner<devices::virtio::block::asynchronous::WorkerCmd> as core::ops::drop::Drop>::drop
Line
Count
Source
929
778
    fn drop(&mut self) {
930
        // Ordering between variables doesn't matter here
931
778
        let prev = self.inner.num_senders.fetch_sub(1, SeqCst);
932
933
778
        if prev == 1 {
934
778
            self.close_channel();
935
778
        }
936
778
    }
Unexecuted instantiation: <futures_channel::mpsc::UnboundedSenderInner<_> as core::ops::drop::Drop>::drop
937
}
938
939
impl<T> Drop for BoundedSenderInner<T> {
940
0
    fn drop(&mut self) {
941
        // Ordering between variables doesn't matter here
942
0
        let prev = self.inner.num_senders.fetch_sub(1, SeqCst);
943
944
0
        if prev == 1 {
945
0
            self.close_channel();
946
0
        }
947
0
    }
948
}
949
950
/*
951
 *
952
 * ===== impl Receiver =====
953
 *
954
 */
955
956
impl<T> Receiver<T> {
957
    /// Closes the receiving half of a channel, without dropping it.
958
    ///
959
    /// This prevents any further messages from being sent on the channel while
960
    /// still enabling the receiver to drain messages that are buffered.
961
0
    pub fn close(&mut self) {
962
0
        if let Some(inner) = &mut self.inner {
963
0
            inner.set_closed();
964
965
            // Wake up any threads waiting as they'll see that we've closed the
966
            // channel and will continue on their merry way.
967
0
            while let Some(task) = unsafe { inner.parked_queue.pop_spin() } {
968
0
                task.lock().unwrap().notify();
969
0
            }
970
0
        }
971
0
    }
972
973
    /// Tries to receive the next message without notifying a context if empty.
974
    ///
975
    /// It is not recommended to call this function from inside of a future,
976
    /// only when you've otherwise arranged to be notified when the channel is
977
    /// no longer empty.
978
    ///
979
    /// This function returns:
980
    /// * `Ok(Some(t))` when message is fetched
981
    /// * `Ok(None)` when channel is closed and no messages left in the queue
982
    /// * `Err(e)` when there are no messages available, but channel is not yet closed
983
0
    pub fn try_next(&mut self) -> Result<Option<T>, TryRecvError> {
984
0
        match self.next_message() {
985
0
            Poll::Ready(msg) => Ok(msg),
986
0
            Poll::Pending => Err(TryRecvError { _priv: () }),
987
        }
988
0
    }
989
990
0
    fn next_message(&mut self) -> Poll<Option<T>> {
991
0
        let inner = match self.inner.as_mut() {
992
0
            None => return Poll::Ready(None),
993
0
            Some(inner) => inner,
994
        };
995
        // Pop off a message
996
0
        match unsafe { inner.message_queue.pop_spin() } {
997
0
            Some(msg) => {
998
                // If there are any parked task handles in the parked queue,
999
                // pop one and unpark it.
1000
0
                self.unpark_one();
1001
1002
                // Decrement number of messages
1003
0
                self.dec_num_messages();
1004
1005
0
                Poll::Ready(Some(msg))
1006
            }
1007
            None => {
1008
0
                let state = decode_state(inner.state.load(SeqCst));
1009
0
                if state.is_closed() {
1010
                    // If closed flag is set AND there are no pending messages
1011
                    // it means end of stream
1012
0
                    self.inner = None;
1013
0
                    Poll::Ready(None)
1014
                } else {
1015
                    // If queue is open, we need to return Pending
1016
                    // to be woken up when new messages arrive.
1017
                    // If queue is closed but num_messages is non-zero,
1018
                    // it means that senders updated the state,
1019
                    // but haven't put the message into the queue yet,
1020
                    // so we need to park until sender unparks the task
1021
                    // after queueing the message.
1022
0
                    Poll::Pending
1023
                }
1024
            }
1025
        }
1026
0
    }
1027
1028
    // Unpark a single task handle if there is one pending in the parked queue
1029
0
    fn unpark_one(&mut self) {
1030
0
        if let Some(inner) = &mut self.inner {
1031
0
            if let Some(task) = unsafe { inner.parked_queue.pop_spin() } {
1032
0
                task.lock().unwrap().notify();
1033
0
            }
1034
0
        }
1035
0
    }
1036
1037
0
    fn dec_num_messages(&self) {
1038
0
        if let Some(inner) = &self.inner {
1039
0
            // OPEN_MASK is highest bit, so it's unaffected by subtraction
1040
0
            // unless there's underflow, and we know there's no underflow
1041
0
            // because number of messages at this point is always > 0.
1042
0
            inner.state.fetch_sub(1, SeqCst);
1043
0
        }
1044
0
    }
1045
}
1046
1047
// The receiver does not ever take a Pin to the inner T
1048
impl<T> Unpin for Receiver<T> {}
1049
1050
impl<T> FusedStream for Receiver<T> {
1051
0
    fn is_terminated(&self) -> bool {
1052
0
        self.inner.is_none()
1053
0
    }
1054
}
1055
1056
impl<T> Stream for Receiver<T> {
1057
    type Item = T;
1058
1059
0
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
1060
        // Try to read a message off of the message queue.
1061
0
        match self.next_message() {
1062
0
            Poll::Ready(msg) => {
1063
0
                if msg.is_none() {
1064
0
                    self.inner = None;
1065
0
                }
1066
0
                Poll::Ready(msg)
1067
            }
1068
            Poll::Pending => {
1069
                // There are no messages to read, in this case, park.
1070
0
                self.inner.as_ref().unwrap().recv_task.register(cx.waker());
1071
                // Check queue again after parking to prevent race condition:
1072
                // a message could be added to the queue after previous `next_message`
1073
                // before `register` call.
1074
0
                self.next_message()
1075
            }
1076
        }
1077
0
    }
1078
}
1079
1080
impl<T> Drop for Receiver<T> {
1081
0
    fn drop(&mut self) {
1082
        // Drain the channel of all pending messages
1083
0
        self.close();
1084
0
        if self.inner.is_some() {
1085
            loop {
1086
0
                match self.next_message() {
1087
0
                    Poll::Ready(Some(_)) => {}
1088
0
                    Poll::Ready(None) => break,
1089
                    Poll::Pending => {
1090
0
                        let state = decode_state(self.inner.as_ref().unwrap().state.load(SeqCst));
1091
1092
                        // If the channel is closed, then there is no need to park.
1093
0
                        if state.is_closed() {
1094
0
                            break;
1095
0
                        }
1096
1097
                        // TODO: Spinning isn't ideal, it might be worth
1098
                        // investigating using a condvar or some other strategy
1099
                        // here. That said, if this case is hit, then another thread
1100
                        // is about to push the value into the queue and this isn't
1101
                        // the only spinlock in the impl right now.
1102
0
                        thread::yield_now();
1103
                    }
1104
                }
1105
            }
1106
0
        }
1107
0
    }
1108
}
1109
1110
impl<T> UnboundedReceiver<T> {
1111
    /// Closes the receiving half of a channel, without dropping it.
1112
    ///
1113
    /// This prevents any further messages from being sent on the channel while
1114
    /// still enabling the receiver to drain messages that are buffered.
1115
778
    pub fn close(&mut self) {
1116
778
        if let Some(inner) = &mut self.inner {
1117
778
            inner.set_closed();
1118
778
        }
1119
778
    }
<futures_channel::mpsc::UnboundedReceiver<devices::virtio::block::asynchronous::WorkerCmd>>::close
Line
Count
Source
1115
778
    pub fn close(&mut self) {
1116
778
        if let Some(inner) = &mut self.inner {
1117
778
            inner.set_closed();
1118
778
        }
1119
778
    }
Unexecuted instantiation: <futures_channel::mpsc::UnboundedReceiver<_>>::close
1120
1121
    /// Tries to receive the next message without notifying a context if empty.
1122
    ///
1123
    /// It is not recommended to call this function from inside of a future,
1124
    /// only when you've otherwise arranged to be notified when the channel is
1125
    /// no longer empty.
1126
    ///
1127
    /// This function returns:
1128
    /// * `Ok(Some(t))` when message is fetched
1129
    /// * `Ok(None)` when channel is closed and no messages left in the queue
1130
    /// * `Err(e)` when there are no messages available, but channel is not yet closed
1131
0
    pub fn try_next(&mut self) -> Result<Option<T>, TryRecvError> {
1132
0
        match self.next_message() {
1133
0
            Poll::Ready(msg) => Ok(msg),
1134
0
            Poll::Pending => Err(TryRecvError { _priv: () }),
1135
        }
1136
0
    }
1137
1138
1.95k
    fn next_message(&mut self) -> Poll<Option<T>> {
1139
1.95k
        let inner = match self.inner.as_mut() {
1140
0
            None => return Poll::Ready(None),
1141
1.95k
            Some(inner) => inner,
1142
        };
1143
        // Pop off a message
1144
1.95k
        match unsafe { inner.message_queue.pop_spin() } {
1145
778
            Some(msg) => {
1146
                // Decrement number of messages
1147
778
                self.dec_num_messages();
1148
1149
778
                Poll::Ready(Some(msg))
1150
            }
1151
            None => {
1152
1.17k
                let state = decode_state(inner.state.load(SeqCst));
1153
1.17k
                if state.is_closed() {
1154
                    // If closed flag is set AND there are no pending messages
1155
                    // it means end of stream
1156
778
                    self.inner = None;
1157
778
                    Poll::Ready(None)
1158
                } else {
1159
                    // If queue is open, we need to return Pending
1160
                    // to be woken up when new messages arrive.
1161
                    // If queue is closed but num_messages is non-zero,
1162
                    // it means that senders updated the state,
1163
                    // but haven't put the message into the queue yet,
1164
                    // so we need to park until sender unparks the task
1165
                    // after queueing the message.
1166
400
                    Poll::Pending
1167
                }
1168
            }
1169
        }
1170
1.95k
    }
<futures_channel::mpsc::UnboundedReceiver<devices::virtio::block::asynchronous::WorkerCmd>>::next_message
Line
Count
Source
1138
1.95k
    fn next_message(&mut self) -> Poll<Option<T>> {
1139
1.95k
        let inner = match self.inner.as_mut() {
1140
0
            None => return Poll::Ready(None),
1141
1.95k
            Some(inner) => inner,
1142
        };
1143
        // Pop off a message
1144
1.95k
        match unsafe { inner.message_queue.pop_spin() } {
1145
778
            Some(msg) => {
1146
                // Decrement number of messages
1147
778
                self.dec_num_messages();
1148
1149
778
                Poll::Ready(Some(msg))
1150
            }
1151
            None => {
1152
1.17k
                let state = decode_state(inner.state.load(SeqCst));
1153
1.17k
                if state.is_closed() {
1154
                    // If closed flag is set AND there are no pending messages
1155
                    // it means end of stream
1156
778
                    self.inner = None;
1157
778
                    Poll::Ready(None)
1158
                } else {
1159
                    // If queue is open, we need to return Pending
1160
                    // to be woken up when new messages arrive.
1161
                    // If queue is closed but num_messages is non-zero,
1162
                    // it means that senders updated the state,
1163
                    // but haven't put the message into the queue yet,
1164
                    // so we need to park until sender unparks the task
1165
                    // after queueing the message.
1166
400
                    Poll::Pending
1167
                }
1168
            }
1169
        }
1170
1.95k
    }
Unexecuted instantiation: <futures_channel::mpsc::UnboundedReceiver<_>>::next_message
1171
1172
778
    fn dec_num_messages(&self) {
1173
778
        if let Some(inner) = &self.inner {
1174
778
            // OPEN_MASK is highest bit, so it's unaffected by subtraction
1175
778
            // unless there's underflow, and we know there's no underflow
1176
778
            // because number of messages at this point is always > 0.
1177
778
            inner.state.fetch_sub(1, SeqCst);
1178
778
        }
1179
778
    }
<futures_channel::mpsc::UnboundedReceiver<devices::virtio::block::asynchronous::WorkerCmd>>::dec_num_messages
Line
Count
Source
1172
778
    fn dec_num_messages(&self) {
1173
778
        if let Some(inner) = &self.inner {
1174
778
            // OPEN_MASK is highest bit, so it's unaffected by subtraction
1175
778
            // unless there's underflow, and we know there's no underflow
1176
778
            // because number of messages at this point is always > 0.
1177
778
            inner.state.fetch_sub(1, SeqCst);
1178
778
        }
1179
778
    }
Unexecuted instantiation: <futures_channel::mpsc::UnboundedReceiver<_>>::dec_num_messages
1180
}
1181
1182
impl<T> FusedStream for UnboundedReceiver<T> {
1183
581
    fn is_terminated(&self) -> bool {
1184
581
        self.inner.is_none()
1185
581
    }
<futures_channel::mpsc::UnboundedReceiver<devices::virtio::block::asynchronous::WorkerCmd> as futures_core::stream::FusedStream>::is_terminated
Line
Count
Source
1183
581
    fn is_terminated(&self) -> bool {
1184
581
        self.inner.is_none()
1185
581
    }
Unexecuted instantiation: <futures_channel::mpsc::UnboundedReceiver<_> as futures_core::stream::FusedStream>::is_terminated
1186
}
1187
1188
impl<T> Stream for UnboundedReceiver<T> {
1189
    type Item = T;
1190
1191
581
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
1192
        // Try to read a message off of the message queue.
1193
581
        match self.next_message() {
1194
381
            Poll::Ready(msg) => {
1195
381
                if msg.is_none() {
1196
0
                    self.inner = None;
1197
381
                }
1198
381
                Poll::Ready(msg)
1199
            }
1200
            Poll::Pending => {
1201
                // There are no messages to read, in this case, park.
1202
200
                self.inner.as_ref().unwrap().recv_task.register(cx.waker());
1203
                // Check queue again after parking to prevent race condition:
1204
                // a message could be added to the queue after previous `next_message`
1205
                // before `register` call.
1206
200
                self.next_message()
1207
            }
1208
        }
1209
581
    }
<futures_channel::mpsc::UnboundedReceiver<devices::virtio::block::asynchronous::WorkerCmd> as futures_core::stream::Stream>::poll_next
Line
Count
Source
1191
581
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
1192
        // Try to read a message off of the message queue.
1193
581
        match self.next_message() {
1194
381
            Poll::Ready(msg) => {
1195
381
                if msg.is_none() {
1196
0
                    self.inner = None;
1197
381
                }
1198
381
                Poll::Ready(msg)
1199
            }
1200
            Poll::Pending => {
1201
                // There are no messages to read, in this case, park.
1202
200
                self.inner.as_ref().unwrap().recv_task.register(cx.waker());
1203
                // Check queue again after parking to prevent race condition:
1204
                // a message could be added to the queue after previous `next_message`
1205
                // before `register` call.
1206
200
                self.next_message()
1207
            }
1208
        }
1209
581
    }
Unexecuted instantiation: <futures_channel::mpsc::UnboundedReceiver<_> as futures_core::stream::Stream>::poll_next
1210
}
1211
1212
impl<T> Drop for UnboundedReceiver<T> {
1213
778
    fn drop(&mut self) {
1214
        // Drain the channel of all pending messages
1215
778
        self.close();
1216
778
        if self.inner.is_some() {
1217
            loop {
1218
1.17k
                match self.next_message() {
1219
397
                    Poll::Ready(Some(_)) => {}
1220
778
                    Poll::Ready(None) => break,
1221
                    Poll::Pending => {
1222
0
                        let state = decode_state(self.inner.as_ref().unwrap().state.load(SeqCst));
1223
1224
                        // If the channel is closed, then there is no need to park.
1225
0
                        if state.is_closed() {
1226
0
                            break;
1227
0
                        }
1228
1229
                        // TODO: Spinning isn't ideal, it might be worth
1230
                        // investigating using a condvar or some other strategy
1231
                        // here. That said, if this case is hit, then another thread
1232
                        // is about to push the value into the queue and this isn't
1233
                        // the only spinlock in the impl right now.
1234
0
                        thread::yield_now();
1235
                    }
1236
                }
1237
            }
1238
0
        }
1239
778
    }
<futures_channel::mpsc::UnboundedReceiver<devices::virtio::block::asynchronous::WorkerCmd> as core::ops::drop::Drop>::drop
Line
Count
Source
1213
778
    fn drop(&mut self) {
1214
        // Drain the channel of all pending messages
1215
778
        self.close();
1216
778
        if self.inner.is_some() {
1217
            loop {
1218
1.17k
                match self.next_message() {
1219
397
                    Poll::Ready(Some(_)) => {}
1220
778
                    Poll::Ready(None) => break,
1221
                    Poll::Pending => {
1222
0
                        let state = decode_state(self.inner.as_ref().unwrap().state.load(SeqCst));
1223
1224
                        // If the channel is closed, then there is no need to park.
1225
0
                        if state.is_closed() {
1226
0
                            break;
1227
0
                        }
1228
1229
                        // TODO: Spinning isn't ideal, it might be worth
1230
                        // investigating using a condvar or some other strategy
1231
                        // here. That said, if this case is hit, then another thread
1232
                        // is about to push the value into the queue and this isn't
1233
                        // the only spinlock in the impl right now.
1234
0
                        thread::yield_now();
1235
                    }
1236
                }
1237
            }
1238
0
        }
1239
778
    }
Unexecuted instantiation: <futures_channel::mpsc::UnboundedReceiver<_> as core::ops::drop::Drop>::drop
1240
}
1241
1242
/*
1243
 *
1244
 * ===== impl Inner =====
1245
 *
1246
 */
1247
1248
impl<T> UnboundedInner<T> {
1249
    // Clear `open` flag in the state, keep `num_messages` intact.
1250
1.55k
    fn set_closed(&self) {
1251
1.55k
        let curr = self.state.load(SeqCst);
1252
1.55k
        if !decode_state(curr).is_open {
1253
778
            return;
1254
778
        }
1255
1256
778
        self.state.fetch_and(!OPEN_MASK, SeqCst);
1257
1.55k
    }
<futures_channel::mpsc::UnboundedInner<devices::virtio::block::asynchronous::WorkerCmd>>::set_closed
Line
Count
Source
1250
1.55k
    fn set_closed(&self) {
1251
1.55k
        let curr = self.state.load(SeqCst);
1252
1.55k
        if !decode_state(curr).is_open {
1253
778
            return;
1254
778
        }
1255
1256
778
        self.state.fetch_and(!OPEN_MASK, SeqCst);
1257
1.55k
    }
Unexecuted instantiation: <futures_channel::mpsc::UnboundedInner<_>>::set_closed
1258
}
1259
1260
impl<T> BoundedInner<T> {
1261
    // The return value is such that the total number of messages that can be
1262
    // enqueued into the channel will never exceed MAX_CAPACITY
1263
0
    fn max_senders(&self) -> usize {
1264
0
        MAX_CAPACITY - self.buffer
1265
0
    }
1266
1267
    // Clear `open` flag in the state, keep `num_messages` intact.
1268
0
    fn set_closed(&self) {
1269
0
        let curr = self.state.load(SeqCst);
1270
0
        if !decode_state(curr).is_open {
1271
0
            return;
1272
0
        }
1273
1274
0
        self.state.fetch_and(!OPEN_MASK, SeqCst);
1275
0
    }
1276
}
1277
1278
unsafe impl<T: Send> Send for UnboundedInner<T> {}
1279
unsafe impl<T: Send> Sync for UnboundedInner<T> {}
1280
1281
unsafe impl<T: Send> Send for BoundedInner<T> {}
1282
unsafe impl<T: Send> Sync for BoundedInner<T> {}
1283
1284
impl State {
1285
1.17k
    fn is_closed(&self) -> bool {
1286
1.17k
        !self.is_open && self.num_messages == 0
1287
1.17k
    }
1288
}
1289
1290
/*
1291
 *
1292
 * ===== Helpers =====
1293
 *
1294
 */
1295
1296
3.51k
fn decode_state(num: usize) -> State {
1297
3.51k
    State { is_open: num & OPEN_MASK == OPEN_MASK, num_messages: num & MAX_CAPACITY }
1298
3.51k
}
1299
1300
778
fn encode_state(state: &State) -> usize {
1301
778
    let mut num = state.num_messages;
1302
1303
778
    if state.is_open {
1304
778
        num |= OPEN_MASK;
1305
778
    }
1306
1307
778
    num
1308
778
}