Coverage Report

Created: 2025-07-23 06:18

/rust/registry/src/index.crates.io-6f17d22bba15001f/futures-channel-0.3.21/src/oneshot.rs
Line
Count
Source (jump to first uncovered line)
1
//! A channel for sending a single message between asynchronous tasks.
2
//!
3
//! This is a single-producer, single-consumer channel.
4
5
use alloc::sync::Arc;
6
use core::fmt;
7
use core::pin::Pin;
8
use core::sync::atomic::AtomicBool;
9
use core::sync::atomic::Ordering::SeqCst;
10
use futures_core::future::{FusedFuture, Future};
11
use futures_core::task::{Context, Poll, Waker};
12
13
use crate::lock::Lock;
14
15
/// A future for a value that will be provided by another asynchronous task.
16
///
17
/// This is created by the [`channel`](channel) function.
18
#[must_use = "futures do nothing unless you `.await` or poll them"]
19
pub struct Receiver<T> {
20
    inner: Arc<Inner<T>>,
21
}
22
23
/// A means of transmitting a single value to another task.
24
///
25
/// This is created by the [`channel`](channel) function.
26
pub struct Sender<T> {
27
    inner: Arc<Inner<T>>,
28
}
29
30
// The channels do not ever project Pin to the inner T
31
impl<T> Unpin for Receiver<T> {}
32
impl<T> Unpin for Sender<T> {}
33
34
/// Internal state of the `Receiver`/`Sender` pair above. This is all used as
35
/// the internal synchronization between the two for send/recv operations.
36
struct Inner<T> {
37
    /// Indicates whether this oneshot is complete yet. This is filled in both
38
    /// by `Sender::drop` and by `Receiver::drop`, and both sides interpret it
39
    /// appropriately.
40
    ///
41
    /// For `Receiver`, if this is `true`, then it's guaranteed that `data` is
42
    /// unlocked and ready to be inspected.
43
    ///
44
    /// For `Sender` if this is `true` then the oneshot has gone away and it
45
    /// can return ready from `poll_canceled`.
46
    complete: AtomicBool,
47
48
    /// The actual data being transferred as part of this `Receiver`. This is
49
    /// filled in by `Sender::send` and read by `Receiver::poll`.
50
    ///
51
    /// Note that this is protected by `Lock`, but it is in theory safe to
52
    /// replace with an `UnsafeCell` as it's actually protected by `complete`
53
    /// above. I wouldn't recommend doing this, however, unless someone is
54
    /// supremely confident in the various atomic orderings here and there.
55
    data: Lock<Option<T>>,
56
57
    /// Field to store the task which is blocked in `Receiver::poll`.
58
    ///
59
    /// This is filled in when a oneshot is polled but not ready yet. Note that
60
    /// the `Lock` here, unlike in `data` above, is important to resolve races.
61
    /// Both the `Receiver` and the `Sender` halves understand that if they
62
    /// can't acquire the lock then some important interference is happening.
63
    rx_task: Lock<Option<Waker>>,
64
65
    /// Like `rx_task` above, except for the task blocked in
66
    /// `Sender::poll_canceled`. As with `rx_task`, this `Lock` cannot be replaced by an `UnsafeCell`.
67
    tx_task: Lock<Option<Waker>>,
68
}
69
70
/// Creates a new one-shot channel for sending a single value across asynchronous tasks.
71
///
72
/// The channel works for a spsc (single-producer, single-consumer) scheme.
73
///
74
/// This function is similar to Rust's channel constructor found in the standard
75
/// library. Two halves are returned, the first of which is a `Sender` handle,
76
/// used to signal the end of a computation and provide its value. The second
77
/// half is a `Receiver` which implements the `Future` trait, resolving to the
78
/// value that was given to the `Sender` handle.
79
///
80
/// Each half can be separately owned and sent across tasks.
81
///
82
/// # Examples
83
///
84
/// ```
85
/// use futures::channel::oneshot;
86
/// use std::{thread, time::Duration};
87
///
88
/// let (sender, receiver) = oneshot::channel::<i32>();
89
///
90
/// thread::spawn(|| {
91
///     println!("THREAD: sleeping zzz...");
92
///     thread::sleep(Duration::from_millis(1000));
93
///     println!("THREAD: i'm awake! sending.");
94
///     sender.send(3).unwrap();
95
/// });
96
///
97
/// println!("MAIN: doing some useful stuff");
98
///
99
/// futures::executor::block_on(async {
100
///     println!("MAIN: waiting for msg...");
101
///     println!("MAIN: got: {:?}", receiver.await)
102
/// });
103
/// ```
104
524
pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
105
524
    let inner = Arc::new(Inner::new());
106
524
    let receiver = Receiver { inner: inner.clone() };
107
524
    let sender = Sender { inner };
108
524
    (sender, receiver)
109
524
}
Unexecuted instantiation: futures_channel::oneshot::channel::<core::option::Option<devices::virtio::queue::Queue>>
futures_channel::oneshot::channel::<core::result::Result<devices::virtio::queue::Queue, alloc::boxed::Box<dyn core::any::Any + core::marker::Send>>>
Line
Count
Source
104
262
pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
105
262
    let inner = Arc::new(Inner::new());
