/rust/registry/src/index.crates.io-1949cf8c6b5b557f/futures-channel-0.3.31/src/oneshot.rs
Line | Count | Source |
1 | | //! A channel for sending a single message between asynchronous tasks. |
2 | | //! |
3 | | //! This is a single-producer, single-consumer channel. |
4 | | |
5 | | use alloc::sync::Arc; |
6 | | use core::fmt; |
7 | | use core::pin::Pin; |
8 | | use core::sync::atomic::AtomicBool; |
9 | | use core::sync::atomic::Ordering::SeqCst; |
10 | | use futures_core::future::{FusedFuture, Future}; |
11 | | use futures_core::task::{Context, Poll, Waker}; |
12 | | |
13 | | use crate::lock::Lock; |
14 | | |
15 | | /// A future for a value that will be provided by another asynchronous task. |
16 | | /// |
17 | | /// This is created by the [`channel`] function. |
18 | | #[must_use = "futures do nothing unless you `.await` or poll them"] |
19 | | pub struct Receiver<T> { |
20 | | inner: Arc<Inner<T>>, |
21 | | } |
22 | | |
23 | | /// A means of transmitting a single value to another task. |
24 | | /// |
25 | | /// This is created by the [`channel`] function. |
26 | | pub struct Sender<T> { |
27 | | inner: Arc<Inner<T>>, |
28 | | } |
29 | | |
30 | | // The channels do not ever project Pin to the inner T |
31 | | impl<T> Unpin for Receiver<T> {} |
32 | | impl<T> Unpin for Sender<T> {} |
33 | | |
34 | | /// Internal state of the `Receiver`/`Sender` pair above. This is all used as |
35 | | /// the internal synchronization between the two for send/recv operations. |
36 | | struct Inner<T> { |
37 | | /// Indicates whether this oneshot is complete yet. This is filled in both |
38 | | /// by `Sender::drop` and by `Receiver::drop`, and both sides interpret it |
39 | | /// appropriately. |
40 | | /// |
41 | | /// For `Receiver`, if this is `true`, then it's guaranteed that `data` is |
42 | | /// unlocked and ready to be inspected. |
43 | | /// |
44 | | /// For `Sender` if this is `true` then the oneshot has gone away and it |
45 | | /// can return ready from `poll_canceled`. |
46 | | complete: AtomicBool, |
47 | | |
48 | | /// The actual data being transferred as part of this `Receiver`. This is |
49 | | /// filled in by `Sender::complete` and read by `Receiver::poll`. |
50 | | /// |
51 | | /// Note that this is protected by `Lock`, but it is in theory safe to |
52 | | /// replace with an `UnsafeCell` as it's actually protected by `complete` |
53 | | /// above. I wouldn't recommend doing this, however, unless someone is |
54 | | /// supremely confident in the various atomic orderings here and there. |
55 | | data: Lock<Option<T>>, |
56 | | |
57 | | /// Field to store the task which is blocked in `Receiver::poll`. |
58 | | /// |
59 | | /// This is filled in when a oneshot is polled but not ready yet. Note that |
60 | | /// the `Lock` here, unlike in `data` above, is important to resolve races. |
61 | | /// Both the `Receiver` and the `Sender` halves understand that if they |
62 | | /// can't acquire the lock then some important interference is happening. |
63 | | rx_task: Lock<Option<Waker>>, |
64 | | |
65 | | /// Like `rx_task` above, except for the task blocked in |
66 | | /// `Sender::poll_canceled`. Additionally, `Lock` cannot be `UnsafeCell`. |
67 | | tx_task: Lock<Option<Waker>>, |
68 | | } |
69 | | |
70 | | /// Creates a new one-shot channel for sending a single value across asynchronous tasks. |
71 | | /// |
72 | | /// The channel works for a spsc (single-producer, single-consumer) scheme. |
73 | | /// |
74 | | /// This function is similar to Rust's channel constructor found in the standard |
75 | | /// library. Two halves are returned, the first of which is a `Sender` handle, |
76 | | /// used to signal the end of a computation and provide its value. The second |
77 | | /// half is a `Receiver` which implements the `Future` trait, resolving to the |
78 | | /// value that was given to the `Sender` handle. |
79 | | /// |
80 | | /// Each half can be separately owned and sent across tasks. |
81 | | /// |
82 | | /// # Examples |
83 | | /// |
84 | | /// ``` |
85 | | /// use futures::channel::oneshot; |
86 | | /// use std::{thread, time::Duration}; |
87 | | /// |
88 | | /// let (sender, receiver) = oneshot::channel::<i32>(); |
89 | | /// |
90 | | /// thread::spawn(|| { |
91 | | /// println!("THREAD: sleeping zzz..."); |
92 | | /// thread::sleep(Duration::from_millis(1000)); |
93 | | /// println!("THREAD: i'm awake! sending."); |
94 | | /// sender.send(3).unwrap(); |
95 | | /// }); |
96 | | /// |
97 | | /// println!("MAIN: doing some useful stuff"); |
98 | | /// |
99 | | /// futures::executor::block_on(async { |
100 | | /// println!("MAIN: waiting for msg..."); |
101 | | /// println!("MAIN: got: {:?}", receiver.await) |
102 | | /// }); |
103 | | /// ``` |
104 | 0 | pub fn channel<T>() -> (Sender<T>, Receiver<T>) { |
105 | 0 | let inner = Arc::new(Inner::new()); |
106 | 0 | let receiver = Receiver { inner: inner.clone() }; |
107 | 0 | let sender = Sender { inner }; |
108 | 0 | (sender, receiver) |
109 | 0 | } Unexecuted instantiation: futures_channel::oneshot::channel::<hyper_util::client::legacy::client::PoolClient<tonic::body::Body>> Unexecuted instantiation: futures_channel::oneshot::channel::<core::convert::Infallible> Unexecuted instantiation: futures_channel::oneshot::channel::<hickory_proto::xfer::dns_response::DnsResponseStream> Unexecuted instantiation: futures_channel::oneshot::channel::<http::header::map::HeaderMap> Unexecuted instantiation: futures_channel::oneshot::channel::<_> |
110 | | |
111 | | impl<T> Inner<T> { |
112 | 0 | fn new() -> Self { |
113 | 0 | Self { |
114 | 0 | complete: AtomicBool::new(false), |
115 | 0 | data: Lock::new(None), |
116 | 0 | rx_task: Lock::new(None), |
117 | 0 | tx_task: Lock::new(None), |
118 | 0 | } |
119 | 0 | } Unexecuted instantiation: <futures_channel::oneshot::Inner<hyper_util::client::legacy::client::PoolClient<tonic::body::Body>>>::new Unexecuted instantiation: <futures_channel::oneshot::Inner<core::convert::Infallible>>::new Unexecuted instantiation: <futures_channel::oneshot::Inner<hickory_proto::xfer::dns_response::DnsResponseStream>>::new Unexecuted instantiation: <futures_channel::oneshot::Inner<http::header::map::HeaderMap>>::new Unexecuted instantiation: <futures_channel::oneshot::Inner<_>>::new |
120 | | |
121 | 0 | fn send(&self, t: T) -> Result<(), T> { |
122 | 0 | if self.complete.load(SeqCst) { |
123 | 0 | return Err(t); |
124 | 0 | } |
125 | | |
126 | | // Note that this lock acquisition may fail if the receiver |
127 | | // is closed and sets the `complete` flag to `true`, whereupon |
128 | | // the receiver may call `poll()`. |
129 | 0 | if let Some(mut slot) = self.data.try_lock() { |
130 | 0 | assert!(slot.is_none()); |
131 | 0 | *slot = Some(t); |
132 | 0 | drop(slot); |
133 | | |
134 | | // If the receiver called `close()` between the check at the |
135 | | // start of the function, and the lock being released, then |
136 | | // the receiver may not be around to receive it, so try to |
137 | | // pull it back out. |
138 | 0 | if self.complete.load(SeqCst) { |
139 | | // If lock acquisition fails, then receiver is actually |
140 | | // receiving it, so we're good. |
141 | 0 | if let Some(mut slot) = self.data.try_lock() { |
142 | 0 | if let Some(t) = slot.take() { |
143 | 0 | return Err(t); |
144 | 0 | } |
145 | 0 | } |
146 | 0 | } |
147 | 0 | Ok(()) |
148 | | } else { |
149 | | // Must have been closed |
150 | 0 | Err(t) |
151 | | } |
152 | 0 | } Unexecuted instantiation: <futures_channel::oneshot::Inner<hyper_util::client::legacy::client::PoolClient<tonic::body::Body>>>::send Unexecuted instantiation: <futures_channel::oneshot::Inner<hickory_proto::xfer::dns_response::DnsResponseStream>>::send Unexecuted instantiation: <futures_channel::oneshot::Inner<http::header::map::HeaderMap>>::send Unexecuted instantiation: <futures_channel::oneshot::Inner<_>>::send |
153 | | |
154 | 0 | fn poll_canceled(&self, cx: &mut Context<'_>) -> Poll<()> { |
155 | | // Fast path up first, just read the flag and see if our other half is |
156 | | // gone. This flag is set both in our destructor and the oneshot |
157 | | // destructor, but our destructor hasn't run yet so if it's set then the |
158 | | // oneshot is gone. |
159 | 0 | if self.complete.load(SeqCst) { |
160 | 0 | return Poll::Ready(()); |
161 | 0 | } |
162 | | |
163 | | // If our other half is not gone then we need to park our current task |
164 | | // and move it into the `tx_task` slot to get notified when it's |
165 | | // actually gone. |
166 | | // |
167 | | // If `try_lock` fails, then the `Receiver` is in the process of using |
168 | | // it, so we can deduce that it's now in the process of going away and |
169 | | // hence we're canceled. If it succeeds then we just store our handle. |
170 | | // |
171 | | // Crucially we then check `complete` *again* before we return. |
172 | | // While we were storing our handle inside `tx_task` the |
173 | | // `Receiver` may have been dropped. The first thing it does is set the |
174 | | // flag, and if it fails to acquire the lock it assumes that we'll see |
175 | | // the flag later on. So... we then try to see the flag later on! |
176 | 0 | let handle = cx.waker().clone(); |
177 | 0 | match self.tx_task.try_lock() { |
178 | 0 | Some(mut p) => *p = Some(handle), |
179 | 0 | None => return Poll::Ready(()), |
180 | | } |
181 | 0 | if self.complete.load(SeqCst) { |
182 | 0 | Poll::Ready(()) |
183 | | } else { |
184 | 0 | Poll::Pending |
185 | | } |
186 | 0 | } |
187 | | |
188 | 0 | fn is_canceled(&self) -> bool { |
189 | 0 | self.complete.load(SeqCst) |
190 | 0 | } Unexecuted instantiation: <futures_channel::oneshot::Inner<hyper_util::client::legacy::client::PoolClient<tonic::body::Body>>>::is_canceled Unexecuted instantiation: <futures_channel::oneshot::Inner<_>>::is_canceled |
191 | | |
192 | 0 | fn drop_tx(&self) { |
193 | | // Flag that we're a completed `Sender` and try to wake up a receiver. |
194 | | // Whether or not we actually stored any data will get picked up and |
195 | | // translated to either an item or cancellation. |
196 | | // |
197 | | // Note that if we fail to acquire the `rx_task` lock then that means |
198 | | // we're in one of two situations: |
199 | | // |
200 | | // 1. The receiver is trying to block in `poll` |
201 | | // 2. The receiver is being dropped |
202 | | // |
203 | | // In the first case it'll check the `complete` flag after it's done |
204 | | // blocking to see if it succeeded. In the latter case we don't need to |
205 | | // wake up anyone anyway. So in both cases it's ok to ignore the `None` |
206 | | // case of `try_lock` and bail out. |
207 | | // |
208 | | // The first case crucially depends on `Lock` using `SeqCst` ordering |
209 | | // under the hood. If it instead used `Release` / `Acquire` ordering, |
210 | | // then it would not necessarily synchronize with `inner.complete` |
211 | | // and deadlock might be possible, as was observed in |
212 | | // https://github.com/rust-lang/futures-rs/pull/219. |
213 | 0 | self.complete.store(true, SeqCst); |
214 | | |
215 | 0 | if let Some(mut slot) = self.rx_task.try_lock() { |
216 | 0 | if let Some(task) = slot.take() { |
217 | 0 | drop(slot); |
218 | 0 | task.wake(); |
219 | 0 | } |
220 | 0 | } |
221 | | |
222 | | // If we registered a task for cancel notification drop it to reduce |
223 | | // spurious wakeups |
224 | 0 | if let Some(mut slot) = self.tx_task.try_lock() { |
225 | 0 | drop(slot.take()); |
226 | 0 | } |
227 | 0 | } Unexecuted instantiation: <futures_channel::oneshot::Inner<hyper_util::client::legacy::client::PoolClient<tonic::body::Body>>>::drop_tx Unexecuted instantiation: <futures_channel::oneshot::Inner<core::convert::Infallible>>::drop_tx Unexecuted instantiation: <futures_channel::oneshot::Inner<hickory_proto::xfer::dns_response::DnsResponseStream>>::drop_tx Unexecuted instantiation: <futures_channel::oneshot::Inner<http::header::map::HeaderMap>>::drop_tx Unexecuted instantiation: <futures_channel::oneshot::Inner<_>>::drop_tx |
228 | | |
229 | 0 | fn close_rx(&self) { |
230 | | // Flag our completion and then attempt to wake up the sender if it's |
231 | | // blocked. See comments in `drop` below for more info |
232 | 0 | self.complete.store(true, SeqCst); |
233 | 0 | if let Some(mut handle) = self.tx_task.try_lock() { |
234 | 0 | if let Some(task) = handle.take() { |
235 | 0 | drop(handle); |
236 | 0 | task.wake() |
237 | 0 | } |
238 | 0 | } |
239 | 0 | } |
240 | | |
241 | 0 | fn try_recv(&self) -> Result<Option<T>, Canceled> { |
242 | | // If we're complete, either `::close_rx` or `::drop_tx` was called. |
243 | | // We can assume a successful send if data is present. |
244 | 0 | if self.complete.load(SeqCst) { |
245 | 0 | if let Some(mut slot) = self.data.try_lock() { |
246 | 0 | if let Some(data) = slot.take() { |
247 | 0 | return Ok(Some(data)); |
248 | 0 | } |
249 | 0 | } |
250 | 0 | Err(Canceled) |
251 | | } else { |
252 | 0 | Ok(None) |
253 | | } |
254 | 0 | } |
255 | | |
256 | 0 | fn recv(&self, cx: &mut Context<'_>) -> Poll<Result<T, Canceled>> { |
257 | | // Check to see if some data has arrived. If it hasn't then we need to |
258 | | // block our task. |
259 | | // |
260 | | // Note that the acquisition of the `rx_task` lock might fail below, but |
261 | | // the only situation where this can happen is during `Sender::drop` |
262 | | // when we are indeed completed already. If that's happening then we |
263 | | // know we're completed so keep going. |
264 | 0 | let done = if self.complete.load(SeqCst) { |
265 | 0 | true |
266 | | } else { |
267 | 0 | let task = cx.waker().clone(); |
268 | 0 | match self.rx_task.try_lock() { |
269 | 0 | Some(mut slot) => { |
270 | 0 | *slot = Some(task); |
271 | 0 | false |
272 | | } |
273 | 0 | None => true, |
274 | | } |
275 | | }; |
276 | | |
277 | | // If we're `done` via one of the paths above, then look at the data and |
278 | | // figure out what the answer is. If, however, we stored `rx_task` |
279 | | // successfully above we need to check again if we're completed in case |
280 | | // a message was sent while `rx_task` was locked and couldn't notify us |
281 | | // otherwise. |
282 | | // |
283 | | // If we're not done, and we're not complete, though, then we've |
284 | | // successfully blocked our task and we return `Pending`. |
285 | 0 | if done || self.complete.load(SeqCst) { |
286 | | // If taking the lock fails, the sender will realise that the we're |
287 | | // `done` when it checks the `complete` flag on the way out, and |
288 | | // will treat the send as a failure. |
289 | 0 | if let Some(mut slot) = self.data.try_lock() { |
290 | 0 | if let Some(data) = slot.take() { |
291 | 0 | return Poll::Ready(Ok(data)); |
292 | 0 | } |
293 | 0 | } |
294 | 0 | Poll::Ready(Err(Canceled)) |
295 | | } else { |
296 | 0 | Poll::Pending |
297 | | } |
298 | 0 | } Unexecuted instantiation: <futures_channel::oneshot::Inner<hyper_util::client::legacy::client::PoolClient<tonic::body::Body>>>::recv Unexecuted instantiation: <futures_channel::oneshot::Inner<core::convert::Infallible>>::recv Unexecuted instantiation: <futures_channel::oneshot::Inner<hickory_proto::xfer::dns_response::DnsResponseStream>>::recv Unexecuted instantiation: <futures_channel::oneshot::Inner<http::header::map::HeaderMap>>::recv Unexecuted instantiation: <futures_channel::oneshot::Inner<_>>::recv |
299 | | |
300 | 0 | fn drop_rx(&self) { |
301 | | // Indicate to the `Sender` that we're done, so any future calls to |
302 | | // `poll_canceled` are weeded out. |
303 | 0 | self.complete.store(true, SeqCst); |
304 | | |
305 | | // If we've blocked a task then there's no need for it to stick around, |
306 | | // so we need to drop it. If this lock acquisition fails, though, then |
307 | | // it's just because our `Sender` is trying to take the task, so we |
308 | | // let them take care of that. |
309 | 0 | if let Some(mut slot) = self.rx_task.try_lock() { |
310 | 0 | let task = slot.take(); |
311 | 0 | drop(slot); |
312 | 0 | drop(task); |
313 | 0 | } |
314 | | |
315 | | // Finally, if our `Sender` wants to get notified of us going away, it |
316 | | // would have stored something in `tx_task`. Here we try to peel that |
317 | | // out and unpark it. |
318 | | // |
319 | | // Note that the `try_lock` here may fail, but only if the `Sender` is |
320 | | // in the process of filling in the task. If that happens then we |
321 | | // already flagged `complete` and they'll pick that up above. |
322 | 0 | if let Some(mut handle) = self.tx_task.try_lock() { |
323 | 0 | if let Some(task) = handle.take() { |
324 | 0 | drop(handle); |
325 | 0 | task.wake() |
326 | 0 | } |
327 | 0 | } |
328 | 0 | } Unexecuted instantiation: <futures_channel::oneshot::Inner<hyper_util::client::legacy::client::PoolClient<tonic::body::Body>>>::drop_rx Unexecuted instantiation: <futures_channel::oneshot::Inner<core::convert::Infallible>>::drop_rx Unexecuted instantiation: <futures_channel::oneshot::Inner<hickory_proto::xfer::dns_response::DnsResponseStream>>::drop_rx Unexecuted instantiation: <futures_channel::oneshot::Inner<http::header::map::HeaderMap>>::drop_rx Unexecuted instantiation: <futures_channel::oneshot::Inner<_>>::drop_rx |
329 | | } |
330 | | |
331 | | impl<T> Sender<T> { |
332 | | /// Completes this oneshot with a successful result. |
333 | | /// |
334 | | /// This function will consume `self` and indicate to the other end, the |
335 | | /// [`Receiver`], that the value provided is the result of the computation |
336 | | /// this represents. |
337 | | /// |
338 | | /// If the value is successfully enqueued for the remote end to receive, |
339 | | /// then `Ok(())` is returned. If the receiving end was dropped before |
340 | | /// this function was called, however, then `Err(t)` is returned. |
341 | 0 | pub fn send(self, t: T) -> Result<(), T> { |
342 | 0 | self.inner.send(t) |
343 | 0 | } Unexecuted instantiation: <futures_channel::oneshot::Sender<hyper_util::client::legacy::client::PoolClient<tonic::body::Body>>>::send Unexecuted instantiation: <futures_channel::oneshot::Sender<hickory_proto::xfer::dns_response::DnsResponseStream>>::send Unexecuted instantiation: <futures_channel::oneshot::Sender<http::header::map::HeaderMap>>::send Unexecuted instantiation: <futures_channel::oneshot::Sender<_>>::send |
344 | | |
345 | | /// Polls this `Sender` half to detect whether its associated |
346 | | /// [`Receiver`] has been dropped. |
347 | | /// |
348 | | /// # Return values |
349 | | /// |
350 | | /// If `Ready(())` is returned then the associated `Receiver` has been |
351 | | /// dropped, which means any work required for sending should be canceled. |
352 | | /// |
353 | | /// If `Pending` is returned then the associated `Receiver` is still |
354 | | /// alive and may be able to receive a message if sent. The current task, |
355 | | /// however, is scheduled to receive a notification if the corresponding |
356 | | /// `Receiver` goes away. |
357 | 0 | pub fn poll_canceled(&mut self, cx: &mut Context<'_>) -> Poll<()> { |
358 | 0 | self.inner.poll_canceled(cx) |
359 | 0 | } |
360 | | |
361 | | /// Creates a future that resolves when this `Sender`'s corresponding |
362 | | /// [`Receiver`] half has hung up. |
363 | | /// |
364 | | /// This is a utility wrapping [`poll_canceled`](Sender::poll_canceled) |
365 | | /// to expose a [`Future`]. |
366 | 0 | pub fn cancellation(&mut self) -> Cancellation<'_, T> { |
367 | 0 | Cancellation { inner: self } |
368 | 0 | } |
369 | | |
370 | | /// Tests to see whether this `Sender`'s corresponding `Receiver` |
371 | | /// has been dropped. |
372 | | /// |
373 | | /// Unlike [`poll_canceled`](Sender::poll_canceled), this function does not |
374 | | /// enqueue a task for wakeup upon cancellation, but merely reports the |
375 | | /// current state, which may be subject to concurrent modification. |
376 | 0 | pub fn is_canceled(&self) -> bool { |
377 | 0 | self.inner.is_canceled() |
378 | 0 | } Unexecuted instantiation: <futures_channel::oneshot::Sender<hyper_util::client::legacy::client::PoolClient<tonic::body::Body>>>::is_canceled Unexecuted instantiation: <futures_channel::oneshot::Sender<_>>::is_canceled |
379 | | |
380 | | /// Tests to see whether this `Sender` is connected to the given `Receiver`. That is, whether |
381 | | /// they were created by the same call to `channel`. |
382 | 0 | pub fn is_connected_to(&self, receiver: &Receiver<T>) -> bool { |
383 | 0 | Arc::ptr_eq(&self.inner, &receiver.inner) |
384 | 0 | } |
385 | | } |
386 | | |
387 | | impl<T> Drop for Sender<T> { |
388 | 0 | fn drop(&mut self) { |
389 | 0 | self.inner.drop_tx() |
390 | 0 | } Unexecuted instantiation: <futures_channel::oneshot::Sender<hyper_util::client::legacy::client::PoolClient<tonic::body::Body>> as core::ops::drop::Drop>::drop Unexecuted instantiation: <futures_channel::oneshot::Sender<core::convert::Infallible> as core::ops::drop::Drop>::drop Unexecuted instantiation: <futures_channel::oneshot::Sender<hickory_proto::xfer::dns_response::DnsResponseStream> as core::ops::drop::Drop>::drop Unexecuted instantiation: <futures_channel::oneshot::Sender<http::header::map::HeaderMap> as core::ops::drop::Drop>::drop Unexecuted instantiation: <futures_channel::oneshot::Sender<_> as core::ops::drop::Drop>::drop |
391 | | } |
392 | | |
393 | | impl<T> fmt::Debug for Sender<T> { |
394 | 0 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
395 | 0 | f.debug_struct("Sender").field("complete", &self.inner.complete).finish() |
396 | 0 | } |
397 | | } |
398 | | |
399 | | /// A future that resolves when the receiving end of a channel has hung up. |
400 | | /// |
401 | | /// This is an `.await`-friendly interface around [`poll_canceled`](Sender::poll_canceled). |
402 | | #[must_use = "futures do nothing unless you `.await` or poll them"] |
403 | | #[derive(Debug)] |
404 | | pub struct Cancellation<'a, T> { |
405 | | inner: &'a mut Sender<T>, |
406 | | } |
407 | | |
408 | | impl<T> Future for Cancellation<'_, T> { |
409 | | type Output = (); |
410 | | |
411 | 0 | fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> { |
412 | 0 | self.inner.poll_canceled(cx) |
413 | 0 | } |
414 | | } |
415 | | |
416 | | /// Error returned from a [`Receiver`] when the corresponding [`Sender`] is |
417 | | /// dropped. |
418 | | #[derive(Clone, Copy, PartialEq, Eq, Debug)] |
419 | | pub struct Canceled; |
420 | | |
421 | | impl fmt::Display for Canceled { |
422 | 0 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
423 | 0 | write!(f, "oneshot canceled") |
424 | 0 | } |
425 | | } |
426 | | |
427 | | #[cfg(feature = "std")] |
428 | | impl std::error::Error for Canceled {} |
429 | | |
430 | | impl<T> Receiver<T> { |
431 | | /// Gracefully close this receiver, preventing any subsequent attempts to |
432 | | /// send to it. |
433 | | /// |
434 | | /// Any `send` operation which happens after this method returns is |
435 | | /// guaranteed to fail. After calling this method, you can use |
436 | | /// [`Receiver::poll`](core::future::Future::poll) to determine whether a |
437 | | /// message had previously been sent. |
438 | 0 | pub fn close(&mut self) { |
439 | 0 | self.inner.close_rx() |
440 | 0 | } |
441 | | |
442 | | /// Attempts to receive a message outside of the context of a task. |
443 | | /// |
444 | | /// Does not schedule a task wakeup or have any other side effects. |
445 | | /// |
446 | | /// A return value of `None` must be considered immediately stale (out of |
447 | | /// date) unless [`close`](Receiver::close) has been called first. |
448 | | /// |
449 | | /// Returns an error if the sender was dropped. |
450 | 0 | pub fn try_recv(&mut self) -> Result<Option<T>, Canceled> { |
451 | 0 | self.inner.try_recv() |
452 | 0 | } |
453 | | } |
454 | | |
455 | | impl<T> Future for Receiver<T> { |
456 | | type Output = Result<T, Canceled>; |
457 | | |
458 | 0 | fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<T, Canceled>> { |
459 | 0 | self.inner.recv(cx) |
460 | 0 | } Unexecuted instantiation: <futures_channel::oneshot::Receiver<hyper_util::client::legacy::client::PoolClient<tonic::body::Body>> as core::future::future::Future>::poll Unexecuted instantiation: <futures_channel::oneshot::Receiver<core::convert::Infallible> as core::future::future::Future>::poll Unexecuted instantiation: <futures_channel::oneshot::Receiver<hickory_proto::xfer::dns_response::DnsResponseStream> as core::future::future::Future>::poll Unexecuted instantiation: <futures_channel::oneshot::Receiver<http::header::map::HeaderMap> as core::future::future::Future>::poll Unexecuted instantiation: <futures_channel::oneshot::Receiver<_> as core::future::future::Future>::poll |
461 | | } |
462 | | |
463 | | impl<T> FusedFuture for Receiver<T> { |
464 | 0 | fn is_terminated(&self) -> bool { |
465 | 0 | if self.inner.complete.load(SeqCst) { |
466 | 0 | if let Some(slot) = self.inner.data.try_lock() { |
467 | 0 | if slot.is_some() { |
468 | 0 | return false; |
469 | 0 | } |
470 | 0 | } |
471 | 0 | true |
472 | | } else { |
473 | 0 | false |
474 | | } |
475 | 0 | } |
476 | | } |
477 | | |
478 | | impl<T> Drop for Receiver<T> { |
479 | 0 | fn drop(&mut self) { |
480 | 0 | self.inner.drop_rx() |
481 | 0 | } Unexecuted instantiation: <futures_channel::oneshot::Receiver<hyper_util::client::legacy::client::PoolClient<tonic::body::Body>> as core::ops::drop::Drop>::drop Unexecuted instantiation: <futures_channel::oneshot::Receiver<core::convert::Infallible> as core::ops::drop::Drop>::drop Unexecuted instantiation: <futures_channel::oneshot::Receiver<hickory_proto::xfer::dns_response::DnsResponseStream> as core::ops::drop::Drop>::drop Unexecuted instantiation: <futures_channel::oneshot::Receiver<http::header::map::HeaderMap> as core::ops::drop::Drop>::drop Unexecuted instantiation: <futures_channel::oneshot::Receiver<_> as core::ops::drop::Drop>::drop |
482 | | } |
483 | | |
484 | | impl<T> fmt::Debug for Receiver<T> { |
485 | 0 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
486 | 0 | f.debug_struct("Receiver").field("complete", &self.inner.complete).finish() |
487 | 0 | } |
488 | | } |