/rust/registry/src/index.crates.io-6f17d22bba15001f/flume-0.11.0/src/async.rs
Line | Count | Source (jump to first uncovered line) |
1 | | //! Futures and other types that allow asynchronous interaction with channels. |
2 | | |
3 | | use std::{ |
4 | | future::Future, |
5 | | pin::Pin, |
6 | | task::{Context, Poll, Waker}, |
7 | | any::Any, |
8 | | ops::Deref, |
9 | | }; |
10 | | use crate::*; |
11 | | use futures_core::{stream::{Stream, FusedStream}, future::FusedFuture}; |
12 | | use futures_sink::Sink; |
13 | | use spin1::Mutex as Spinlock; |
14 | | |
15 | | struct AsyncSignal { |
16 | | waker: Spinlock<Waker>, |
17 | | woken: AtomicBool, |
18 | | stream: bool, |
19 | | } |
20 | | |
21 | | impl AsyncSignal { |
22 | 0 | fn new(cx: &Context, stream: bool) -> Self { |
23 | 0 | AsyncSignal { |
24 | 0 | waker: Spinlock::new(cx.waker().clone()), |
25 | 0 | woken: AtomicBool::new(false), |
26 | 0 | stream, |
27 | 0 | } |
28 | 0 | } |
29 | | } |
30 | | |
31 | | impl Signal for AsyncSignal { |
32 | 0 | fn fire(&self) -> bool { |
33 | 0 | self.woken.store(true, Ordering::SeqCst); |
34 | 0 | self.waker.lock().wake_by_ref(); |
35 | 0 | self.stream |
36 | 0 | } |
37 | | |
38 | 0 | fn as_any(&self) -> &(dyn Any + 'static) { self } |
39 | 0 | fn as_ptr(&self) -> *const () { self as *const _ as *const () } |
40 | | } |
41 | | |
42 | | impl<T> Hook<T, AsyncSignal> { |
43 | | // Update the hook to point to the given Waker. |
44 | | // Returns whether the hook has been previously awakened |
45 | 0 | fn update_waker(&self, cx_waker: &Waker) -> bool { |
46 | 0 | let mut waker = self.1.waker.lock(); |
47 | 0 | let woken = self.1.woken.load(Ordering::SeqCst); |
48 | 0 | if !waker.will_wake(cx_waker) { |
49 | 0 | *waker = cx_waker.clone(); |
50 | 0 |
|
51 | 0 | // Avoid the edge case where the waker was woken just before the wakers were |
52 | 0 | // swapped. |
53 | 0 | if woken { |
54 | 0 | cx_waker.wake_by_ref(); |
55 | 0 | } |
56 | 0 | } |
57 | 0 | woken |
58 | 0 | } |
59 | | } |
60 | | |
61 | 0 | #[derive(Clone)] |
62 | | enum OwnedOrRef<'a, T> { |
63 | | Owned(T), |
64 | | Ref(&'a T), |
65 | | } |
66 | | |
67 | | impl<'a, T> Deref for OwnedOrRef<'a, T> { |
68 | | type Target = T; |
69 | | |
70 | 0 | fn deref(&self) -> &T { |
71 | 0 | match self { |
72 | 0 | OwnedOrRef::Owned(arc) => &arc, |
73 | 0 | OwnedOrRef::Ref(r) => r, |
74 | | } |
75 | 0 | } |
76 | | } |
77 | | |
78 | | impl<T> Sender<T> { |
79 | | /// Asynchronously send a value into the channel, returning an error if all receivers have been |
80 | | /// dropped. If the channel is bounded and is full, the returned future will yield to the async |
81 | | /// runtime. |
82 | | /// |
83 | | /// In the current implementation, the returned future will not yield to the async runtime if the |
84 | | /// channel is unbounded. This may change in later versions. |
85 | 0 | pub fn send_async(&self, item: T) -> SendFut<T> { |
86 | 0 | SendFut { |
87 | 0 | sender: OwnedOrRef::Ref(&self), |
88 | 0 | hook: Some(SendState::NotYetSent(item)), |
89 | 0 | } |
90 | 0 | } |
91 | | |
92 | | /// Convert this sender into a future that asynchronously sends a single message into the channel, |
93 | | /// returning an error if all receivers have been dropped. If the channel is bounded and is full, |
94 | | /// this future will yield to the async runtime. |
95 | | /// |
96 | | /// In the current implementation, the returned future will not yield to the async runtime if the |
97 | | /// channel is unbounded. This may change in later versions. |
98 | 0 | pub fn into_send_async<'a>(self, item: T) -> SendFut<'a, T> { |
99 | 0 | SendFut { |
100 | 0 | sender: OwnedOrRef::Owned(self), |
101 | 0 | hook: Some(SendState::NotYetSent(item)), |
102 | 0 | } |
103 | 0 | } |
104 | | |
105 | | /// Create an asynchronous sink that uses this sender to asynchronously send messages into the |
106 | | /// channel. The sender will continue to be usable after the sink has been dropped. |
107 | | /// |
108 | | /// In the current implementation, the returned sink will not yield to the async runtime if the |
109 | | /// channel is unbounded. This may change in later versions. |
110 | 0 | pub fn sink(&self) -> SendSink<'_, T> { |
111 | 0 | SendSink(SendFut { |
112 | 0 | sender: OwnedOrRef::Ref(&self), |
113 | 0 | hook: None, |
114 | 0 | }) |
115 | 0 | } |
116 | | |
117 | | /// Convert this sender into a sink that allows asynchronously sending messages into the channel. |
118 | | /// |
119 | | /// In the current implementation, the returned sink will not yield to the async runtime if the |
120 | | /// channel is unbounded. This may change in later versions. |
121 | 0 | pub fn into_sink<'a>(self) -> SendSink<'a, T> { |
122 | 0 | SendSink(SendFut { |
123 | 0 | sender: OwnedOrRef::Owned(self), |
124 | 0 | hook: None, |
125 | 0 | }) |
126 | 0 | } |
127 | | } |
128 | | |
129 | | enum SendState<T> { |
130 | | NotYetSent(T), |
131 | | QueuedItem(Arc<Hook<T, AsyncSignal>>), |
132 | | } |
133 | | |
134 | | /// A future that sends a value into a channel. |
135 | | /// |
136 | | /// Can be created via [`Sender::send_async`] or [`Sender::into_send_async`]. |
137 | | #[must_use = "futures/streams/sinks do nothing unless you `.await` or poll them"] |
138 | | pub struct SendFut<'a, T> { |
139 | | sender: OwnedOrRef<'a, Sender<T>>, |
140 | | // `None` when there is nothing left to send (e.g. a fresh sink, or after `reset_hook`), not only after dropping |
141 | | hook: Option<SendState<T>>, |
142 | | } |
143 | | |
144 | | impl<T> std::marker::Unpin for SendFut<'_, T> {} |
145 | | |
146 | | impl<'a, T> SendFut<'a, T> { |
147 | | /// Reset the hook, clearing it and removing it from the queue of waiting senders. This is called |
148 | | /// on drop and at the start of `start_send` in the `Sink` implementation. |
149 | 0 | fn reset_hook(&mut self) { |
150 | 0 | if let Some(SendState::QueuedItem(hook)) = self.hook.take() { |
151 | 0 | let hook: Arc<Hook<T, dyn Signal>> = hook; |
152 | 0 | wait_lock(&self.sender.shared.chan).sending |
153 | 0 | .as_mut() |
154 | 0 | .unwrap().1 |
155 | 0 | .retain(|s| s.signal().as_ptr() != hook.signal().as_ptr()); |
156 | 0 | } |
157 | 0 | } |
158 | | |
159 | | /// See [`Sender::is_disconnected`]. |
160 | 0 | pub fn is_disconnected(&self) -> bool { |
161 | 0 | self.sender.is_disconnected() |
162 | 0 | } |
163 | | |
164 | | /// See [`Sender::is_empty`]. |
165 | 0 | pub fn is_empty(&self) -> bool { |
166 | 0 | self.sender.is_empty() |
167 | 0 | } |
168 | | |
169 | | /// See [`Sender::is_full`]. |
170 | 0 | pub fn is_full(&self) -> bool { |
171 | 0 | self.sender.is_full() |
172 | 0 | } |
173 | | |
174 | | /// See [`Sender::len`]. |
175 | 0 | pub fn len(&self) -> usize { |
176 | 0 | self.sender.len() |
177 | 0 | } |
178 | | |
179 | | /// See [`Sender::capacity`]. |
180 | 0 | pub fn capacity(&self) -> Option<usize> { |
181 | 0 | self.sender.capacity() |
182 | 0 | } |
183 | | } |
184 | | |
185 | | impl<'a, T> Drop for SendFut<'a, T> { |
186 | 0 | fn drop(&mut self) { |
187 | 0 | self.reset_hook() |
188 | 0 | } |
189 | | } |
190 | | |
191 | | |
192 | | impl<'a, T> Future for SendFut<'a, T> { |
193 | | type Output = Result<(), SendError<T>>; |
194 | | |
195 | 0 | fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { |
196 | 0 | if let Some(SendState::QueuedItem(hook)) = self.hook.as_ref() { |
197 | 0 | if hook.is_empty() { |
198 | 0 | Poll::Ready(Ok(())) |
199 | 0 | } else if self.sender.shared.is_disconnected() { |
200 | 0 | let item = hook.try_take(); |
201 | 0 | self.hook = None; |
202 | 0 | match item { |
203 | 0 | Some(item) => Poll::Ready(Err(SendError(item))), |
204 | 0 | None => Poll::Ready(Ok(())), |
205 | | } |
206 | | } else { |
207 | 0 | hook.update_waker(cx.waker()); |
208 | 0 | Poll::Pending |
209 | | } |
210 | 0 | } else if let Some(SendState::NotYetSent(item)) = self.hook.take() { |
211 | 0 | let this = self.get_mut(); |
212 | 0 | let (shared, this_hook) = (&this.sender.shared, &mut this.hook); |
213 | 0 |
|
214 | 0 | shared.send( |
215 | 0 | // item |
216 | 0 | item, |
217 | 0 | // should_block |
218 | 0 | true, |
219 | 0 | // make_signal |
220 | 0 | |msg| Hook::slot(Some(msg), AsyncSignal::new(cx, false)), |
221 | 0 | // do_block |
222 | 0 | |hook| { |
223 | 0 | *this_hook = Some(SendState::QueuedItem(hook)); |
224 | 0 | Poll::Pending |
225 | 0 | }, |
226 | 0 | ) |
227 | 0 | .map(|r| r.map_err(|err| match err { |
228 | 0 | TrySendTimeoutError::Disconnected(msg) => SendError(msg), |
229 | 0 | _ => unreachable!(), |
230 | 0 | })) |
231 | | } else { // Nothing to do |
232 | 0 | Poll::Ready(Ok(())) |
233 | | } |
234 | 0 | } |
235 | | } |
236 | | |
237 | | impl<'a, T> FusedFuture for SendFut<'a, T> { |
238 | 0 | fn is_terminated(&self) -> bool { |
239 | 0 | self.sender.shared.is_disconnected() |
240 | 0 | } |
241 | | } |
242 | | |
243 | | /// A sink that allows sending values into a channel. |
244 | | /// |
245 | | /// Can be created via [`Sender::sink`] or [`Sender::into_sink`]. |
246 | | pub struct SendSink<'a, T>(SendFut<'a, T>); |
247 | | |
248 | | impl<'a, T> SendSink<'a, T> { |
249 | | /// Returns a reference to the sending half of the channel used by this sink. |
250 | 0 | pub fn sender(&self) -> &Sender<T> { |
251 | 0 | &self.0.sender |
252 | 0 | } |
253 | | |
254 | | /// See [`Sender::is_disconnected`]. |
255 | 0 | pub fn is_disconnected(&self) -> bool { |
256 | 0 | self.0.is_disconnected() |
257 | 0 | } |
258 | | |
259 | | /// See [`Sender::is_empty`]. |
260 | 0 | pub fn is_empty(&self) -> bool { |
261 | 0 | self.0.is_empty() |
262 | 0 | } |
263 | | |
264 | | /// See [`Sender::is_full`]. |
265 | 0 | pub fn is_full(&self) -> bool { |
266 | 0 | self.0.is_full() |
267 | 0 | } |
268 | | |
269 | | /// See [`Sender::len`]. |
270 | 0 | pub fn len(&self) -> usize { |
271 | 0 | self.0.len() |
272 | 0 | } |
273 | | |
274 | | /// See [`Sender::capacity`]. |
275 | 0 | pub fn capacity(&self) -> Option<usize> { |
276 | 0 | self.0.capacity() |
277 | 0 | } |
278 | | |
279 | | /// Returns whether both `SendSink`s belong to the same channel. |
280 | 0 | pub fn same_channel(&self, other: &Self) -> bool { |
281 | 0 | self.sender().same_channel(other.sender()) |
282 | 0 | } |
283 | | } |
284 | | |
285 | | impl<'a, T> Sink<T> for SendSink<'a, T> { |
286 | | type Error = SendError<T>; |
287 | | |
288 | 0 | fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> { |
289 | 0 | Pin::new(&mut self.0).poll(cx) |
290 | 0 | } |
291 | | |
292 | 0 | fn start_send(mut self: Pin<&mut Self>, item: T) -> Result<(), Self::Error> { |
293 | 0 | self.0.reset_hook(); |
294 | 0 | self.0.hook = Some(SendState::NotYetSent(item)); |
295 | 0 |
|
296 | 0 | Ok(()) |
297 | 0 | } |
298 | | |
299 | 0 | fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> { |
300 | 0 | Pin::new(&mut self.0).poll(cx) // TODO: A different strategy here? |
301 | 0 | } |
302 | | |
303 | 0 | fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> { |
304 | 0 | Pin::new(&mut self.0).poll(cx) // TODO: A different strategy here? |
305 | 0 | } |
306 | | } |
307 | | |
308 | | impl<'a, T> Clone for SendSink<'a, T> { |
309 | 0 | fn clone(&self) -> SendSink<'a, T> { |
310 | 0 | SendSink(SendFut { |
311 | 0 | sender: self.0.sender.clone(), |
312 | 0 | hook: None, |
313 | 0 | }) |
314 | 0 | } |
315 | | } |
316 | | |
317 | | impl<T> Receiver<T> { |
318 | | /// Asynchronously receive a value from the channel, returning an error if all senders have been |
319 | | /// dropped. If the channel is empty, the returned future will yield to the async runtime. |
320 | 0 | pub fn recv_async(&self) -> RecvFut<'_, T> { |
321 | 0 | RecvFut::new(OwnedOrRef::Ref(self)) |
322 | 0 | } |
323 | | |
324 | | /// Convert this receiver into a future that asynchronously receives a single message from the |
325 | | /// channel, returning an error if all senders have been dropped. If the channel is empty, this |
326 | | /// future will yield to the async runtime. |
327 | 0 | pub fn into_recv_async<'a>(self) -> RecvFut<'a, T> { |
328 | 0 | RecvFut::new(OwnedOrRef::Owned(self)) |
329 | 0 | } |
330 | | |
331 | | /// Create an asynchronous stream that uses this receiver to asynchronously receive messages |
332 | | /// from the channel. The receiver will continue to be usable after the stream has been dropped. |
333 | 0 | pub fn stream(&self) -> RecvStream<'_, T> { |
334 | 0 | RecvStream(RecvFut::new(OwnedOrRef::Ref(self))) |
335 | 0 | } |
336 | | |
337 | | /// Convert this receiver into a stream that allows asynchronously receiving messages from the channel. |
338 | 0 | pub fn into_stream<'a>(self) -> RecvStream<'a, T> { |
339 | 0 | RecvStream(RecvFut::new(OwnedOrRef::Owned(self))) |
340 | 0 | } |
341 | | } |
342 | | |
343 | | /// A future which allows asynchronously receiving a message. |
344 | | /// |
345 | | /// Can be created via [`Receiver::recv_async`] or [`Receiver::into_recv_async`]. |
346 | | #[must_use = "futures/streams/sinks do nothing unless you `.await` or poll them"] |
347 | | pub struct RecvFut<'a, T> { |
348 | | receiver: OwnedOrRef<'a, Receiver<T>>, |
349 | | hook: Option<Arc<Hook<T, AsyncSignal>>>, |
350 | | } |
351 | | |
352 | | impl<'a, T> RecvFut<'a, T> { |
353 | 0 | fn new(receiver: OwnedOrRef<'a, Receiver<T>>) -> Self { |
354 | 0 | Self { |
355 | 0 | receiver, |
356 | 0 | hook: None, |
357 | 0 | } |
358 | 0 | } |
359 | | |
360 | | /// Reset the hook, clearing it and removing it from the queue of waiting receivers, and waking |
361 | | /// another receiver if this receiver has already been woken, so as not to cause any missed wakeups. |
362 | | /// This is called on drop and after a new item is received in `Stream::poll_next`. |
363 | 0 | fn reset_hook(&mut self) { |
364 | 0 | if let Some(hook) = self.hook.take() { |
365 | 0 | let hook: Arc<Hook<T, dyn Signal>> = hook; |
366 | 0 | let mut chan = wait_lock(&self.receiver.shared.chan); |
367 | 0 | // We'd like to use `Arc::ptr_eq` here but it doesn't seem to work consistently with wide pointers? |
368 | 0 | chan.waiting.retain(|s| s.signal().as_ptr() != hook.signal().as_ptr()); |
369 | 0 | if hook.signal().as_any().downcast_ref::<AsyncSignal>().unwrap().woken.load(Ordering::SeqCst) { |
370 | 0 | // If this signal has been fired, but we're being dropped (and so not listening to it), |
371 | 0 | // pass the signal on to another receiver |
372 | 0 | chan.try_wake_receiver_if_pending(); |
373 | 0 | } |
374 | 0 | } |
375 | 0 | } |
376 | | |
377 | 0 | fn poll_inner( |
378 | 0 | self: Pin<&mut Self>, |
379 | 0 | cx: &mut Context, |
380 | 0 | stream: bool, |
381 | 0 | ) -> Poll<Result<T, RecvError>> { |
382 | 0 | if self.hook.is_some() { |
383 | 0 | match self.receiver.shared.recv_sync(None) { |
384 | 0 | Ok(msg) => return Poll::Ready(Ok(msg)), |
385 | | Err(TryRecvTimeoutError::Disconnected) => { |
386 | 0 | return Poll::Ready(Err(RecvError::Disconnected)) |
387 | | } |
388 | 0 | _ => (), |
389 | 0 | } |
390 | 0 |
|
391 | 0 | let hook = self.hook.as_ref().map(Arc::clone).unwrap(); |
392 | 0 | if hook.update_waker(cx.waker()) { |
393 | 0 | // If the previous hook was awakened, we need to insert it back into the |
394 | 0 | // queue; otherwise, it remains valid. |
395 | 0 | wait_lock(&self.receiver.shared.chan) |
396 | 0 | .waiting |
397 | 0 | .push_back(hook); |
398 | 0 | } |
399 | | // To avoid a missed wakeup, re-check disconnect status here because the channel might have |
400 | | // gotten shut down before we had a chance to push our hook |
401 | 0 | if self.receiver.shared.is_disconnected() { |
402 | | // And now, to avoid a race condition between the first recv attempt and the disconnect check we |
403 | | // just performed, attempt to recv again just in case we missed something. |
404 | 0 | Poll::Ready( |
405 | 0 | self.receiver |
406 | 0 | .shared |
407 | 0 | .recv_sync(None) |
408 | 0 | .map(Ok) |
409 | 0 | .unwrap_or(Err(RecvError::Disconnected)), |
410 | 0 | ) |
411 | | } else { |
412 | 0 | Poll::Pending |
413 | | } |
414 | | } else { |
415 | 0 | let mut_self = self.get_mut(); |
416 | 0 | let (shared, this_hook) = (&mut_self.receiver.shared, &mut mut_self.hook); |
417 | 0 |
|
418 | 0 | shared.recv( |
419 | 0 | // should_block |
420 | 0 | true, |
421 | 0 | // make_signal |
422 | 0 | || Hook::trigger(AsyncSignal::new(cx, stream)), |
423 | 0 | // do_block |
424 | 0 | |hook| { |
425 | 0 | *this_hook = Some(hook); |
426 | 0 | Poll::Pending |
427 | 0 | }, |
428 | 0 | ) |
429 | 0 | .map(|r| r.map_err(|err| match err { |
430 | 0 | TryRecvTimeoutError::Disconnected => RecvError::Disconnected, |
431 | 0 | _ => unreachable!(), |
432 | 0 | })) |
433 | | } |
434 | 0 | } |
435 | | |
436 | | /// See [`Receiver::is_disconnected`]. |
437 | 0 | pub fn is_disconnected(&self) -> bool { |
438 | 0 | self.receiver.is_disconnected() |
439 | 0 | } |
440 | | |
441 | | /// See [`Receiver::is_empty`]. |
442 | 0 | pub fn is_empty(&self) -> bool { |
443 | 0 | self.receiver.is_empty() |
444 | 0 | } |
445 | | |
446 | | /// See [`Receiver::is_full`]. |
447 | 0 | pub fn is_full(&self) -> bool { |
448 | 0 | self.receiver.is_full() |
449 | 0 | } |
450 | | |
451 | | /// See [`Receiver::len`]. |
452 | 0 | pub fn len(&self) -> usize { |
453 | 0 | self.receiver.len() |
454 | 0 | } |
455 | | |
456 | | /// See [`Receiver::capacity`]. |
457 | 0 | pub fn capacity(&self) -> Option<usize> { |
458 | 0 | self.receiver.capacity() |
459 | 0 | } |
460 | | } |
461 | | |
462 | | impl<'a, T> Drop for RecvFut<'a, T> { |
463 | 0 | fn drop(&mut self) { |
464 | 0 | self.reset_hook(); |
465 | 0 | } |
466 | | } |
467 | | |
468 | | impl<'a, T> Future for RecvFut<'a, T> { |
469 | | type Output = Result<T, RecvError>; |
470 | | |
471 | 0 | fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { |
472 | 0 | self.poll_inner(cx, false) // stream = false |
473 | 0 | } |
474 | | } |
475 | | |
476 | | impl<'a, T> FusedFuture for RecvFut<'a, T> { |
477 | 0 | fn is_terminated(&self) -> bool { |
478 | 0 | self.receiver.shared.is_disconnected() && self.receiver.shared.is_empty() |
479 | 0 | } |
480 | | } |
481 | | |
482 | | /// A stream which allows asynchronously receiving messages. |
483 | | /// |
484 | | /// Can be created via [`Receiver::stream`] or [`Receiver::into_stream`]. |
485 | | pub struct RecvStream<'a, T>(RecvFut<'a, T>); |
486 | | |
487 | | impl<'a, T> RecvStream<'a, T> { |
488 | | /// See [`Receiver::is_disconnected`]. |
489 | 0 | pub fn is_disconnected(&self) -> bool { |
490 | 0 | self.0.is_disconnected() |
491 | 0 | } |
492 | | |
493 | | /// See [`Receiver::is_empty`]. |
494 | 0 | pub fn is_empty(&self) -> bool { |
495 | 0 | self.0.is_empty() |
496 | 0 | } |
497 | | |
498 | | /// See [`Receiver::is_full`]. |
499 | 0 | pub fn is_full(&self) -> bool { |
500 | 0 | self.0.is_full() |
501 | 0 | } |
502 | | |
503 | | /// See [`Receiver::len`]. |
504 | 0 | pub fn len(&self) -> usize { |
505 | 0 | self.0.len() |
506 | 0 | } |
507 | | |
508 | | /// See [`Receiver::capacity`]. |
509 | 0 | pub fn capacity(&self) -> Option<usize> { |
510 | 0 | self.0.capacity() |
511 | 0 | } |
512 | | |
513 | | /// Returns whether both `RecvStream`s belong to the same channel. |
514 | 0 | pub fn same_channel(&self, other: &Self) -> bool { |
515 | 0 | self.0.receiver.same_channel(&*other.0.receiver) |
516 | 0 | } |
517 | | } |
518 | | |
519 | | impl<'a, T> Clone for RecvStream<'a, T> { |
520 | 0 | fn clone(&self) -> RecvStream<'a, T> { |
521 | 0 | RecvStream(RecvFut::new(self.0.receiver.clone())) |
522 | 0 | } |
523 | | } |
524 | | |
525 | | impl<'a, T> Stream for RecvStream<'a, T> { |
526 | | type Item = T; |
527 | | |
528 | 0 | fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { |
529 | 0 | match Pin::new(&mut self.0).poll_inner(cx, true) { // stream = true |
530 | 0 | Poll::Pending => Poll::Pending, |
531 | 0 | Poll::Ready(item) => { |
532 | 0 | self.0.reset_hook(); |
533 | 0 | Poll::Ready(item.ok()) |
534 | | } |
535 | | } |
536 | 0 | } |
537 | | } |
538 | | |
539 | | impl<'a, T> FusedStream for RecvStream<'a, T> { |
540 | 0 | fn is_terminated(&self) -> bool { |
541 | 0 | self.0.is_terminated() |
542 | 0 | } |
543 | | } |