/rust/registry/src/index.crates.io-1949cf8c6b5b557f/flume-0.12.0/src/async.rs
Line | Count | Source |
1 | | //! Futures and other types that allow asynchronous interaction with channels. |
2 | | |
3 | | use crate::*; |
4 | | use futures_core::{ |
5 | | future::FusedFuture, |
6 | | stream::{FusedStream, Stream}, |
7 | | }; |
8 | | use futures_sink::Sink; |
9 | | use spin1::Mutex as Spinlock; |
10 | | use std::fmt::{Debug, Formatter}; |
11 | | use std::{ |
12 | | any::Any, |
13 | | future::Future, |
14 | | ops::Deref, |
15 | | pin::Pin, |
16 | | task::{Context, Poll, Waker}, |
17 | | }; |
18 | | |
19 | | struct AsyncSignal { |
20 | | waker: Spinlock<Waker>, |
21 | | woken: AtomicBool, |
22 | | stream: bool, |
23 | | } |
24 | | |
25 | | impl AsyncSignal { |
26 | 0 | fn new(cx: &Context, stream: bool) -> Self { |
27 | 0 | AsyncSignal { |
28 | 0 | waker: Spinlock::new(cx.waker().clone()), |
29 | 0 | woken: AtomicBool::new(false), |
30 | 0 | stream, |
31 | 0 | } |
32 | 0 | } |
33 | | } |
34 | | |
35 | | impl Signal for AsyncSignal { |
36 | 0 | fn fire(&self) -> bool { |
37 | 0 | self.woken.store(true, Ordering::SeqCst); |
38 | 0 | self.waker.lock().wake_by_ref(); |
39 | 0 | self.stream |
40 | 0 | } |
41 | | |
42 | 0 | fn as_any(&self) -> &(dyn Any + 'static) { |
43 | 0 | self |
44 | 0 | } |
45 | 0 | fn as_ptr(&self) -> *const () { |
46 | 0 | self as *const _ as *const () |
47 | 0 | } |
48 | | } |
49 | | |
50 | | impl<T> Hook<T, AsyncSignal> { |
51 | | // Update the hook to point to the given Waker. |
52 | | // Returns whether the hook has been previously awakened |
53 | 0 | fn update_waker(&self, cx_waker: &Waker) -> bool { |
54 | 0 | let mut waker = self.1.waker.lock(); |
55 | 0 | let woken = self.1.woken.load(Ordering::SeqCst); |
56 | 0 | if !waker.will_wake(cx_waker) { |
57 | 0 | *waker = cx_waker.clone(); |
58 | | |
59 | | // Avoid the edge case where the waker was woken just before the wakers were |
60 | | // swapped. |
61 | 0 | if woken { |
62 | 0 | cx_waker.wake_by_ref(); |
63 | 0 | } |
64 | 0 | } |
65 | 0 | woken |
66 | 0 | } |
67 | | } |
68 | | |
69 | | #[derive(Clone)] |
70 | | enum OwnedOrRef<'a, T> { |
71 | | Owned(T), |
72 | | Ref(&'a T), |
73 | | } |
74 | | |
75 | | impl<'a, T> Deref for OwnedOrRef<'a, T> { |
76 | | type Target = T; |
77 | | |
78 | 0 | fn deref(&self) -> &T { |
79 | 0 | match self { |
80 | 0 | OwnedOrRef::Owned(arc) => arc, |
81 | 0 | OwnedOrRef::Ref(r) => r, |
82 | | } |
83 | 0 | } |
84 | | } |
85 | | |
86 | | impl<T> Sender<T> { |
87 | | /// Asynchronously send a value into the channel, returning an error if all receivers have been |
88 | | /// dropped. If the channel is bounded and is full, the returned future will yield to the async |
89 | | /// runtime. |
90 | | /// |
91 | | /// In the current implementation, the returned future will not yield to the async runtime if the |
92 | | /// channel is unbounded. This may change in later versions. |
93 | 0 | pub fn send_async(&self, item: T) -> SendFut<'_, T> { |
94 | 0 | SendFut { |
95 | 0 | sender: OwnedOrRef::Ref(self), |
96 | 0 | hook: Some(SendState::NotYetSent(item)), |
97 | 0 | } |
98 | 0 | } |
99 | | |
100 | | /// Convert this sender into a future that asynchronously sends a single message into the channel, |
101 | | /// returning an error if all receivers have been dropped. If the channel is bounded and is full, |
102 | | /// this future will yield to the async runtime. |
103 | | /// |
104 | | /// In the current implementation, the returned future will not yield to the async runtime if the |
105 | | /// channel is unbounded. This may change in later versions. |
106 | 0 | pub fn into_send_async<'a>(self, item: T) -> SendFut<'a, T> { |
107 | 0 | SendFut { |
108 | 0 | sender: OwnedOrRef::Owned(self), |
109 | 0 | hook: Some(SendState::NotYetSent(item)), |
110 | 0 | } |
111 | 0 | } |
112 | | |
113 | | /// Create an asynchronous sink that uses this sender to asynchronously send messages into the |
114 | | /// channel. The sender will continue to be usable after the sink has been dropped. |
115 | | /// |
116 | | /// In the current implementation, the returned sink will not yield to the async runtime if the |
117 | | /// channel is unbounded. This may change in later versions. |
118 | 0 | pub fn sink(&self) -> SendSink<'_, T> { |
119 | 0 | SendSink(SendFut { |
120 | 0 | sender: OwnedOrRef::Ref(self), |
121 | 0 | hook: None, |
122 | 0 | }) |
123 | 0 | } |
124 | | |
125 | | /// Convert this sender into a sink that allows asynchronously sending messages into the channel. |
126 | | /// |
127 | | /// In the current implementation, the returned sink will not yield to the async runtime if the |
128 | | /// channel is unbounded. This may change in later versions. |
129 | 0 | pub fn into_sink<'a>(self) -> SendSink<'a, T> { |
130 | 0 | SendSink(SendFut { |
131 | 0 | sender: OwnedOrRef::Owned(self), |
132 | 0 | hook: None, |
133 | 0 | }) |
134 | 0 | } |
135 | | } |
136 | | |
137 | | enum SendState<T> { |
138 | | NotYetSent(T), |
139 | | QueuedItem(Arc<Hook<T, AsyncSignal>>), |
140 | | } |
141 | | |
142 | | /// A future that sends a value into a channel. |
143 | | /// |
144 | | /// Can be created via [`Sender::send_async`] or [`Sender::into_send_async`]. |
145 | | #[must_use = "futures/streams/sinks do nothing unless you `.await` or poll them"] |
146 | | pub struct SendFut<'a, T> { |
147 | | sender: OwnedOrRef<'a, Sender<T>>, |
148 | | // `None` whenever no item is held or queued (e.g. a freshly created sink, or after `reset_hook`), not only after dropping |
149 | | hook: Option<SendState<T>>, |
150 | | } |
151 | | |
152 | | impl<'a, T> Debug for SendFut<'a, T> { |
153 | 0 | fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { |
154 | 0 | f.debug_struct("SendFut").finish() |
155 | 0 | } |
156 | | } |
157 | | |
158 | | impl<T> std::marker::Unpin for SendFut<'_, T> {} |
159 | | |
160 | | impl<'a, T> SendFut<'a, T> { |
161 | | /// Reset the hook, clearing it and removing it from the waiting sender's queue. This is called |
162 | | /// on drop and at the start of `start_send` in the `Sink` implementation. |
163 | 0 | fn reset_hook(&mut self) { |
164 | 0 | if let Some(SendState::QueuedItem(hook)) = self.hook.take() { |
165 | 0 | let hook: Arc<Hook<T, dyn Signal>> = hook; |
166 | 0 | wait_lock(&self.sender.shared.chan) |
167 | 0 | .sending |
168 | 0 | .as_mut() |
169 | 0 | .unwrap() |
170 | 0 | .1 |
171 | 0 | .retain(|s| s.signal().as_ptr() != hook.signal().as_ptr()); |
172 | 0 | } |
173 | 0 | } |
174 | | |
175 | | /// See [`Sender::is_disconnected`]. |
176 | 0 | pub fn is_disconnected(&self) -> bool { |
177 | 0 | self.sender.is_disconnected() |
178 | 0 | } |
179 | | |
180 | | /// See [`Sender::is_empty`]. |
181 | 0 | pub fn is_empty(&self) -> bool { |
182 | 0 | self.sender.is_empty() |
183 | 0 | } |
184 | | |
185 | | /// See [`Sender::is_full`]. |
186 | 0 | pub fn is_full(&self) -> bool { |
187 | 0 | self.sender.is_full() |
188 | 0 | } |
189 | | |
190 | | /// See [`Sender::len`]. |
191 | 0 | pub fn len(&self) -> usize { |
192 | 0 | self.sender.len() |
193 | 0 | } |
194 | | |
195 | | /// See [`Sender::capacity`]. |
196 | 0 | pub fn capacity(&self) -> Option<usize> { |
197 | 0 | self.sender.capacity() |
198 | 0 | } |
199 | | } |
200 | | |
201 | | impl<'a, T> Drop for SendFut<'a, T> { |
202 | 0 | fn drop(&mut self) { |
203 | 0 | self.reset_hook() |
204 | 0 | } |
205 | | } |
206 | | |
207 | | impl<'a, T> Future for SendFut<'a, T> { |
208 | | type Output = Result<(), SendError<T>>; |
209 | | |
210 | 0 | fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { |
211 | 0 | if let Some(SendState::QueuedItem(hook)) = self.hook.as_ref() { |
212 | 0 | if hook.is_empty() { |
213 | 0 | Poll::Ready(Ok(())) |
214 | 0 | } else if self.sender.shared.is_disconnected() { |
215 | 0 | let item = hook.try_take(); |
216 | 0 | self.hook = None; |
217 | 0 | match item { |
218 | 0 | Some(item) => Poll::Ready(Err(SendError(item))), |
219 | 0 | None => Poll::Ready(Ok(())), |
220 | | } |
221 | | } else { |
222 | 0 | hook.update_waker(cx.waker()); |
223 | 0 | Poll::Pending |
224 | | } |
225 | 0 | } else if let Some(SendState::NotYetSent(item)) = self.hook.take() { |
226 | 0 | let this = self.get_mut(); |
227 | 0 | let (shared, this_hook) = (&this.sender.shared, &mut this.hook); |
228 | | |
229 | 0 | shared |
230 | 0 | .send( |
231 | | // item |
232 | 0 | item, |
233 | | // should_block |
234 | | true, |
235 | | // make_signal |
236 | 0 | |msg| Hook::slot(Some(msg), AsyncSignal::new(cx, false)), |
237 | | // do_block |
238 | 0 | |hook| { |
239 | 0 | *this_hook = Some(SendState::QueuedItem(hook)); |
240 | 0 | Poll::Pending |
241 | 0 | }, |
242 | | ) |
243 | 0 | .map(|r| { |
244 | 0 | r.map_err(|err| match err { |
245 | 0 | TrySendTimeoutError::Disconnected(msg) => SendError(msg), |
246 | 0 | _ => unreachable!(), |
247 | 0 | }) |
248 | 0 | }) |
249 | | } else { |
250 | | // Nothing to do |
251 | 0 | Poll::Ready(Ok(())) |
252 | | } |
253 | 0 | } |
254 | | } |
255 | | |
256 | | impl<'a, T> FusedFuture for SendFut<'a, T> { |
257 | 0 | fn is_terminated(&self) -> bool { |
258 | 0 | self.sender.shared.is_disconnected() |
259 | 0 | } |
260 | | } |
261 | | |
262 | | /// A sink that allows sending values into a channel. |
263 | | /// |
264 | | /// Can be created via [`Sender::sink`] or [`Sender::into_sink`]. |
265 | | pub struct SendSink<'a, T>(SendFut<'a, T>); |
266 | | |
267 | | impl<'a, T> SendSink<'a, T> { |
268 | | /// Returns a reference to the sending half of the channel used by this sink. |
269 | 0 | pub fn sender(&self) -> &Sender<T> { |
270 | 0 | &self.0.sender |
271 | 0 | } |
272 | | |
273 | | /// See [`Sender::is_disconnected`]. |
274 | 0 | pub fn is_disconnected(&self) -> bool { |
275 | 0 | self.0.is_disconnected() |
276 | 0 | } |
277 | | |
278 | | /// See [`Sender::is_empty`]. |
279 | 0 | pub fn is_empty(&self) -> bool { |
280 | 0 | self.0.is_empty() |
281 | 0 | } |
282 | | |
283 | | /// See [`Sender::is_full`]. |
284 | 0 | pub fn is_full(&self) -> bool { |
285 | 0 | self.0.is_full() |
286 | 0 | } |
287 | | |
288 | | /// See [`Sender::len`]. |
289 | 0 | pub fn len(&self) -> usize { |
290 | 0 | self.0.len() |
291 | 0 | } |
292 | | |
293 | | /// See [`Sender::capacity`]. |
294 | 0 | pub fn capacity(&self) -> Option<usize> { |
295 | 0 | self.0.capacity() |
296 | 0 | } |
297 | | |
298 | | /// Returns whether the two SendSinks belong to the same channel. |
299 | 0 | pub fn same_channel(&self, other: &Self) -> bool { |
300 | 0 | self.sender().same_channel(other.sender()) |
301 | 0 | } |
302 | | } |
303 | | |
304 | | impl<'a, T> Debug for SendSink<'a, T> { |
305 | 0 | fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { |
306 | 0 | f.debug_struct("SendSink").finish() |
307 | 0 | } |
308 | | } |
309 | | |
310 | | impl<'a, T> Sink<T> for SendSink<'a, T> { |
311 | | type Error = SendError<T>; |
312 | | |
313 | 0 | fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> { |
314 | 0 | Pin::new(&mut self.0).poll(cx) |
315 | 0 | } |
316 | | |
317 | 0 | fn start_send(mut self: Pin<&mut Self>, item: T) -> Result<(), Self::Error> { |
318 | 0 | self.0.reset_hook(); |
319 | 0 | self.0.hook = Some(SendState::NotYetSent(item)); |
320 | | |
321 | 0 | Ok(()) |
322 | 0 | } |
323 | | |
324 | 0 | fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> { |
325 | 0 | Pin::new(&mut self.0).poll(cx) // TODO: A different strategy here? |
326 | 0 | } |
327 | | |
328 | 0 | fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> { |
329 | 0 | Pin::new(&mut self.0).poll(cx) // TODO: A different strategy here? |
330 | 0 | } |
331 | | } |
332 | | |
333 | | impl<'a, T> Clone for SendSink<'a, T> { |
334 | 0 | fn clone(&self) -> SendSink<'a, T> { |
335 | 0 | SendSink(SendFut { |
336 | 0 | sender: self.0.sender.clone(), |
337 | 0 | hook: None, |
338 | 0 | }) |
339 | 0 | } |
340 | | } |
341 | | |
342 | | impl<T> Receiver<T> { |
343 | | /// Asynchronously receive a value from the channel, returning an error if all senders have been |
344 | | /// dropped. If the channel is empty, the returned future will yield to the async runtime. |
345 | 0 | pub fn recv_async(&self) -> RecvFut<'_, T> { |
346 | 0 | RecvFut::new(OwnedOrRef::Ref(self)) |
347 | 0 | } |
348 | | |
349 | | /// Convert this receiver into a future that asynchronously receives a single message from the |
350 | | /// channel, returning an error if all senders have been dropped. If the channel is empty, this |
351 | | /// future will yield to the async runtime. |
352 | 0 | pub fn into_recv_async<'a>(self) -> RecvFut<'a, T> { |
353 | 0 | RecvFut::new(OwnedOrRef::Owned(self)) |
354 | 0 | } |
355 | | |
356 | | /// Create an asynchronous stream that uses this receiver to asynchronously receive messages |
357 | | /// from the channel. The receiver will continue to be usable after the stream has been dropped. |
358 | 0 | pub fn stream(&self) -> RecvStream<'_, T> { |
359 | 0 | RecvStream(RecvFut::new(OwnedOrRef::Ref(self))) |
360 | 0 | } |
361 | | |
362 | | /// Convert this receiver into a stream that allows asynchronously receiving messages from the channel. |
363 | 0 | pub fn into_stream<'a>(self) -> RecvStream<'a, T> { |
364 | 0 | RecvStream(RecvFut::new(OwnedOrRef::Owned(self))) |
365 | 0 | } |
366 | | } |
367 | | |
368 | | /// A future which allows asynchronously receiving a message. |
369 | | /// |
370 | | /// Can be created via [`Receiver::recv_async`] or [`Receiver::into_recv_async`]. |
371 | | #[must_use = "futures/streams/sinks do nothing unless you `.await` or poll them"] |
372 | | pub struct RecvFut<'a, T> { |
373 | | receiver: OwnedOrRef<'a, Receiver<T>>, |
374 | | hook: Option<Arc<Hook<T, AsyncSignal>>>, |
375 | | } |
376 | | |
377 | | impl<'a, T> RecvFut<'a, T> { |
378 | 0 | fn new(receiver: OwnedOrRef<'a, Receiver<T>>) -> Self { |
379 | 0 | Self { |
380 | 0 | receiver, |
381 | 0 | hook: None, |
382 | 0 | } |
383 | 0 | } |
384 | | |
385 | | /// Reset the hook, clearing it and removing it from the waiting receivers queue and waking |
386 | | /// another receiver if this receiver has been woken, so as not to cause any missed wakeups. |
387 | | /// This is called on drop and after a new item is received in `Stream::poll_next`. |
388 | 0 | fn reset_hook(&mut self) { |
389 | 0 | if let Some(hook) = self.hook.take() { |
390 | 0 | let hook: Arc<Hook<T, dyn Signal>> = hook; |
391 | 0 | let mut chan = wait_lock(&self.receiver.shared.chan); |
392 | | // We'd like to use `Arc::ptr_eq` here but it doesn't seem to work consistently with wide pointers? |
393 | 0 | chan.waiting |
394 | 0 | .retain(|s| s.signal().as_ptr() != hook.signal().as_ptr()); |
395 | 0 | if hook |
396 | 0 | .signal() |
397 | 0 | .as_any() |
398 | 0 | .downcast_ref::<AsyncSignal>() |
399 | 0 | .unwrap() |
400 | 0 | .woken |
401 | 0 | .load(Ordering::SeqCst) |
402 | 0 | { |
403 | 0 | // If this signal has been fired, but we're being dropped (and so not listening to it), |
404 | 0 | // pass the signal on to another receiver |
405 | 0 | chan.try_wake_receiver_if_pending(); |
406 | 0 | } |
407 | 0 | } |
408 | 0 | } |
409 | | |
410 | 0 | fn poll_inner( |
411 | 0 | self: Pin<&mut Self>, |
412 | 0 | cx: &mut Context, |
413 | 0 | stream: bool, |
414 | 0 | ) -> Poll<Result<T, RecvError>> { |
415 | 0 | if self.hook.is_some() { |
416 | 0 | match self.receiver.shared.recv_sync(None) { |
417 | 0 | Ok(msg) => return Poll::Ready(Ok(msg)), |
418 | | Err(TryRecvTimeoutError::Disconnected) => { |
419 | 0 | return Poll::Ready(Err(RecvError::Disconnected)) |
420 | | } |
421 | 0 | _ => (), |
422 | | } |
423 | | |
424 | 0 | let hook = self.hook.as_ref().map(Arc::clone).unwrap(); |
425 | 0 | if hook.update_waker(cx.waker()) { |
426 | 0 | // If the previous hook was awakened, we need to insert it back to the |
427 | 0 | // queue, otherwise, it remains valid. |
428 | 0 | wait_lock(&self.receiver.shared.chan) |
429 | 0 | .waiting |
430 | 0 | .push_back(hook); |
431 | 0 | } |
432 | | // To avoid a missed wakeup, re-check disconnect status here because the channel might have |
433 | | // gotten shut down before we had a chance to push our hook |
434 | 0 | if self.receiver.shared.is_disconnected() { |
435 | | // And now, to avoid a race condition between the first recv attempt and the disconnect check we |
436 | | // just performed, attempt to recv again just in case we missed something. |
437 | 0 | Poll::Ready( |
438 | 0 | self.receiver |
439 | 0 | .shared |
440 | 0 | .recv_sync(None) |
441 | 0 | .map(Ok) |
442 | 0 | .unwrap_or(Err(RecvError::Disconnected)), |
443 | 0 | ) |
444 | | } else { |
445 | 0 | Poll::Pending |
446 | | } |
447 | | } else { |
448 | 0 | let mut_self = self.get_mut(); |
449 | 0 | let (shared, this_hook) = (&mut_self.receiver.shared, &mut mut_self.hook); |
450 | | |
451 | 0 | shared |
452 | 0 | .recv( |
453 | | // should_block |
454 | | true, |
455 | | // make_signal |
456 | 0 | || Hook::trigger(AsyncSignal::new(cx, stream)), |
457 | | // do_block |
458 | 0 | |hook| { |
459 | 0 | *this_hook = Some(hook); |
460 | 0 | Poll::Pending |
461 | 0 | }, |
462 | | ) |
463 | 0 | .map(|r| { |
464 | 0 | r.map_err(|err| match err { |
465 | 0 | TryRecvTimeoutError::Disconnected => RecvError::Disconnected, |
466 | 0 | _ => unreachable!(), |
467 | 0 | }) |
468 | 0 | }) |
469 | | } |
470 | 0 | } |
471 | | |
472 | | /// See [`Receiver::is_disconnected`]. |
473 | 0 | pub fn is_disconnected(&self) -> bool { |
474 | 0 | self.receiver.is_disconnected() |
475 | 0 | } |
476 | | |
477 | | /// See [`Receiver::is_empty`]. |
478 | 0 | pub fn is_empty(&self) -> bool { |
479 | 0 | self.receiver.is_empty() |
480 | 0 | } |
481 | | |
482 | | /// See [`Receiver::is_full`]. |
483 | 0 | pub fn is_full(&self) -> bool { |
484 | 0 | self.receiver.is_full() |
485 | 0 | } |
486 | | |
487 | | /// See [`Receiver::len`]. |
488 | 0 | pub fn len(&self) -> usize { |
489 | 0 | self.receiver.len() |
490 | 0 | } |
491 | | |
492 | | /// See [`Receiver::capacity`]. |
493 | 0 | pub fn capacity(&self) -> Option<usize> { |
494 | 0 | self.receiver.capacity() |
495 | 0 | } |
496 | | } |
497 | | |
498 | | impl<'a, T> Debug for RecvFut<'a, T> { |
499 | 0 | fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { |
500 | 0 | f.debug_struct("RecvFut").finish() |
501 | 0 | } |
502 | | } |
503 | | |
504 | | impl<'a, T> Drop for RecvFut<'a, T> { |
505 | 0 | fn drop(&mut self) { |
506 | 0 | self.reset_hook(); |
507 | 0 | } |
508 | | } |
509 | | |
510 | | impl<'a, T> Future for RecvFut<'a, T> { |
511 | | type Output = Result<T, RecvError>; |
512 | | |
513 | 0 | fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { |
514 | 0 | self.poll_inner(cx, false) // stream = false |
515 | 0 | } |
516 | | } |
517 | | |
518 | | impl<'a, T> FusedFuture for RecvFut<'a, T> { |
519 | 0 | fn is_terminated(&self) -> bool { |
520 | 0 | self.receiver.shared.is_disconnected() && self.receiver.shared.is_empty() |
521 | 0 | } |
522 | | } |
523 | | |
524 | | /// A stream which allows asynchronously receiving messages. |
525 | | /// |
526 | | /// Can be created via [`Receiver::stream`] or [`Receiver::into_stream`]. |
527 | | pub struct RecvStream<'a, T>(RecvFut<'a, T>); |
528 | | |
529 | | impl<'a, T> RecvStream<'a, T> { |
530 | | /// See [`Receiver::is_disconnected`]. |
531 | 0 | pub fn is_disconnected(&self) -> bool { |
532 | 0 | self.0.is_disconnected() |
533 | 0 | } |
534 | | |
535 | | /// See [`Receiver::is_empty`]. |
536 | 0 | pub fn is_empty(&self) -> bool { |
537 | 0 | self.0.is_empty() |
538 | 0 | } |
539 | | |
540 | | /// See [`Receiver::is_full`]. |
541 | 0 | pub fn is_full(&self) -> bool { |
542 | 0 | self.0.is_full() |
543 | 0 | } |
544 | | |
545 | | /// See [`Receiver::len`]. |
546 | 0 | pub fn len(&self) -> usize { |
547 | 0 | self.0.len() |
548 | 0 | } |
549 | | |
550 | | /// See [`Receiver::capacity`]. |
551 | 0 | pub fn capacity(&self) -> Option<usize> { |
552 | 0 | self.0.capacity() |
553 | 0 | } |
554 | | |
555 | | /// Returns whether the two RecvStreams belong to the same channel. |
556 | 0 | pub fn same_channel(&self, other: &Self) -> bool { |
557 | 0 | self.0.receiver.same_channel(&*other.0.receiver) |
558 | 0 | } |
559 | | } |
560 | | |
561 | | impl<'a, T> Debug for RecvStream<'a, T> { |
562 | 0 | fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { |
563 | 0 | f.debug_struct("RecvStream").finish() |
564 | 0 | } |
565 | | } |
566 | | |
567 | | impl<'a, T> Clone for RecvStream<'a, T> { |
568 | 0 | fn clone(&self) -> RecvStream<'a, T> { |
569 | 0 | RecvStream(RecvFut::new(self.0.receiver.clone())) |
570 | 0 | } |
571 | | } |
572 | | |
573 | | impl<'a, T> Stream for RecvStream<'a, T> { |
574 | | type Item = T; |
575 | | |
576 | 0 | fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { |
577 | 0 | match Pin::new(&mut self.0).poll_inner(cx, true) { |
578 | | // stream = true |
579 | 0 | Poll::Pending => Poll::Pending, |
580 | 0 | Poll::Ready(item) => { |
581 | 0 | self.0.reset_hook(); |
582 | 0 | Poll::Ready(item.ok()) |
583 | | } |
584 | | } |
585 | 0 | } |
586 | | } |
587 | | |
588 | | impl<'a, T> FusedStream for RecvStream<'a, T> { |
589 | 0 | fn is_terminated(&self) -> bool { |
590 | 0 | self.0.is_terminated() |
591 | 0 | } |
592 | | } |