/rust/registry/src/index.crates.io-6f17d22bba15001f/futures-channel-0.3.21/src/oneshot.rs
Line | Count | Source (jump to first uncovered line) |
1 | | //! A channel for sending a single message between asynchronous tasks. |
2 | | //! |
3 | | //! This is a single-producer, single-consumer channel. |
4 | | |
5 | | use alloc::sync::Arc; |
6 | | use core::fmt; |
7 | | use core::pin::Pin; |
8 | | use core::sync::atomic::AtomicBool; |
9 | | use core::sync::atomic::Ordering::SeqCst; |
10 | | use futures_core::future::{FusedFuture, Future}; |
11 | | use futures_core::task::{Context, Poll, Waker}; |
12 | | |
13 | | use crate::lock::Lock; |
14 | | |
15 | | /// A future for a value that will be provided by another asynchronous task. |
16 | | /// |
17 | | /// This is created by the [`channel`](channel) function. |
18 | | #[must_use = "futures do nothing unless you `.await` or poll them"] |
19 | | pub struct Receiver<T> { |
20 | | inner: Arc<Inner<T>>, |
21 | | } |
22 | | |
23 | | /// A means of transmitting a single value to another task. |
24 | | /// |
25 | | /// This is created by the [`channel`](channel) function. |
26 | | pub struct Sender<T> { |
27 | | inner: Arc<Inner<T>>, |
28 | | } |
29 | | |
30 | | // The channels do not ever project Pin to the inner T |
31 | | impl<T> Unpin for Receiver<T> {} |
32 | | impl<T> Unpin for Sender<T> {} |
33 | | |
34 | | /// Internal state of the `Receiver`/`Sender` pair above. This is all used as |
35 | | /// the internal synchronization between the two for send/recv operations. |
36 | | struct Inner<T> { |
37 | | /// Indicates whether this oneshot is complete yet. This is filled in both |
38 | | /// by `Sender::drop` and by `Receiver::drop`, and both sides interpret it |
39 | | /// appropriately. |
40 | | /// |
41 | | /// For `Receiver`, if this is `true`, then it's guaranteed that `data` is |
42 | | /// unlocked and ready to be inspected. |
43 | | /// |
44 | | /// For `Sender` if this is `true` then the oneshot has gone away and it |
45 | | /// can return ready from `poll_canceled`. |
46 | | complete: AtomicBool, |
47 | | |
48 | | /// The actual data being transferred as part of this `Receiver`. This is |
49 | | /// filled in by `Sender::complete` and read by `Receiver::poll`. |
50 | | /// |
51 | | /// Note that this is protected by `Lock`, but it is in theory safe to |
52 | | /// replace with an `UnsafeCell` as it's actually protected by `complete` |
53 | | /// above. I wouldn't recommend doing this, however, unless someone is |
54 | | /// supremely confident in the various atomic orderings here and there. |
55 | | data: Lock<Option<T>>, |
56 | | |
57 | | /// Field to store the task which is blocked in `Receiver::poll`. |
58 | | /// |
59 | | /// This is filled in when a oneshot is polled but not ready yet. Note that |
60 | | /// the `Lock` here, unlike in `data` above, is important to resolve races. |
61 | | /// Both the `Receiver` and the `Sender` halves understand that if they |
62 | | /// can't acquire the lock then some important interference is happening. |
63 | | rx_task: Lock<Option<Waker>>, |
64 | | |
65 | | /// Like `rx_task` above, except for the task blocked in |
66 | | /// `Sender::poll_canceled`. Additionally, `Lock` cannot be `UnsafeCell`. |
67 | | tx_task: Lock<Option<Waker>>, |
68 | | } |
69 | | |
70 | | /// Creates a new one-shot channel for sending a single value across asynchronous tasks. |
71 | | /// |
72 | | /// The channel works for a spsc (single-producer, single-consumer) scheme. |
73 | | /// |
74 | | /// This function is similar to Rust's channel constructor found in the standard |
75 | | /// library. Two halves are returned, the first of which is a `Sender` handle, |
76 | | /// used to signal the end of a computation and provide its value. The second |
77 | | /// half is a `Receiver` which implements the `Future` trait, resolving to the |
78 | | /// value that was given to the `Sender` handle. |
79 | | /// |
80 | | /// Each half can be separately owned and sent across tasks. |
81 | | /// |
82 | | /// # Examples |
83 | | /// |
84 | | /// ``` |
85 | | /// use futures::channel::oneshot; |
86 | | /// use std::{thread, time::Duration}; |
87 | | /// |
88 | | /// let (sender, receiver) = oneshot::channel::<i32>(); |
89 | | /// |
90 | | /// thread::spawn(|| { |
91 | | /// println!("THREAD: sleeping zzz..."); |
92 | | /// thread::sleep(Duration::from_millis(1000)); |
93 | | /// println!("THREAD: i'm awake! sending."); |
94 | | /// sender.send(3).unwrap(); |
95 | | /// }); |
96 | | /// |
97 | | /// println!("MAIN: doing some useful stuff"); |
98 | | /// |
99 | | /// futures::executor::block_on(async { |
100 | | /// println!("MAIN: waiting for msg..."); |
101 | | /// println!("MAIN: got: {:?}", receiver.await) |
102 | | /// }); |
103 | | /// ``` |
104 | 524 | pub fn channel<T>() -> (Sender<T>, Receiver<T>) { |
105 | 524 | let inner = Arc::new(Inner::new()); |
106 | 524 | let receiver = Receiver { inner: inner.clone() }; |
107 | 524 | let sender = Sender { inner }; |
108 | 524 | (sender, receiver) |
109 | 524 | } Unexecuted instantiation: futures_channel::oneshot::channel::<core::option::Option<devices::virtio::queue::Queue>> futures_channel::oneshot::channel::<core::result::Result<devices::virtio::queue::Queue, alloc::boxed::Box<dyn core::any::Any + core::marker::Send>>> Line | Count | Source | 104 | 262 | pub fn channel<T>() -> (Sender<T>, Receiver<T>) { | 105 | 262 | let inner = Arc::new(Inner::new()); | 106 | 262 | let receiver = Receiver { inner: inner.clone() }; | 107 | 262 | let sender = Sender { inner }; | 108 | 262 | (sender, receiver) | 109 | 262 | } |
futures_channel::oneshot::channel::<()> Line | Count | Source | 104 | 262 | pub fn channel<T>() -> (Sender<T>, Receiver<T>) { | 105 | 262 | let inner = Arc::new(Inner::new()); | 106 | 262 | let receiver = Receiver { inner: inner.clone() }; | 107 | 262 | let sender = Sender { inner }; | 108 | 262 | (sender, receiver) | 109 | 262 | } |
Unexecuted instantiation: futures_channel::oneshot::channel::<core::result::Result<usize, disk::Error>> Unexecuted instantiation: futures_channel::oneshot::channel::<core::result::Result<(), disk::Error>> Unexecuted instantiation: futures_channel::oneshot::channel::<_> |
110 | | |
111 | | impl<T> Inner<T> { |
112 | 524 | fn new() -> Self { |
113 | 524 | Self { |
114 | 524 | complete: AtomicBool::new(false), |
115 | 524 | data: Lock::new(None), |
116 | 524 | rx_task: Lock::new(None), |
117 | 524 | tx_task: Lock::new(None), |
118 | 524 | } |
119 | 524 | } Unexecuted instantiation: <futures_channel::oneshot::Inner<core::option::Option<devices::virtio::queue::Queue>>>::new <futures_channel::oneshot::Inner<core::result::Result<devices::virtio::queue::Queue, alloc::boxed::Box<dyn core::any::Any + core::marker::Send>>>>::new Line | Count | Source | 112 | 262 | fn new() -> Self { | 113 | 262 | Self { | 114 | 262 | complete: AtomicBool::new(false), | 115 | 262 | data: Lock::new(None), | 116 | 262 | rx_task: Lock::new(None), | 117 | 262 | tx_task: Lock::new(None), | 118 | 262 | } | 119 | 262 | } |
<futures_channel::oneshot::Inner<()>>::new Line | Count | Source | 112 | 262 | fn new() -> Self { | 113 | 262 | Self { | 114 | 262 | complete: AtomicBool::new(false), | 115 | 262 | data: Lock::new(None), | 116 | 262 | rx_task: Lock::new(None), | 117 | 262 | tx_task: Lock::new(None), | 118 | 262 | } | 119 | 262 | } |
Unexecuted instantiation: <futures_channel::oneshot::Inner<core::result::Result<usize, disk::Error>>>::new Unexecuted instantiation: <futures_channel::oneshot::Inner<core::result::Result<(), disk::Error>>>::new Unexecuted instantiation: <futures_channel::oneshot::Inner<_>>::new |
120 | | |
121 | 0 | fn send(&self, t: T) -> Result<(), T> { |
122 | 0 | if self.complete.load(SeqCst) { |
123 | 0 | return Err(t); |
124 | 0 | } |
125 | | |
126 | | // Note that this lock acquisition may fail if the receiver |
127 | | // is closed and sets the `complete` flag to `true`, whereupon |
128 | | // the receiver may call `poll()`. |
129 | 0 | if let Some(mut slot) = self.data.try_lock() { |
130 | 0 | assert!(slot.is_none()); |
131 | 0 | *slot = Some(t); |
132 | 0 | drop(slot); |
133 | 0 | |
134 | 0 | // If the receiver called `close()` between the check at the |
135 | 0 | // start of the function, and the lock being released, then |
136 | 0 | // the receiver may not be around to receive it, so try to |
137 | 0 | // pull it back out. |
138 | 0 | if self.complete.load(SeqCst) { |
139 | | // If lock acquisition fails, then receiver is actually |
140 | | // receiving it, so we're good. |
141 | 0 | if let Some(mut slot) = self.data.try_lock() { |
142 | 0 | if let Some(t) = slot.take() { |
143 | 0 | return Err(t); |
144 | 0 | } |
145 | 0 | } |
146 | 0 | } |
147 | 0 | Ok(()) |
148 | | } else { |
149 | | // Must have been closed |
150 | 0 | Err(t) |
151 | | } |
152 | 0 | } Unexecuted instantiation: <futures_channel::oneshot::Inner<core::option::Option<devices::virtio::queue::Queue>>>::send Unexecuted instantiation: <futures_channel::oneshot::Inner<core::result::Result<devices::virtio::queue::Queue, alloc::boxed::Box<dyn core::any::Any + core::marker::Send>>>>::send Unexecuted instantiation: <futures_channel::oneshot::Inner<()>>::send Unexecuted instantiation: <futures_channel::oneshot::Inner<core::result::Result<usize, disk::Error>>>::send Unexecuted instantiation: <futures_channel::oneshot::Inner<core::result::Result<(), disk::Error>>>::send Unexecuted instantiation: <futures_channel::oneshot::Inner<_>>::send |
153 | | |
154 | 137 | fn poll_canceled(&self, cx: &mut Context<'_>) -> Poll<()> { |
155 | 137 | // Fast path up first, just read the flag and see if our other half is |
156 | 137 | // gone. This flag is set both in our destructor and the oneshot |
157 | 137 | // destructor, but our destructor hasn't run yet so if it's set then the |
158 | 137 | // oneshot is gone. |
159 | 137 | if self.complete.load(SeqCst) { |
160 | 0 | return Poll::Ready(()); |
161 | 137 | } |
162 | 137 | |
163 | 137 | // If our other half is not gone then we need to park our current task |
164 | 137 | // and move it into the `tx_task` slot to get notified when it's |
165 | 137 | // actually gone. |
166 | 137 | // |
167 | 137 | // If `try_lock` fails, then the `Receiver` is in the process of using |
168 | 137 | // it, so we can deduce that it's now in the process of going away and |
169 | 137 | // hence we're canceled. If it succeeds then we just store our handle. |
170 | 137 | // |
171 | 137 | // Crucially we then check `complete` *again* before we return. |
172 | 137 | // While we were storing our handle inside `tx_task` the |
173 | 137 | // `Receiver` may have been dropped. The first thing it does is set the |
174 | 137 | // flag, and if it fails to acquire the lock it assumes that we'll see |
175 | 137 | // the flag later on. So... we then try to see the flag later on! |
176 | 137 | let handle = cx.waker().clone(); |
177 | 137 | match self.tx_task.try_lock() { |
178 | 137 | Some(mut p) => *p = Some(handle), |
179 | 0 | None => return Poll::Ready(()), |
180 | | } |
181 | 137 | if self.complete.load(SeqCst) { |
182 | 0 | Poll::Ready(()) |
183 | | } else { |
184 | 137 | Poll::Pending |
185 | | } |
186 | 137 | } <futures_channel::oneshot::Inner<core::result::Result<devices::virtio::queue::Queue, alloc::boxed::Box<dyn core::any::Any + core::marker::Send>>>>::poll_canceled Line | Count | Source | 154 | 137 | fn poll_canceled(&self, cx: &mut Context<'_>) -> Poll<()> { | 155 | 137 | // Fast path up first, just read the flag and see if our other half is | 156 | 137 | // gone. This flag is set both in our destructor and the oneshot | 157 | 137 | // destructor, but our destructor hasn't run yet so if it's set then the | 158 | 137 | // oneshot is gone. | 159 | 137 | if self.complete.load(SeqCst) { | 160 | 0 | return Poll::Ready(()); | 161 | 137 | } | 162 | 137 | | 163 | 137 | // If our other half is not gone then we need to park our current task | 164 | 137 | // and move it into the `tx_task` slot to get notified when it's | 165 | 137 | // actually gone. | 166 | 137 | // | 167 | 137 | // If `try_lock` fails, then the `Receiver` is in the process of using | 168 | 137 | // it, so we can deduce that it's now in the process of going away and | 169 | 137 | // hence we're canceled. If it succeeds then we just store our handle. | 170 | 137 | // | 171 | 137 | // Crucially we then check `complete` *again* before we return. | 172 | 137 | // While we were storing our handle inside `tx_task` the | 173 | 137 | // `Receiver` may have been dropped. The first thing it does is set the | 174 | 137 | // flag, and if it fails to acquire the lock it assumes that we'll see | 175 | 137 | // the flag later on. So... we then try to see the flag later on! | 176 | 137 | let handle = cx.waker().clone(); | 177 | 137 | match self.tx_task.try_lock() { | 178 | 137 | Some(mut p) => *p = Some(handle), | 179 | 0 | None => return Poll::Ready(()), | 180 | | } | 181 | 137 | if self.complete.load(SeqCst) { | 182 | 0 | Poll::Ready(()) | 183 | | } else { | 184 | 137 | Poll::Pending | 185 | | } | 186 | 137 | } |
Unexecuted instantiation: <futures_channel::oneshot::Inner<_>>::poll_canceled |
187 | | |
188 | 0 | fn is_canceled(&self) -> bool { |
189 | 0 | self.complete.load(SeqCst) |
190 | 0 | } |
191 | | |
192 | 524 | fn drop_tx(&self) { |
193 | 524 | // Flag that we're a completed `Sender` and try to wake up a receiver. |
194 | 524 | // Whether or not we actually stored any data will get picked up and |
195 | 524 | // translated to either an item or cancellation. |
196 | 524 | // |
197 | 524 | // Note that if we fail to acquire the `rx_task` lock then that means |
198 | 524 | // we're in one of two situations: |
199 | 524 | // |
200 | 524 | // 1. The receiver is trying to block in `poll` |
201 | 524 | // 2. The receiver is being dropped |
202 | 524 | // |
203 | 524 | // In the first case it'll check the `complete` flag after it's done |
204 | 524 | // blocking to see if it succeeded. In the latter case we don't need to |
205 | 524 | // wake up anyone anyway. So in both cases it's ok to ignore the `None` |
206 | 524 | // case of `try_lock` and bail out. |
207 | 524 | // |
208 | 524 | // The first case crucially depends on `Lock` using `SeqCst` ordering |
209 | 524 | // under the hood. If it instead used `Release` / `Acquire` ordering, |
210 | 524 | // then it would not necessarily synchronize with `inner.complete` |
211 | 524 | // and deadlock might be possible, as was observed in |
212 | 524 | // https://github.com/rust-lang/futures-rs/pull/219. |
213 | 524 | self.complete.store(true, SeqCst); |
214 | | |
215 | 524 | if let Some(mut slot) = self.rx_task.try_lock() { |
216 | 524 | if let Some(task) = slot.take() { |
217 | 137 | drop(slot); |
218 | 137 | task.wake(); |
219 | 387 | } |
220 | 0 | } |
221 | | |
222 | | // If we registered a task for cancel notification drop it to reduce |
223 | | // spurious wakeups |
224 | 524 | if let Some(mut slot) = self.tx_task.try_lock() { |
225 | 524 | drop(slot.take()); |
226 | 524 | } |
227 | 524 | } Unexecuted instantiation: <futures_channel::oneshot::Inner<core::option::Option<devices::virtio::queue::Queue>>>::drop_tx <futures_channel::oneshot::Inner<core::result::Result<devices::virtio::queue::Queue, alloc::boxed::Box<dyn core::any::Any + core::marker::Send>>>>::drop_tx Line | Count | Source | 192 | 262 | fn drop_tx(&self) { | 193 | 262 | // Flag that we're a completed `Sender` and try to wake up a receiver. | 194 | 262 | // Whether or not we actually stored any data will get picked up and | 195 | 262 | // translated to either an item or cancellation. | 196 | 262 | // | 197 | 262 | // Note that if we fail to acquire the `rx_task` lock then that means | 198 | 262 | // we're in one of two situations: | 199 | 262 | // | 200 | 262 | // 1. The receiver is trying to block in `poll` | 201 | 262 | // 2. The receiver is being dropped | 202 | 262 | // | 203 | 262 | // In the first case it'll check the `complete` flag after it's done | 204 | 262 | // blocking to see if it succeeded. In the latter case we don't need to | 205 | 262 | // wake up anyone anyway. So in both cases it's ok to ignore the `None` | 206 | 262 | // case of `try_lock` and bail out. | 207 | 262 | // | 208 | 262 | // The first case crucially depends on `Lock` using `SeqCst` ordering | 209 | 262 | // under the hood. If it instead used `Release` / `Acquire` ordering, | 210 | 262 | // then it would not necessarily synchronize with `inner.complete` | 211 | 262 | // and deadlock might be possible, as was observed in | 212 | 262 | // https://github.com/rust-lang/futures-rs/pull/219. 
| 213 | 262 | self.complete.store(true, SeqCst); | 214 | | | 215 | 262 | if let Some(mut slot) = self.rx_task.try_lock() { | 216 | 262 | if let Some(task) = slot.take() { | 217 | 0 | drop(slot); | 218 | 0 | task.wake(); | 219 | 262 | } | 220 | 0 | } | 221 | | | 222 | | // If we registered a task for cancel notification drop it to reduce | 223 | | // spurious wakeups | 224 | 262 | if let Some(mut slot) = self.tx_task.try_lock() { | 225 | 262 | drop(slot.take()); | 226 | 262 | } | 227 | 262 | } |
<futures_channel::oneshot::Inner<()>>::drop_tx Line | Count | Source | 192 | 262 | fn drop_tx(&self) { | 193 | 262 | // Flag that we're a completed `Sender` and try to wake up a receiver. | 194 | 262 | // Whether or not we actually stored any data will get picked up and | 195 | 262 | // translated to either an item or cancellation. | 196 | 262 | // | 197 | 262 | // Note that if we fail to acquire the `rx_task` lock then that means | 198 | 262 | // we're in one of two situations: | 199 | 262 | // | 200 | 262 | // 1. The receiver is trying to block in `poll` | 201 | 262 | // 2. The receiver is being dropped | 202 | 262 | // | 203 | 262 | // In the first case it'll check the `complete` flag after it's done | 204 | 262 | // blocking to see if it succeeded. In the latter case we don't need to | 205 | 262 | // wake up anyone anyway. So in both cases it's ok to ignore the `None` | 206 | 262 | // case of `try_lock` and bail out. | 207 | 262 | // | 208 | 262 | // The first case crucially depends on `Lock` using `SeqCst` ordering | 209 | 262 | // under the hood. If it instead used `Release` / `Acquire` ordering, | 210 | 262 | // then it would not necessarily synchronize with `inner.complete` | 211 | 262 | // and deadlock might be possible, as was observed in | 212 | 262 | // https://github.com/rust-lang/futures-rs/pull/219. | 213 | 262 | self.complete.store(true, SeqCst); | 214 | | | 215 | 262 | if let Some(mut slot) = self.rx_task.try_lock() { | 216 | 262 | if let Some(task) = slot.take() { | 217 | 137 | drop(slot); | 218 | 137 | task.wake(); | 219 | 137 | } | 220 | 0 | } | 221 | | | 222 | | // If we registered a task for cancel notification drop it to reduce | 223 | | // spurious wakeups | 224 | 262 | if let Some(mut slot) = self.tx_task.try_lock() { | 225 | 262 | drop(slot.take()); | 226 | 262 | } | 227 | 262 | } |
Unexecuted instantiation: <futures_channel::oneshot::Inner<core::result::Result<usize, disk::Error>>>::drop_tx Unexecuted instantiation: <futures_channel::oneshot::Inner<core::result::Result<(), disk::Error>>>::drop_tx Unexecuted instantiation: <futures_channel::oneshot::Inner<_>>::drop_tx |
228 | | |
229 | 0 | fn close_rx(&self) { |
230 | 0 | // Flag our completion and then attempt to wake up the sender if it's |
231 | 0 | // blocked. See comments in `drop_rx` below for more info |
232 | 0 | self.complete.store(true, SeqCst); |
233 | 0 | if let Some(mut handle) = self.tx_task.try_lock() { |
234 | 0 | if let Some(task) = handle.take() { |
235 | 0 | drop(handle); |
236 | 0 | task.wake() |
237 | 0 | } |
238 | 0 | } |
239 | 0 | } |
240 | | |
241 | 0 | fn try_recv(&self) -> Result<Option<T>, Canceled> { |
242 | 0 | // If we're complete, either `::close_rx` or `::drop_tx` was called. |
243 | 0 | // We can assume a successful send if data is present. |
244 | 0 | if self.complete.load(SeqCst) { |
245 | 0 | if let Some(mut slot) = self.data.try_lock() { |
246 | 0 | if let Some(data) = slot.take() { |
247 | 0 | return Ok(Some(data)); |
248 | 0 | } |
249 | 0 | } |
250 | 0 | Err(Canceled) |
251 | | } else { |
252 | 0 | Ok(None) |
253 | | } |
254 | 0 | } |
255 | | |
256 | 117k | fn recv(&self, cx: &mut Context<'_>) -> Poll<Result<T, Canceled>> { |
257 | | // Check to see if some data has arrived. If it hasn't then we need to |
258 | | // block our task. |
259 | | // |
260 | | // Note that the acquisition of the `rx_task` lock might fail below, but |
261 | | // the only situation where this can happen is during `Sender::drop` |
262 | | // when we are indeed completed already. If that's happening then we |
263 | | // know we're completed so keep going. |
264 | 117k | let done = if self.complete.load(SeqCst) { |
265 | 0 | true |
266 | | } else { |
267 | 117k | let task = cx.waker().clone(); |
268 | 117k | match self.rx_task.try_lock() { |
269 | 117k | Some(mut slot) => { |
270 | 117k | *slot = Some(task); |
271 | 117k | false |
272 | | } |
273 | 0 | None => true, |
274 | | } |
275 | | }; |
276 | | |
277 | | // If we're `done` via one of the paths above, then look at the data and |
278 | | // figure out what the answer is. If, however, we stored `rx_task` |
279 | | // successfully above we need to check again if we're completed in case |
280 | | // a message was sent while `rx_task` was locked and couldn't notify us |
281 | | // otherwise. |
282 | | // |
283 | | // If we're not done, and we're not complete, though, then we've |
284 | | // successfully blocked our task and we return `Pending`. |
285 | 117k | if done || self.complete.load(SeqCst) { |
286 | | // If taking the lock fails, the sender will realise that we're |
287 | | // `done` when it checks the `complete` flag on the way out, and |
288 | | // will treat the send as a failure. |
289 | 0 | if let Some(mut slot) = self.data.try_lock() { |
290 | 0 | if let Some(data) = slot.take() { |
291 | 0 | return Poll::Ready(Ok(data)); |
292 | 0 | } |
293 | 0 | } |
294 | 0 | Poll::Ready(Err(Canceled)) |
295 | | } else { |
296 | 117k | Poll::Pending |
297 | | } |
298 | 117k | } Unexecuted instantiation: <futures_channel::oneshot::Inner<core::option::Option<devices::virtio::queue::Queue>>>::recv Unexecuted instantiation: <futures_channel::oneshot::Inner<core::result::Result<devices::virtio::queue::Queue, alloc::boxed::Box<dyn core::any::Any + core::marker::Send>>>>::recv <futures_channel::oneshot::Inner<()>>::recv Line | Count | Source | 256 | 117k | fn recv(&self, cx: &mut Context<'_>) -> Poll<Result<T, Canceled>> { | 257 | | // Check to see if some data has arrived. If it hasn't then we need to | 258 | | // block our task. | 259 | | // | 260 | | // Note that the acquisition of the `rx_task` lock might fail below, but | 261 | | // the only situation where this can happen is during `Sender::drop` | 262 | | // when we are indeed completed already. If that's happening then we | 263 | | // know we're completed so keep going. | 264 | 117k | let done = if self.complete.load(SeqCst) { | 265 | 0 | true | 266 | | } else { | 267 | 117k | let task = cx.waker().clone(); | 268 | 117k | match self.rx_task.try_lock() { | 269 | 117k | Some(mut slot) => { | 270 | 117k | *slot = Some(task); | 271 | 117k | false | 272 | | } | 273 | 0 | None => true, | 274 | | } | 275 | | }; | 276 | | | 277 | | // If we're `done` via one of the paths above, then look at the data and | 278 | | // figure out what the answer is. If, however, we stored `rx_task` | 279 | | // successfully above we need to check again if we're completed in case | 280 | | // a message was sent while `rx_task` was locked and couldn't notify us | 281 | | // otherwise. | 282 | | // | 283 | | // If we're not done, and we're not complete, though, then we've | 284 | | // successfully blocked our task and we return `Pending`. | 285 | 117k | if done || self.complete.load(SeqCst) { | 286 | | // If taking the lock fails, the sender will realise that the we're | 287 | | // `done` when it checks the `complete` flag on the way out, and | 288 | | // will treat the send as a failure. 
| 289 | 0 | if let Some(mut slot) = self.data.try_lock() { | 290 | 0 | if let Some(data) = slot.take() { | 291 | 0 | return Poll::Ready(Ok(data)); | 292 | 0 | } | 293 | 0 | } | 294 | 0 | Poll::Ready(Err(Canceled)) | 295 | | } else { | 296 | 117k | Poll::Pending | 297 | | } | 298 | 117k | } |
Unexecuted instantiation: <futures_channel::oneshot::Inner<core::result::Result<usize, disk::Error>>>::recv Unexecuted instantiation: <futures_channel::oneshot::Inner<core::result::Result<(), disk::Error>>>::recv Unexecuted instantiation: <futures_channel::oneshot::Inner<_>>::recv |
299 | | |
300 | 524 | fn drop_rx(&self) { |
301 | 524 | // Indicate to the `Sender` that we're done, so any future calls to |
302 | 524 | // `poll_canceled` are weeded out. |
303 | 524 | self.complete.store(true, SeqCst); |
304 | | |
305 | | // If we've blocked a task then there's no need for it to stick around, |
306 | | // so we need to drop it. If this lock acquisition fails, though, then |
307 | | // it's just because our `Sender` is trying to take the task, so we |
308 | | // let them take care of that. |
309 | 524 | if let Some(mut slot) = self.rx_task.try_lock() { |
310 | 524 | let task = slot.take(); |
311 | 524 | drop(slot); |
312 | 524 | drop(task); |
313 | 524 | } |
314 | | |
315 | | // Finally, if our `Sender` wants to get notified of us going away, it |
316 | | // would have stored something in `tx_task`. Here we try to peel that |
317 | | // out and unpark it. |
318 | | // |
319 | | // Note that the `try_lock` here may fail, but only if the `Sender` is |
320 | | // in the process of filling in the task. If that happens then we |
321 | | // already flagged `complete` and they'll pick that up above. |
322 | 524 | if let Some(mut handle) = self.tx_task.try_lock() { |
323 | 524 | if let Some(task) = handle.take() { |
324 | 137 | drop(handle); |
325 | 137 | task.wake() |
326 | 387 | } |
327 | 0 | } |
328 | 524 | } Unexecuted instantiation: <futures_channel::oneshot::Inner<core::option::Option<devices::virtio::queue::Queue>>>::drop_rx <futures_channel::oneshot::Inner<core::result::Result<devices::virtio::queue::Queue, alloc::boxed::Box<dyn core::any::Any + core::marker::Send>>>>::drop_rx Line | Count | Source | 300 | 262 | fn drop_rx(&self) { | 301 | 262 | // Indicate to the `Sender` that we're done, so any future calls to | 302 | 262 | // `poll_canceled` are weeded out. | 303 | 262 | self.complete.store(true, SeqCst); | 304 | | | 305 | | // If we've blocked a task then there's no need for it to stick around, | 306 | | // so we need to drop it. If this lock acquisition fails, though, then | 307 | | // it's just because our `Sender` is trying to take the task, so we | 308 | | // let them take care of that. | 309 | 262 | if let Some(mut slot) = self.rx_task.try_lock() { | 310 | 262 | let task = slot.take(); | 311 | 262 | drop(slot); | 312 | 262 | drop(task); | 313 | 262 | } | 314 | | | 315 | | // Finally, if our `Sender` wants to get notified of us going away, it | 316 | | // would have stored something in `tx_task`. Here we try to peel that | 317 | | // out and unpark it. | 318 | | // | 319 | | // Note that the `try_lock` here may fail, but only if the `Sender` is | 320 | | // in the process of filling in the task. If that happens then we | 321 | | // already flagged `complete` and they'll pick that up above. | 322 | 262 | if let Some(mut handle) = self.tx_task.try_lock() { | 323 | 262 | if let Some(task) = handle.take() { | 324 | 137 | drop(handle); | 325 | 137 | task.wake() | 326 | 125 | } | 327 | 0 | } | 328 | 262 | } |
<futures_channel::oneshot::Inner<()>>::drop_rx Line | Count | Source | 300 | 262 | fn drop_rx(&self) { | 301 | 262 | // Indicate to the `Sender` that we're done, so any future calls to | 302 | 262 | // `poll_canceled` are weeded out. | 303 | 262 | self.complete.store(true, SeqCst); | 304 | | | 305 | | // If we've blocked a task then there's no need for it to stick around, | 306 | | // so we need to drop it. If this lock acquisition fails, though, then | 307 | | // it's just because our `Sender` is trying to take the task, so we | 308 | | // let them take care of that. | 309 | 262 | if let Some(mut slot) = self.rx_task.try_lock() { | 310 | 262 | let task = slot.take(); | 311 | 262 | drop(slot); | 312 | 262 | drop(task); | 313 | 262 | } | 314 | | | 315 | | // Finally, if our `Sender` wants to get notified of us going away, it | 316 | | // would have stored something in `tx_task`. Here we try to peel that | 317 | | // out and unpark it. | 318 | | // | 319 | | // Note that the `try_lock` here may fail, but only if the `Sender` is | 320 | | // in the process of filling in the task. If that happens then we | 321 | | // already flagged `complete` and they'll pick that up above. | 322 | 262 | if let Some(mut handle) = self.tx_task.try_lock() { | 323 | 262 | if let Some(task) = handle.take() { | 324 | 0 | drop(handle); | 325 | 0 | task.wake() | 326 | 262 | } | 327 | 0 | } | 328 | 262 | } |
Unexecuted instantiation: <futures_channel::oneshot::Inner<core::result::Result<usize, disk::Error>>>::drop_rx Unexecuted instantiation: <futures_channel::oneshot::Inner<core::result::Result<(), disk::Error>>>::drop_rx Unexecuted instantiation: <futures_channel::oneshot::Inner<_>>::drop_rx |
329 | | } |
330 | | |
331 | | impl<T> Sender<T> { |
332 | | /// Completes this oneshot with a successful result. |
333 | | /// |
334 | | /// This function will consume `self` and indicate to the other end, the |
335 | | /// [`Receiver`](Receiver), that the value provided is the result of the |
336 | | /// computation this represents. |
337 | | /// |
338 | | /// If the value is successfully enqueued for the remote end to receive, |
339 | | /// then `Ok(())` is returned. If the receiving end was dropped before |
340 | | /// this function was called, however, then `Err(t)` is returned. |
341 | 0 | pub fn send(self, t: T) -> Result<(), T> { |
342 | 0 | self.inner.send(t) |
343 | 0 | } Unexecuted instantiation: <futures_channel::oneshot::Sender<core::option::Option<devices::virtio::queue::Queue>>>::send Unexecuted instantiation: <futures_channel::oneshot::Sender<core::result::Result<devices::virtio::queue::Queue, alloc::boxed::Box<dyn core::any::Any + core::marker::Send>>>>::send Unexecuted instantiation: <futures_channel::oneshot::Sender<()>>::send Unexecuted instantiation: <futures_channel::oneshot::Sender<core::result::Result<usize, disk::Error>>>::send Unexecuted instantiation: <futures_channel::oneshot::Sender<core::result::Result<(), disk::Error>>>::send Unexecuted instantiation: <futures_channel::oneshot::Sender<_>>::send |
344 | | |
345 | | /// Polls this `Sender` half to detect whether its associated |
346 | | /// [`Receiver`](Receiver) has been dropped. |
347 | | /// |
348 | | /// # Return values |
349 | | /// |
350 | | /// If `Ready(())` is returned then the associated `Receiver` has been |
351 | | /// dropped, which means any work required for sending should be canceled. |
352 | | /// |
353 | | /// If `Pending` is returned then the associated `Receiver` is still |
354 | | /// alive and may be able to receive a message if sent. The current task, |
355 | | /// however, is scheduled to receive a notification if the corresponding |
356 | | /// `Receiver` goes away. |
357 | 137 | pub fn poll_canceled(&mut self, cx: &mut Context<'_>) -> Poll<()> { |
358 | 137 | self.inner.poll_canceled(cx) |
359 | 137 | } <futures_channel::oneshot::Sender<core::result::Result<devices::virtio::queue::Queue, alloc::boxed::Box<dyn core::any::Any + core::marker::Send>>>>::poll_canceled Line | Count | Source | 357 | 137 | pub fn poll_canceled(&mut self, cx: &mut Context<'_>) -> Poll<()> { | 358 | 137 | self.inner.poll_canceled(cx) | 359 | 137 | } |
Unexecuted instantiation: <futures_channel::oneshot::Sender<_>>::poll_canceled |
360 | | |
361 | | /// Creates a future that resolves when this `Sender`'s corresponding |
362 | | /// [`Receiver`](Receiver) half has hung up. |
363 | | /// |
364 | | /// This is a utility wrapping [`poll_canceled`](Sender::poll_canceled) |
365 | | /// to expose a [`Future`](core::future::Future). |
366 | 0 | pub fn cancellation(&mut self) -> Cancellation<'_, T> { |
367 | 0 | Cancellation { inner: self } |
368 | 0 | } |
369 | | |
370 | | /// Tests to see whether this `Sender`'s corresponding `Receiver` |
371 | | /// has been dropped. |
372 | | /// |
373 | | /// Unlike [`poll_canceled`](Sender::poll_canceled), this function does not |
374 | | /// enqueue a task for wakeup upon cancellation, but merely reports the |
375 | | /// current state, which may be subject to concurrent modification. |
376 | 0 | pub fn is_canceled(&self) -> bool { |
377 | 0 | self.inner.is_canceled() |
378 | 0 | } |
379 | | |
380 | | /// Tests to see whether this `Sender` is connected to the given `Receiver`. That is, whether |
381 | | /// they were created by the same call to `channel`. |
382 | 0 | pub fn is_connected_to(&self, receiver: &Receiver<T>) -> bool { |
383 | 0 | Arc::ptr_eq(&self.inner, &receiver.inner) |
384 | 0 | } |
385 | | } |
386 | | |
387 | | impl<T> Drop for Sender<T> { |
388 | 524 | fn drop(&mut self) { |
389 | 524 | self.inner.drop_tx() |
390 | 524 | } Unexecuted instantiation: <futures_channel::oneshot::Sender<core::option::Option<devices::virtio::queue::Queue>> as core::ops::drop::Drop>::drop <futures_channel::oneshot::Sender<core::result::Result<devices::virtio::queue::Queue, alloc::boxed::Box<dyn core::any::Any + core::marker::Send>>> as core::ops::drop::Drop>::drop Line | Count | Source | 388 | 262 | fn drop(&mut self) { | 389 | 262 | self.inner.drop_tx() | 390 | 262 | } |
<futures_channel::oneshot::Sender<()> as core::ops::drop::Drop>::drop Line | Count | Source | 388 | 262 | fn drop(&mut self) { | 389 | 262 | self.inner.drop_tx() | 390 | 262 | } |
Unexecuted instantiation: <futures_channel::oneshot::Sender<core::result::Result<usize, disk::Error>> as core::ops::drop::Drop>::drop Unexecuted instantiation: <futures_channel::oneshot::Sender<core::result::Result<(), disk::Error>> as core::ops::drop::Drop>::drop Unexecuted instantiation: <futures_channel::oneshot::Sender<_> as core::ops::drop::Drop>::drop |
391 | | } |
392 | | |
393 | | impl<T: fmt::Debug> fmt::Debug for Sender<T> { |
394 | 0 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
395 | 0 | f.debug_struct("Sender").field("complete", &self.inner.complete).finish() |
396 | 0 | } |
397 | | } |
398 | | |
399 | | /// A future that resolves when the receiving end of a channel has hung up. |
400 | | /// |
401 | | /// This is an `.await`-friendly interface around [`poll_canceled`](Sender::poll_canceled). |
402 | | #[must_use = "futures do nothing unless you `.await` or poll them"] |
403 | | #[derive(Debug)] |
404 | | pub struct Cancellation<'a, T> { |
405 | | inner: &'a mut Sender<T>, |
406 | | } |
407 | | |
408 | | impl<T> Future for Cancellation<'_, T> { |
409 | | type Output = (); |
410 | | |
411 | 0 | fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> { |
412 | 0 | self.inner.poll_canceled(cx) |
413 | 0 | } |
414 | | } |
415 | | |
416 | | /// Error returned from a [`Receiver`](Receiver) when the corresponding |
417 | | /// [`Sender`](Sender) is dropped. |
418 | | #[derive(Clone, Copy, PartialEq, Eq, Debug)] |
419 | | pub struct Canceled; |
420 | | |
421 | | impl fmt::Display for Canceled { |
422 | 0 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
423 | 0 | write!(f, "oneshot canceled") |
424 | 0 | } |
425 | | } |
426 | | |
427 | | #[cfg(feature = "std")] |
428 | | impl std::error::Error for Canceled {} |
429 | | |
430 | | impl<T> Receiver<T> { |
431 | | /// Gracefully close this receiver, preventing any subsequent attempts to |
432 | | /// send to it. |
433 | | /// |
434 | | /// Any `send` operation which happens after this method returns is |
435 | | /// guaranteed to fail. After calling this method, you can use |
436 | | /// [`Receiver::poll`](core::future::Future::poll) to determine whether a |
437 | | /// message had previously been sent. |
438 | 0 | pub fn close(&mut self) { |
439 | 0 | self.inner.close_rx() |
440 | 0 | } |
441 | | |
442 | | /// Attempts to receive a message outside of the context of a task. |
443 | | /// |
444 | | /// Does not schedule a task wakeup or have any other side effects. |
445 | | /// |
446 | | /// A return value of `None` must be considered immediately stale (out of |
447 | | /// date) unless [`close`](Receiver::close) has been called first. |
448 | | /// |
449 | | /// Returns an error if the sender was dropped. |
450 | 0 | pub fn try_recv(&mut self) -> Result<Option<T>, Canceled> { |
451 | 0 | self.inner.try_recv() |
452 | 0 | } |
453 | | } |
454 | | |
455 | | impl<T> Future for Receiver<T> { |
456 | | type Output = Result<T, Canceled>; |
457 | | |
458 | 117k | fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<T, Canceled>> { |
459 | 117k | self.inner.recv(cx) |
460 | 117k | } Unexecuted instantiation: <futures_channel::oneshot::Receiver<core::option::Option<devices::virtio::queue::Queue>> as core::future::future::Future>::poll Unexecuted instantiation: <futures_channel::oneshot::Receiver<core::result::Result<devices::virtio::queue::Queue, alloc::boxed::Box<dyn core::any::Any + core::marker::Send>>> as core::future::future::Future>::poll <futures_channel::oneshot::Receiver<()> as core::future::future::Future>::poll Line | Count | Source | 458 | 117k | fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<T, Canceled>> { | 459 | 117k | self.inner.recv(cx) | 460 | 117k | } |
Unexecuted instantiation: <futures_channel::oneshot::Receiver<core::result::Result<usize, disk::Error>> as core::future::future::Future>::poll Unexecuted instantiation: <futures_channel::oneshot::Receiver<core::result::Result<(), disk::Error>> as core::future::future::Future>::poll Unexecuted instantiation: <futures_channel::oneshot::Receiver<_> as core::future::future::Future>::poll |
461 | | } |
462 | | |
463 | | impl<T> FusedFuture for Receiver<T> { |
464 | 117k | fn is_terminated(&self) -> bool { |
465 | 117k | if self.inner.complete.load(SeqCst) { |
466 | 0 | if let Some(slot) = self.inner.data.try_lock() { |
467 | 0 | if slot.is_some() { |
468 | 0 | return false; |
469 | 0 | } |
470 | 0 | } |
471 | 0 | true |
472 | | } else { |
473 | 117k | false |
474 | | } |
475 | 117k | } <futures_channel::oneshot::Receiver<()> as futures_core::future::FusedFuture>::is_terminated Line | Count | Source | 464 | 117k | fn is_terminated(&self) -> bool { | 465 | 117k | if self.inner.complete.load(SeqCst) { | 466 | 0 | if let Some(slot) = self.inner.data.try_lock() { | 467 | 0 | if slot.is_some() { | 468 | 0 | return false; | 469 | 0 | } | 470 | 0 | } | 471 | 0 | true | 472 | | } else { | 473 | 117k | false | 474 | | } | 475 | 117k | } |
Unexecuted instantiation: <futures_channel::oneshot::Receiver<_> as futures_core::future::FusedFuture>::is_terminated |
476 | | } |
477 | | |
478 | | impl<T> Drop for Receiver<T> { |
479 | 524 | fn drop(&mut self) { |
480 | 524 | self.inner.drop_rx() |
481 | 524 | } Unexecuted instantiation: <futures_channel::oneshot::Receiver<core::option::Option<devices::virtio::queue::Queue>> as core::ops::drop::Drop>::drop <futures_channel::oneshot::Receiver<core::result::Result<devices::virtio::queue::Queue, alloc::boxed::Box<dyn core::any::Any + core::marker::Send>>> as core::ops::drop::Drop>::drop Line | Count | Source | 479 | 262 | fn drop(&mut self) { | 480 | 262 | self.inner.drop_rx() | 481 | 262 | } |
<futures_channel::oneshot::Receiver<()> as core::ops::drop::Drop>::drop Line | Count | Source | 479 | 262 | fn drop(&mut self) { | 480 | 262 | self.inner.drop_rx() | 481 | 262 | } |
Unexecuted instantiation: <futures_channel::oneshot::Receiver<core::result::Result<usize, disk::Error>> as core::ops::drop::Drop>::drop Unexecuted instantiation: <futures_channel::oneshot::Receiver<core::result::Result<(), disk::Error>> as core::ops::drop::Drop>::drop Unexecuted instantiation: <futures_channel::oneshot::Receiver<_> as core::ops::drop::Drop>::drop |
482 | | } |
483 | | |
484 | | impl<T: fmt::Debug> fmt::Debug for Receiver<T> { |
485 | 0 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
486 | 0 | f.debug_struct("Receiver").field("complete", &self.inner.complete).finish() |
487 | 0 | } |
488 | | } |