106
262
    let receiver = Receiver { inner: inner.clone() };
107
262
    let sender = Sender { inner };
108
262
    (sender, receiver)
109
262
}
futures_channel::oneshot::channel::<()>
Line
Count
Source
104
262
pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
105
262
    let inner = Arc::new(Inner::new());
106
262
    let receiver = Receiver { inner: inner.clone() };
107
262
    let sender = Sender { inner };
108
262
    (sender, receiver)
109
262
}
Unexecuted instantiation: futures_channel::oneshot::channel::<core::result::Result<usize, disk::Error>>
Unexecuted instantiation: futures_channel::oneshot::channel::<core::result::Result<(), disk::Error>>
Unexecuted instantiation: futures_channel::oneshot::channel::<_>
110
111
impl<T> Inner<T> {
112
524
    fn new() -> Self {
113
524
        Self {
114
524
            complete: AtomicBool::new(false),
115
524
            data: Lock::new(None),
116
524
            rx_task: Lock::new(None),
117
524
            tx_task: Lock::new(None),
118
524
        }
119
524
    }
Unexecuted instantiation: <futures_channel::oneshot::Inner<core::option::Option<devices::virtio::queue::Queue>>>::new
<futures_channel::oneshot::Inner<core::result::Result<devices::virtio::queue::Queue, alloc::boxed::Box<dyn core::any::Any + core::marker::Send>>>>::new
Line
Count
Source
112
262
    fn new() -> Self {
113
262
        Self {
114
262
            complete: AtomicBool::new(false),
115
262
            data: Lock::new(None),
116
262
            rx_task: Lock::new(None),
117
262
            tx_task: Lock::new(None),
118
262
        }
119
262
    }
<futures_channel::oneshot::Inner<()>>::new
Line
Count
Source
112
262
    fn new() -> Self {
113
262
        Self {
114
262
            complete: AtomicBool::new(false),
115
262
            data: Lock::new(None),
116
262
            rx_task: Lock::new(None),
117
262
            tx_task: Lock::new(None),
118
262
        }
119
262
    }
Unexecuted instantiation: <futures_channel::oneshot::Inner<core::result::Result<usize, disk::Error>>>::new
Unexecuted instantiation: <futures_channel::oneshot::Inner<core::result::Result<(), disk::Error>>>::new
Unexecuted instantiation: <futures_channel::oneshot::Inner<_>>::new
120
121
0
    fn send(&self, t: T) -> Result<(), T> {
122
0
        if self.complete.load(SeqCst) {
123
0
            return Err(t);
124
0
        }
125
126
        // Note that this lock acquisition may fail if the receiver
127
        // is closed and sets the `complete` flag to `true`, whereupon
128
        // the receiver may call `poll()`.
129
0
        if let Some(mut slot) = self.data.try_lock() {
130
0
            assert!(slot.is_none());
131
0
            *slot = Some(t);
132
0
            drop(slot);
133
0
134
0
            // If the receiver called `close()` between the check at the
135
0
            // start of the function, and the lock being released, then
136
0
            // the receiver may not be around to receive it, so try to
137
0
            // pull it back out.
138
0
            if self.complete.load(SeqCst) {
139
                // If lock acquisition fails, then receiver is actually
140
                // receiving it, so we're good.
141
0
                if let Some(mut slot) = self.data.try_lock() {
142
0
                    if let Some(t) = slot.take() {
143
0
                        return Err(t);
144
0
                    }
145
0
                }
146
0
            }
147
0
            Ok(())
148
        } else {
149
            // Must have been closed
150
0
            Err(t)
151
        }
152
0
    }
Unexecuted instantiation: <futures_channel::oneshot::Inner<core::option::Option<devices::virtio::queue::Queue>>>::send
Unexecuted instantiation: <futures_channel::oneshot::Inner<core::result::Result<devices::virtio::queue::Queue, alloc::boxed::Box<dyn core::any::Any + core::marker::Send>>>>::send
Unexecuted instantiation: <futures_channel::oneshot::Inner<()>>::send
Unexecuted instantiation: <futures_channel::oneshot::Inner<core::result::Result<usize, disk::Error>>>::send
Unexecuted instantiation: <futures_channel::oneshot::Inner<core::result::Result<(), disk::Error>>>::send
Unexecuted instantiation: <futures_channel::oneshot::Inner<_>>::send
153
154
137
    fn poll_canceled(&self, cx: &mut Context<'_>) -> Poll<()> {
155
137
        // Fast path up first, just read the flag and see if our other half is
156
137
        // gone. This flag is set both in our destructor and the oneshot
157
137
        // destructor, but our destructor hasn't run yet so if it's set then the
158
137
        // oneshot is gone.
159
137
        if self.complete.load(SeqCst) {
160
0
            return Poll::Ready(());
161
137
        }
162
137
163
137
        // If our other half is not gone then we need to park our current task
164
137
        // and move it into the `tx_task` slot to get notified when it's
165
137
        // actually gone.
166
137
        //
167
137
        // If `try_lock` fails, then the `Receiver` is in the process of using
168
137
        // it, so we can deduce that it's now in the process of going away and
169
137
        // hence we're canceled. If it succeeds then we just store our handle.
170
137
        //
171
137
        // Crucially we then check `complete` *again* before we return.
172
137
        // While we were storing our handle inside `tx_task` the
173
137
        // `Receiver` may have been dropped. The first thing it does is set the
174
137
        // flag, and if it fails to acquire the lock it assumes that we'll see
175
137
        // the flag later on. So... we then try to see the flag later on!
176
137
        let handle = cx.waker().clone();
177
137
        match self.tx_task.try_lock() {
178
137
            Some(mut p) => *p = Some(handle),
179
0
            None => return Poll::Ready(()),
180
        }
181
137
        if self.complete.load(SeqCst) {
182
0
            Poll::Ready(())
183
        } else {
184
137
            Poll::Pending
185
        }
186
137
    }
<futures_channel::oneshot::Inner<core::result::Result<devices::virtio::queue::Queue, alloc::boxed::Box<dyn core::any::Any + core::marker::Send>>>>::poll_canceled
Line
Count
Source
154
137
    fn poll_canceled(&self, cx: &mut Context<'_>) -> Poll<()> {
155
137
        // Fast path up first, just read the flag and see if our other half is
156
137
        // gone. This flag is set both in our destructor and the oneshot
157
137
        // destructor, but our destructor hasn't run yet so if it's set then the
158
137
        // oneshot is gone.
159
137
        if self.complete.load(SeqCst) {
160
0
            return Poll::Ready(());
161
137
        }
162
137
163
137
        // If our other half is not gone then we need to park our current task
164
137
        // and move it into the `tx_task` slot to get notified when it's
165
137
        // actually gone.
166
137
        //
167
137
        // If `try_lock` fails, then the `Receiver` is in the process of using
168
137
        // it, so we can deduce that it's now in the process of going away and
169
137
        // hence we're canceled. If it succeeds then we just store our handle.
170
137
        //
171
137
        // Crucially we then check `complete` *again* before we return.
172
137
        // While we were storing our handle inside `tx_task` the
173
137
        // `Receiver` may have been dropped. The first thing it does is set the
174
137
        // flag, and if it fails to acquire the lock it assumes that we'll see
175
137
        // the flag later on. So... we then try to see the flag later on!
176
137
        let handle = cx.waker().clone();
177
137
        match self.tx_task.try_lock() {
178
137
            Some(mut p) => *p = Some(handle),
179
0
            None => return Poll::Ready(()),
180
        }
181
137
        if self.complete.load(SeqCst) {
182
0
            Poll::Ready(())
183
        } else {
184
137
            Poll::Pending
185
        }
186
137
    }
Unexecuted instantiation: <futures_channel::oneshot::Inner<_>>::poll_canceled
187
188
0
    fn is_canceled(&self) -> bool {
189
0
        self.complete.load(SeqCst)
190
0
    }
191
192
524
    fn drop_tx(&self) {
193
524
        // Flag that we're a completed `Sender` and try to wake up a receiver.
194
524
        // Whether or not we actually stored any data will get picked up and
195
524
        // translated to either an item or cancellation.
196
524
        //
197
524
        // Note that if we fail to acquire the `rx_task` lock then that means
198
524
        // we're in one of two situations:
199
524
        //
200
524
        // 1. The receiver is trying to block in `poll`
201
524
        // 2. The receiver is being dropped
202
524
        //
203
524
        // In the first case it'll check the `complete` flag after it's done
204
524
        // blocking to see if it succeeded. In the latter case we don't need to
205
524
        // wake up anyone anyway. So in both cases it's ok to ignore the `None`
206
524
        // case of `try_lock` and bail out.
207
524
        //
208
524
        // The first case crucially depends on `Lock` using `SeqCst` ordering
209
524
        // under the hood. If it instead used `Release` / `Acquire` ordering,
210
524
        // then it would not necessarily synchronize with `inner.complete`
211
524
        // and deadlock might be possible, as was observed in
212
524
        // https://github.com/rust-lang/futures-rs/pull/219.
213
524
        self.complete.store(true, SeqCst);
214
215
524
        if let Some(mut slot) = self.rx_task.try_lock() {
216
524
            if let Some(task) = slot.take() {
217
137
                drop(slot);
218
137
                task.wake();
219
387
            }
220
0
        }
221
222
        // If we registered a task for cancel notification drop it to reduce
223
        // spurious wakeups
224
524
        if let Some(mut slot) = self.tx_task.try_lock() {
225
524
            drop(slot.take());
226
524
        }
227
524
    }
Unexecuted instantiation: <futures_channel::oneshot::Inner<core::option::Option<devices::virtio::queue::Queue>>>::drop_tx
<futures_channel::oneshot::Inner<core::result::Result<devices::virtio::queue::Queue, alloc::boxed::Box<dyn core::any::Any + core::marker::Send>>>>::drop_tx
Line
Count
Source
192
262
    fn drop_tx(&self) {
193
262
        // Flag that we're a completed `Sender` and try to wake up a receiver.
194
262
        // Whether or not we actually stored any data will get picked up and
195
262
        // translated to either an item or cancellation.
196
262
        //
197
262
        // Note that if we fail to acquire the `rx_task` lock then that means
198
262
        // we're in one of two situations:
199
262
        //
200
262
        // 1. The receiver is trying to block in `poll`
201
262
        // 2. The receiver is being dropped
202
262
        //
203
262
        // In the first case it'll check the `complete` flag after it's done
204
262
        // blocking to see if it succeeded. In the latter case we don't need to
205
262
        // wake up anyone anyway. So in both cases it's ok to ignore the `None`
206
262
        // case of `try_lock` and bail out.
207
262
        //
208
262
        // The first case crucially depends on `Lock` using `SeqCst` ordering
209
262
        // under the hood. If it instead used `Release` / `Acquire` ordering,
210
262
        // then it would not necessarily synchronize with `inner.complete`
211
262
        // and deadlock might be possible, as was observed in
212
262
        // https://github.com/rust-lang/futures-rs/pull/219.
213
262
        self.complete.store(true, SeqCst);
214
215
262
        if let Some(mut slot) = self.rx_task.try_lock() {
216
262
            if let Some(task) = slot.take() {
217
0
                drop(slot);
218
0
                task.wake();
219
262
            }
220
0
        }
221
222
        // If we registered a task for cancel notification drop it to reduce
223
        // spurious wakeups
224
262
        if let Some(mut slot) = self.tx_task.try_lock() {
225
262
            drop(slot.take());
226
262
        }
227
262
    }
<futures_channel::oneshot::Inner<()>>::drop_tx
Line
Count
Source
192
262
    fn drop_tx(&self) {
193
262
        // Flag that we're a completed `Sender` and try to wake up a receiver.
194
262
        // Whether or not we actually stored any data will get picked up and
195
262
        // translated to either an item or cancellation.
196
262
        //
197
262
        // Note that if we fail to acquire the `rx_task` lock then that means
198
262
        // we're in one of two situations:
199
262
        //
200
262
        // 1. The receiver is trying to block in `poll`
201
262
        // 2. The receiver is being dropped
202
262
        //
203
262
        // In the first case it'll check the `complete` flag after it's done
204
262
        // blocking to see if it succeeded. In the latter case we don't need to
205
262
        // wake up anyone anyway. So in both cases it's ok to ignore the `None`
206
262
        // case of `try_lock` and bail out.
207
262
        //
208
262
        // The first case crucially depends on `Lock` using `SeqCst` ordering
209
262
        // under the hood. If it instead used `Release` / `Acquire` ordering,
210
262
        // then it would not necessarily synchronize with `inner.complete`
211
262
        // and deadlock might be possible, as was observed in
212
262
        // https://github.com/rust-lang/futures-rs/pull/219.
213
262
        self.complete.store(true, SeqCst);
214
215
262
        if let Some(mut slot) = self.rx_task.try_lock() {
216
262
            if let Some(task) = slot.take() {
217
137
                drop(slot);
218
137
                task.wake();
219
137
            }
220
0
        }
221
222
        // If we registered a task for cancel notification drop it to reduce
223
        // spurious wakeups
224
262
        if let Some(mut slot) = self.tx_task.try_lock() {
225
262
            drop(slot.take());
226
262
        }
227
262
    }
Unexecuted instantiation: <futures_channel::oneshot::Inner<core::result::Result<usize, disk::Error>>>::drop_tx
Unexecuted instantiation: <futures_channel::oneshot::Inner<core::result::Result<(), disk::Error>>>::drop_tx
Unexecuted instantiation: <futures_channel::oneshot::Inner<_>>::drop_tx
228
229
0
    fn close_rx(&self) {
230
0
        // Flag our completion and then attempt to wake up the sender if it's
231
0
        // blocked. See comments in `drop_rx` below for more info
232
0
        self.complete.store(true, SeqCst);
233
0
        if let Some(mut handle) = self.tx_task.try_lock() {
234
0
            if let Some(task) = handle.take() {
235
0
                drop(handle);
236
0
                task.wake()
237
0
            }
238
0
        }
239
0
    }
240
241
0
    fn try_recv(&self) -> Result<Option<T>, Canceled> {
242
0
        // If we're complete, either `::close_rx` or `::drop_tx` was called.
243
0
        // We can assume a successful send if data is present.
244
0
        if self.complete.load(SeqCst) {
245
0
            if let Some(mut slot) = self.data.try_lock() {
246
0
                if let Some(data) = slot.take() {
247
0
                    return Ok(Some(data));
248
0
                }
249
0
            }
250
0
            Err(Canceled)
251
        } else {
252
0
            Ok(None)
253
        }
254
0
    }
255
256
117k
    fn recv(&self, cx: &mut Context<'_>) -> Poll<Result<T, Canceled>> {
257
        // Check to see if some data has arrived. If it hasn't then we need to
258
        // block our task.
259
        //
260
        // Note that the acquisition of the `rx_task` lock might fail below, but
261
        // the only situation where this can happen is during `Sender::drop`
262
        // when we are indeed completed already. If that's happening then we
263
        // know we're completed so keep going.
264
117k
        let done = if self.complete.load(SeqCst) {
265
0
            true
266
        } else {
267
117k
            let task = cx.waker().clone();
268
117k
            match self.rx_task.try_lock() {
269
117k
                Some(mut slot) => {
270
117k
                    *slot = Some(task);
271
117k
                    false
272
                }
273
0
                None => true,
274
            }
275
        };
276
277
        // If we're `done` via one of the paths above, then look at the data and
278
        // figure out what the answer is. If, however, we stored `rx_task`
279
        // successfully above we need to check again if we're completed in case
280
        // a message was sent while `rx_task` was locked and couldn't notify us
281
        // otherwise.
282
        //
283
        // If we're not done, and we're not complete, though, then we've
284
        // successfully blocked our task and we return `Pending`.
285
117k
        if done || self.complete.load(SeqCst) {
286
            // If taking the lock fails, the sender will realise that we're
287
            // `done` when it checks the `complete` flag on the way out, and
288
            // will treat the send as a failure.
289
0
            if let Some(mut slot) = self.data.try_lock() {
290
0
                if let Some(data) = slot.take() {
291
0
                    return Poll::Ready(Ok(data));
292
0
                }
293
0
            }
294
0
            Poll::Ready(Err(Canceled))
295
        } else {
296
117k
            Poll::Pending
297
        }
298
117k
    }
Unexecuted instantiation: <futures_channel::oneshot::Inner<core::option::Option<devices::virtio::queue::Queue>>>::recv
Unexecuted instantiation: <futures_channel::oneshot::Inner<core::result::Result<devices::virtio::queue::Queue, alloc::boxed::Box<dyn core::any::Any + core::marker::Send>>>>::recv
<futures_channel::oneshot::Inner<()>>::recv
Line
Count
Source
256
117k
    fn recv(&self, cx: &mut Context<'_>) -> Poll<Result<T, Canceled>> {
257
        // Check to see if some data has arrived. If it hasn't then we need to
258
        // block our task.
259
        //
260
        // Note that the acquisition of the `rx_task` lock might fail below, but
261
        // the only situation where this can happen is during `Sender::drop`
262
        // when we are indeed completed already. If that's happening then we
263
        // know we're completed so keep going.
264
117k
        let done = if self.complete.load(SeqCst) {
265
0
            true
266
        } else {
267
117k
            let task = cx.waker().clone();
268
117k
            match self.rx_task.try_lock() {
269
117k
                Some(mut slot) => {
270
117k
                    *slot = Some(task);
271
117k
                    false
272
                }
273
0
                None => true,
274
            }
275
        };
276
277
        // If we're `done` via one of the paths above, then look at the data and
278
        // figure out what the answer is. If, however, we stored `rx_task`
279
        // successfully above we need to check again if we're completed in case
280
        // a message was sent while `rx_task` was locked and couldn't notify us
281
        // otherwise.
282
        //
283
        // If we're not done, and we're not complete, though, then we've
284
        // successfully blocked our task and we return `Pending`.
285
117k
        if done || self.complete.load(SeqCst) {
286
            // If taking the lock fails, the sender will realise that we're
287
            // `done` when it checks the `complete` flag on the way out, and
288
            // will treat the send as a failure.
289
0
            if let Some(mut slot) = self.data.try_lock() {
290
0
                if let Some(data) = slot.take() {
291
0
                    return Poll::Ready(Ok(data));
292
0
                }
293
0
            }
294
0
            Poll::Ready(Err(Canceled))
295
        } else {
296
117k
            Poll::Pending
297
        }
298
117k
    }
Unexecuted instantiation: <futures_channel::oneshot::Inner<core::result::Result<usize, disk::Error>>>::recv
Unexecuted instantiation: <futures_channel::oneshot::Inner<core::result::Result<(), disk::Error>>>::recv
Unexecuted instantiation: <futures_channel::oneshot::Inner<_>>::recv
299
300
524
    fn drop_rx(&self) {
301
524
        // Indicate to the `Sender` that we're done, so any future calls to
302
524
        // `poll_canceled` are weeded out.
303
524
        self.complete.store(true, SeqCst);
304
305
        // If we've blocked a task then there's no need for it to stick around,
306
        // so we need to drop it. If this lock acquisition fails, though, then
307
        // it's just because our `Sender` is trying to take the task, so we
308
        // let them take care of that.
309
524
        if let Some(mut slot) = self.rx_task.try_lock() {
310
524
            let task = slot.take();
311
524
            drop(slot);
312
524
            drop(task);
313
524
        }
314
315
        // Finally, if our `Sender` wants to get notified of us going away, it
316
        // would have stored something in `tx_task`. Here we try to peel that
317
        // out and unpark it.
318
        //
319
        // Note that the `try_lock` here may fail, but only if the `Sender` is
320
        // in the process of filling in the task. If that happens then we
321
        // already flagged `complete` and they'll pick that up above.
322
524
        if let Some(mut handle) = self.tx_task.try_lock() {
323
524
            if let Some(task) = handle.take() {
324
137
                drop(handle);
325
137
                task.wake()
326
387
            }
327
0
        }
328
524
    }
Unexecuted instantiation: <futures_channel::oneshot::Inner<core::option::Option<devices::virtio::queue::Queue>>>::drop_rx
<futures_channel::oneshot::Inner<core::result::Result<devices::virtio::queue::Queue, alloc::boxed::Box<dyn core::any::Any + core::marker::Send>>>>::drop_rx
Line
Count
Source
300
262
    fn drop_rx(&self) {
301
262
        // Indicate to the `Sender` that we're done, so any future calls to
302
262
        // `poll_canceled` are weeded out.
303
262
        self.complete.store(true, SeqCst);
304
305
        // If we've blocked a task then there's no need for it to stick around,
306
        // so we need to drop it. If this lock acquisition fails, though, then
307
        // it's just because our `Sender` is trying to take the task, so we
308
        // let them take care of that.
309
262
        if let Some(mut slot) = self.rx_task.try_lock() {
310
262
            let task = slot.take();
311
262
            drop(slot);
312
262
            drop(task);
313
262
        }
314
315
        // Finally, if our `Sender` wants to get notified of us going away, it
316
        // would have stored something in `tx_task`. Here we try to peel that
317
        // out and unpark it.
318
        //
319
        // Note that the `try_lock` here may fail, but only if the `Sender` is
320
        // in the process of filling in the task. If that happens then we
321
        // already flagged `complete` and they'll pick that up above.
322
262
        if let Some(mut handle) = self.tx_task.try_lock() {
323
262
            if let Some(task) = handle.take() {
324
137
                drop(handle);
325
137
                task.wake()
326
125
            }
327
0
        }
328
262
    }
<futures_channel::oneshot::Inner<()>>::drop_rx
Line
Count
Source
300
262
    fn drop_rx(&self) {
301
262
        // Indicate to the `Sender` that we're done, so any future calls to
302
262
        // `poll_canceled` are weeded out.
303
262
        self.complete.store(true, SeqCst);
304
305
        // If we've blocked a task then there's no need for it to stick around,
306
        // so we need to drop it. If this lock acquisition fails, though, then
307
        // it's just because our `Sender` is trying to take the task, so we
308
        // let them take care of that.
309
262
        if let Some(mut slot) = self.rx_task.try_lock() {
310
262
            let task = slot.take();
311
262
            drop(slot);
312
262
            drop(task);
313
262
        }
314
315
        // Finally, if our `Sender` wants to get notified of us going away, it
316
        // would have stored something in `tx_task`. Here we try to peel that
317
        // out and unpark it.
318
        //
319
        // Note that the `try_lock` here may fail, but only if the `Sender` is
320
        // in the process of filling in the task. If that happens then we
321
        // already flagged `complete` and they'll pick that up above.
322
262
        if let Some(mut handle) = self.tx_task.try_lock() {
323
262
            if let Some(task) = handle.take() {
324
0
                drop(handle);
325
0
                task.wake()
326
262
            }
327
0
        }
328
262
    }
Unexecuted instantiation: <futures_channel::oneshot::Inner<core::result::Result<usize, disk::Error>>>::drop_rx
Unexecuted instantiation: <futures_channel::oneshot::Inner<core::result::Result<(), disk::Error>>>::drop_rx
Unexecuted instantiation: <futures_channel::oneshot::Inner<_>>::drop_rx
329
}
330
331
impl<T> Sender<T> {
332
    /// Completes this oneshot with a successful result.
333
    ///
334
    /// This function will consume `self` and indicate to the other end, the
335
    /// [`Receiver`](Receiver), that the value provided is the result of the
336
    /// computation this represents.
337
    ///
338
    /// If the value is successfully enqueued for the remote end to receive,
339
    /// then `Ok(())` is returned. If the receiving end was dropped before
340
    /// this function was called, however, then `Err(t)` is returned.
341
0
    pub fn send(self, t: T) -> Result<(), T> {
342
0
        self.inner.send(t)
343
0
    }
Unexecuted instantiation: <futures_channel::oneshot::Sender<core::option::Option<devices::virtio::queue::Queue>>>::send
Unexecuted instantiation: <futures_channel::oneshot::Sender<core::result::Result<devices::virtio::queue::Queue, alloc::boxed::Box<dyn core::any::Any + core::marker::Send>>>>::send
Unexecuted instantiation: <futures_channel::oneshot::Sender<()>>::send
Unexecuted instantiation: <futures_channel::oneshot::Sender<core::result::Result<usize, disk::Error>>>::send
Unexecuted instantiation: <futures_channel::oneshot::Sender<core::result::Result<(), disk::Error>>>::send
Unexecuted instantiation: <futures_channel::oneshot::Sender<_>>::send
344
345
    /// Polls this `Sender` half to detect whether its associated
346
    /// [`Receiver`](Receiver) has been dropped.
347
    ///
348
    /// # Return values
349
    ///
350
    /// If `Ready(())` is returned then the associated `Receiver` has been
351
    /// dropped, which means any work required for sending should be canceled.
352
    ///
353
    /// If `Pending` is returned then the associated `Receiver` is still
354
    /// alive and may be able to receive a message if sent. The current task,
355
    /// however, is scheduled to receive a notification if the corresponding
356
    /// `Receiver` goes away.
357
137
    pub fn poll_canceled(&mut self, cx: &mut Context<'_>) -> Poll<()> {
358
137
        self.inner.poll_canceled(cx)
359
137
    }
<futures_channel::oneshot::Sender<core::result::Result<devices::virtio::queue::Queue, alloc::boxed::Box<dyn core::any::Any + core::marker::Send>>>>::poll_canceled
Line
Count
Source
357
137
    pub fn poll_canceled(&mut self, cx: &mut Context<'_>) -> Poll<()> {
358
137
        self.inner.poll_canceled(cx)
359
137
    }
Unexecuted instantiation: <futures_channel::oneshot::Sender<_>>::poll_canceled
360
361
    /// Creates a future that resolves when this `Sender`'s corresponding
362
    /// [`Receiver`](Receiver) half has hung up.
363
    ///
364
    /// This is a utility wrapping [`poll_canceled`](Sender::poll_canceled)
365
    /// to expose a [`Future`](core::future::Future).
366
0
    pub fn cancellation(&mut self) -> Cancellation<'_, T> {
367
0
        Cancellation { inner: self }
368
0
    }
369
370
    /// Tests to see whether this `Sender`'s corresponding `Receiver`
371
    /// has been dropped.
372
    ///
373
    /// Unlike [`poll_canceled`](Sender::poll_canceled), this function does not
374
    /// enqueue a task for wakeup upon cancellation, but merely reports the
375
    /// current state, which may be subject to concurrent modification.
376
0
    pub fn is_canceled(&self) -> bool {
377
0
        self.inner.is_canceled()
378
0
    }
379
380
    /// Tests to see whether this `Sender` is connected to the given `Receiver`. That is, whether
381
    /// they were created by the same call to `channel`.
382
0
    pub fn is_connected_to(&self, receiver: &Receiver<T>) -> bool {
383
0
        Arc::ptr_eq(&self.inner, &receiver.inner)
384
0
    }
385
}
386
387
impl<T> Drop for Sender<T> {
388
524
    fn drop(&mut self) {
389
524
        self.inner.drop_tx()
390
524
    }
Unexecuted instantiation: <futures_channel::oneshot::Sender<core::option::Option<devices::virtio::queue::Queue>> as core::ops::drop::Drop>::drop
<futures_channel::oneshot::Sender<core::result::Result<devices::virtio::queue::Queue, alloc::boxed::Box<dyn core::any::Any + core::marker::Send>>> as core::ops::drop::Drop>::drop
Line
Count
Source
388
262
    fn drop(&mut self) {
389
262
        self.inner.drop_tx()
390
262
    }
<futures_channel::oneshot::Sender<()> as core::ops::drop::Drop>::drop
Line
Count
Source
388
262
    fn drop(&mut self) {
389
262
        self.inner.drop_tx()
390
262
    }
Unexecuted instantiation: <futures_channel::oneshot::Sender<core::result::Result<usize, disk::Error>> as core::ops::drop::Drop>::drop
Unexecuted instantiation: <futures_channel::oneshot::Sender<core::result::Result<(), disk::Error>> as core::ops::drop::Drop>::drop
Unexecuted instantiation: <futures_channel::oneshot::Sender<_> as core::ops::drop::Drop>::drop
391
}
392
393
impl<T: fmt::Debug> fmt::Debug for Sender<T> {
394
0
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
395
0
        f.debug_struct("Sender").field("complete", &self.inner.complete).finish()
396
0
    }
397
}
398
399
/// A future that resolves when the receiving end of a channel has hung up.
400
///
401
/// This is an `.await`-friendly interface around [`poll_canceled`](Sender::poll_canceled).
402
#[must_use = "futures do nothing unless you `.await` or poll them"]
403
#[derive(Debug)]
404
pub struct Cancellation<'a, T> {
405
    inner: &'a mut Sender<T>,
406
}
407
408
impl<T> Future for Cancellation<'_, T> {
409
    type Output = ();
410
411
0
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
412
0
        self.inner.poll_canceled(cx)
413
0
    }
414
}
415
416
/// Error returned from a [`Receiver`](Receiver) when the corresponding
417
/// [`Sender`](Sender) is dropped.
418
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
419
pub struct Canceled;
420
421
impl fmt::Display for Canceled {
422
0
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
423
0
        write!(f, "oneshot canceled")
424
0
    }
425
}
426
427
#[cfg(feature = "std")]
428
impl std::error::Error for Canceled {}
429
430
impl<T> Receiver<T> {
431
    /// Gracefully close this receiver, preventing any subsequent attempts to
432
    /// send to it.
433
    ///
434
    /// Any `send` operation which happens after this method returns is
435
    /// guaranteed to fail. After calling this method, you can use
436
    /// [`Receiver::poll`](core::future::Future::poll) to determine whether a
437
    /// message had previously been sent.
438
0
    pub fn close(&mut self) {
439
0
        self.inner.close_rx()
440
0
    }
441
442
    /// Attempts to receive a message outside of the context of a task.
443
    ///
444
    /// Does not schedule a task wakeup or have any other side effects.
445
    ///
446
    /// A return value of `None` must be considered immediately stale (out of
447
    /// date) unless [`close`](Receiver::close) has been called first.
448
    ///
449
    /// Returns an error if the sender was dropped.
450
0
    pub fn try_recv(&mut self) -> Result<Option<T>, Canceled> {
451
0
        self.inner.try_recv()
452
0
    }
453
}
454
455
impl<T> Future for Receiver<T> {
456
    type Output = Result<T, Canceled>;
457
458
117k
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<T, Canceled>> {
459
117k
        self.inner.recv(cx)
460
117k
    }
Unexecuted instantiation: <futures_channel::oneshot::Receiver<core::option::Option<devices::virtio::queue::Queue>> as core::future::future::Future>::poll
Unexecuted instantiation: <futures_channel::oneshot::Receiver<core::result::Result<devices::virtio::queue::Queue, alloc::boxed::Box<dyn core::any::Any + core::marker::Send>>> as core::future::future::Future>::poll
<futures_channel::oneshot::Receiver<()> as core::future::future::Future>::poll
Line
Count
Source
458
117k
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<T, Canceled>> {
459
117k
        self.inner.recv(cx)
460
117k
    }
Unexecuted instantiation: <futures_channel::oneshot::Receiver<core::result::Result<usize, disk::Error>> as core::future::future::Future>::poll
Unexecuted instantiation: <futures_channel::oneshot::Receiver<core::result::Result<(), disk::Error>> as core::future::future::Future>::poll
Unexecuted instantiation: <futures_channel::oneshot::Receiver<_> as core::future::future::Future>::poll
461
}
462
463
impl<T> FusedFuture for Receiver<T> {
464
117k
    fn is_terminated(&self) -> bool {
465
117k
        if self.inner.complete.load(SeqCst) {
466
0
            if let Some(slot) = self.inner.data.try_lock() {
467
0
                if slot.is_some() {
468
0
                    return false;
469
0
                }
470
0
            }
471
0
            true
472
        } else {
473
117k
            false
474
        }
475
117k
    }
<futures_channel::oneshot::Receiver<()> as futures_core::future::FusedFuture>::is_terminated
Line
Count
Source
464
117k
    fn is_terminated(&self) -> bool {
465
117k
        if self.inner.complete.load(SeqCst) {
466
0
            if let Some(slot) = self.inner.data.try_lock() {
467
0
                if slot.is_some() {
468
0
                    return false;
469
0
                }
470
0
            }
471
0
            true
472
        } else {
473
117k
            false
474
        }
475
117k
    }
Unexecuted instantiation: <futures_channel::oneshot::Receiver<_> as futures_core::future::FusedFuture>::is_terminated
476
}
477
478
impl<T> Drop for Receiver<T> {
479
524
    fn drop(&mut self) {
480
524
        self.inner.drop_rx()
481
524
    }
Unexecuted instantiation: <futures_channel::oneshot::Receiver<core::option::Option<devices::virtio::queue::Queue>> as core::ops::drop::Drop>::drop
<futures_channel::oneshot::Receiver<core::result::Result<devices::virtio::queue::Queue, alloc::boxed::Box<dyn core::any::Any + core::marker::Send>>> as core::ops::drop::Drop>::drop
Line
Count
Source
479
262
    fn drop(&mut self) {
480
262
        self.inner.drop_rx()
481
262
    }
<futures_channel::oneshot::Receiver<()> as core::ops::drop::Drop>::drop
Line
Count
Source
479
262
    fn drop(&mut self) {
480
262
        self.inner.drop_rx()
481
262
    }
Unexecuted instantiation: <futures_channel::oneshot::Receiver<core::result::Result<usize, disk::Error>> as core::ops::drop::Drop>::drop
Unexecuted instantiation: <futures_channel::oneshot::Receiver<core::result::Result<(), disk::Error>> as core::ops::drop::Drop>::drop
Unexecuted instantiation: <futures_channel::oneshot::Receiver<_> as core::ops::drop::Drop>::drop
482
}
483
484
impl<T: fmt::Debug> fmt::Debug for Receiver<T> {
485
0
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
486
0
        f.debug_struct("Receiver").field("complete", &self.inner.complete).finish()
487
0
    }
488
}