Coverage Report

Created: 2026-01-08 06:20

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/rust/registry/src/index.crates.io-1949cf8c6b5b557f/flume-0.12.0/src/async.rs
Line
Count
Source
1
//! Futures and other types that allow asynchronous interaction with channels.
2
3
use crate::*;
4
use futures_core::{
5
    future::FusedFuture,
6
    stream::{FusedStream, Stream},
7
};
8
use futures_sink::Sink;
9
use spin1::Mutex as Spinlock;
10
use std::fmt::{Debug, Formatter};
11
use std::{
12
    any::Any,
13
    future::Future,
14
    ops::Deref,
15
    pin::Pin,
16
    task::{Context, Poll, Waker},
17
};
18
19
struct AsyncSignal {
20
    waker: Spinlock<Waker>,
21
    woken: AtomicBool,
22
    stream: bool,
23
}
24
25
impl AsyncSignal {
26
0
    fn new(cx: &Context, stream: bool) -> Self {
27
0
        AsyncSignal {
28
0
            waker: Spinlock::new(cx.waker().clone()),
29
0
            woken: AtomicBool::new(false),
30
0
            stream,
31
0
        }
32
0
    }
33
}
34
35
impl Signal for AsyncSignal {
36
0
    fn fire(&self) -> bool {
37
0
        self.woken.store(true, Ordering::SeqCst);
38
0
        self.waker.lock().wake_by_ref();
39
0
        self.stream
40
0
    }
41
42
0
    fn as_any(&self) -> &(dyn Any + 'static) {
43
0
        self
44
0
    }
45
0
    fn as_ptr(&self) -> *const () {
46
0
        self as *const _ as *const ()
47
0
    }
48
}
49
50
impl<T> Hook<T, AsyncSignal> {
51
    // Update the hook to point to the given Waker.
52
    // Returns whether the hook has been previously awakened
53
0
    fn update_waker(&self, cx_waker: &Waker) -> bool {
54
0
        let mut waker = self.1.waker.lock();
55
0
        let woken = self.1.woken.load(Ordering::SeqCst);
56
0
        if !waker.will_wake(cx_waker) {
57
0
            *waker = cx_waker.clone();
58
59
            // Avoid the edge case where the waker was woken just before the wakers were
60
            // swapped.
61
0
            if woken {
62
0
                cx_waker.wake_by_ref();
63
0
            }
64
0
        }
65
0
        woken
66
0
    }
67
}
68
69
#[derive(Clone)]
70
enum OwnedOrRef<'a, T> {
71
    Owned(T),
72
    Ref(&'a T),
73
}
74
75
impl<'a, T> Deref for OwnedOrRef<'a, T> {
76
    type Target = T;
77
78
0
    fn deref(&self) -> &T {
79
0
        match self {
80
0
            OwnedOrRef::Owned(arc) => arc,
81
0
            OwnedOrRef::Ref(r) => r,
82
        }
83
0
    }
84
}
85
86
impl<T> Sender<T> {
87
    /// Asynchronously send a value into the channel, returning an error if all receivers have been
88
    /// dropped. If the channel is bounded and is full, the returned future will yield to the async
89
    /// runtime.
90
    ///
91
    /// In the current implementation, the returned future will not yield to the async runtime if the
92
    /// channel is unbounded. This may change in later versions.
93
0
    pub fn send_async(&self, item: T) -> SendFut<'_, T> {
94
0
        SendFut {
95
0
            sender: OwnedOrRef::Ref(self),
96
0
            hook: Some(SendState::NotYetSent(item)),
97
0
        }
98
0
    }
99
100
    /// Convert this sender into a future that asynchronously sends a single message into the channel,
101
    /// returning an error if all receivers have been dropped. If the channel is bounded and is full,
102
    /// this future will yield to the async runtime.
103
    ///
104
    /// In the current implementation, the returned future will not yield to the async runtime if the
105
    /// channel is unbounded. This may change in later versions.
106
0
    pub fn into_send_async<'a>(self, item: T) -> SendFut<'a, T> {
107
0
        SendFut {
108
0
            sender: OwnedOrRef::Owned(self),
109
0
            hook: Some(SendState::NotYetSent(item)),
110
0
        }
111
0
    }
112
113
    /// Create an asynchronous sink that uses this sender to asynchronously send messages into the
114
    /// channel. The sender will continue to be usable after the sink has been dropped.
115
    ///
116
    /// In the current implementation, the returned sink will not yield to the async runtime if the
117
    /// channel is unbounded. This may change in later versions.
118
0
    pub fn sink(&self) -> SendSink<'_, T> {
119
0
        SendSink(SendFut {
120
0
            sender: OwnedOrRef::Ref(self),
121
0
            hook: None,
122
0
        })
123
0
    }
124
125
    /// Convert this sender into a sink that allows asynchronously sending messages into the channel.
126
    ///
127
    /// In the current implementation, the returned sink will not yield to the async runtime if the
128
    /// channel is unbounded. This may change in later versions.
129
0
    pub fn into_sink<'a>(self) -> SendSink<'a, T> {
130
0
        SendSink(SendFut {
131
0
            sender: OwnedOrRef::Owned(self),
132
0
            hook: None,
133
0
        })
134
0
    }
135
}
136
137
enum SendState<T> {
138
    NotYetSent(T),
139
    QueuedItem(Arc<Hook<T, AsyncSignal>>),
140
}
141
142
/// A future that sends a value into a channel.
143
///
144
/// Can be created via [`Sender::send_async`] or [`Sender::into_send_async`].
145
#[must_use = "futures/streams/sinks do nothing unless you `.await` or poll them"]
146
pub struct SendFut<'a, T> {
147
    sender: OwnedOrRef<'a, Sender<T>>,
148
    // Only none after dropping
149
    hook: Option<SendState<T>>,
150
}
151
152
impl<'a, T> Debug for SendFut<'a, T> {
153
0
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
154
0
        f.debug_struct("SendFut").finish()
155
0
    }
156
}
157
158
impl<T> std::marker::Unpin for SendFut<'_, T> {}
159
160
impl<'a, T> SendFut<'a, T> {
161
    /// Reset the hook, clearing it and removing it from the waiting sender's queue. This is called
162
    /// on drop and just before `start_send` in the `Sink` implementation.
163
0
    fn reset_hook(&mut self) {
164
0
        if let Some(SendState::QueuedItem(hook)) = self.hook.take() {
165
0
            let hook: Arc<Hook<T, dyn Signal>> = hook;
166
0
            wait_lock(&self.sender.shared.chan)
167
0
                .sending
168
0
                .as_mut()
169
0
                .unwrap()
170
0
                .1
171
0
                .retain(|s| s.signal().as_ptr() != hook.signal().as_ptr());
172
0
        }
173
0
    }
174
175
    /// See [`Sender::is_disconnected`].
176
0
    pub fn is_disconnected(&self) -> bool {
177
0
        self.sender.is_disconnected()
178
0
    }
179
180
    /// See [`Sender::is_empty`].
181
0
    pub fn is_empty(&self) -> bool {
182
0
        self.sender.is_empty()
183
0
    }
184
185
    /// See [`Sender::is_full`].
186
0
    pub fn is_full(&self) -> bool {
187
0
        self.sender.is_full()
188
0
    }
189
190
    /// See [`Sender::len`].
191
0
    pub fn len(&self) -> usize {
192
0
        self.sender.len()
193
0
    }
194
195
    /// See [`Sender::capacity`].
196
0
    pub fn capacity(&self) -> Option<usize> {
197
0
        self.sender.capacity()
198
0
    }
199
}
200
201
impl<'a, T> Drop for SendFut<'a, T> {
202
0
    fn drop(&mut self) {
203
0
        self.reset_hook()
204
0
    }
205
}
206
207
impl<'a, T> Future for SendFut<'a, T> {
208
    type Output = Result<(), SendError<T>>;
209
210
0
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
211
0
        if let Some(SendState::QueuedItem(hook)) = self.hook.as_ref() {
212
0
            if hook.is_empty() {
213
0
                Poll::Ready(Ok(()))
214
0
            } else if self.sender.shared.is_disconnected() {
215
0
                let item = hook.try_take();
216
0
                self.hook = None;
217
0
                match item {
218
0
                    Some(item) => Poll::Ready(Err(SendError(item))),
219
0
                    None => Poll::Ready(Ok(())),
220
                }
221
            } else {
222
0
                hook.update_waker(cx.waker());
223
0
                Poll::Pending
224
            }
225
0
        } else if let Some(SendState::NotYetSent(item)) = self.hook.take() {
226
0
            let this = self.get_mut();
227
0
            let (shared, this_hook) = (&this.sender.shared, &mut this.hook);
228
229
0
            shared
230
0
                .send(
231
                    // item
232
0
                    item,
233
                    // should_block
234
                    true,
235
                    // make_signal
236
0
                    |msg| Hook::slot(Some(msg), AsyncSignal::new(cx, false)),
237
                    // do_block
238
0
                    |hook| {
239
0
                        *this_hook = Some(SendState::QueuedItem(hook));
240
0
                        Poll::Pending
241
0
                    },
242
                )
243
0
                .map(|r| {
244
0
                    r.map_err(|err| match err {
245
0
                        TrySendTimeoutError::Disconnected(msg) => SendError(msg),
246
0
                        _ => unreachable!(),
247
0
                    })
248
0
                })
249
        } else {
250
            // Nothing to do
251
0
            Poll::Ready(Ok(()))
252
        }
253
0
    }
254
}
255
256
impl<'a, T> FusedFuture for SendFut<'a, T> {
257
0
    fn is_terminated(&self) -> bool {
258
0
        self.sender.shared.is_disconnected()
259
0
    }
260
}
261
262
/// A sink that allows sending values into a channel.
263
///
264
/// Can be created via [`Sender::sink`] or [`Sender::into_sink`].
265
pub struct SendSink<'a, T>(SendFut<'a, T>);
266
267
impl<'a, T> SendSink<'a, T> {
268
    /// Returns a clone of a sending half of the channel of this sink.
269
0
    pub fn sender(&self) -> &Sender<T> {
270
0
        &self.0.sender
271
0
    }
272
273
    /// See [`Sender::is_disconnected`].
274
0
    pub fn is_disconnected(&self) -> bool {
275
0
        self.0.is_disconnected()
276
0
    }
277
278
    /// See [`Sender::is_empty`].
279
0
    pub fn is_empty(&self) -> bool {
280
0
        self.0.is_empty()
281
0
    }
282
283
    /// See [`Sender::is_full`].
284
0
    pub fn is_full(&self) -> bool {
285
0
        self.0.is_full()
286
0
    }
287
288
    /// See [`Sender::len`].
289
0
    pub fn len(&self) -> usize {
290
0
        self.0.len()
291
0
    }
292
293
    /// See [`Sender::capacity`].
294
0
    pub fn capacity(&self) -> Option<usize> {
295
0
        self.0.capacity()
296
0
    }
297
298
    /// Returns whether the SendSinks are belong to the same channel.
299
0
    pub fn same_channel(&self, other: &Self) -> bool {
300
0
        self.sender().same_channel(other.sender())
301
0
    }
302
}
303
304
impl<'a, T> Debug for SendSink<'a, T> {
305
0
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
306
0
        f.debug_struct("SendSink").finish()
307
0
    }
308
}
309
310
impl<'a, T> Sink<T> for SendSink<'a, T> {
311
    type Error = SendError<T>;
312
313
0
    fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
314
0
        Pin::new(&mut self.0).poll(cx)
315
0
    }
316
317
0
    fn start_send(mut self: Pin<&mut Self>, item: T) -> Result<(), Self::Error> {
318
0
        self.0.reset_hook();
319
0
        self.0.hook = Some(SendState::NotYetSent(item));
320
321
0
        Ok(())
322
0
    }
323
324
0
    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
325
0
        Pin::new(&mut self.0).poll(cx) // TODO: A different strategy here?
326
0
    }
327
328
0
    fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
329
0
        Pin::new(&mut self.0).poll(cx) // TODO: A different strategy here?
330
0
    }
331
}
332
333
impl<'a, T> Clone for SendSink<'a, T> {
334
0
    fn clone(&self) -> SendSink<'a, T> {
335
0
        SendSink(SendFut {
336
0
            sender: self.0.sender.clone(),
337
0
            hook: None,
338
0
        })
339
0
    }
340
}
341
342
impl<T> Receiver<T> {
343
    /// Asynchronously receive a value from the channel, returning an error if all senders have been
344
    /// dropped. If the channel is empty, the returned future will yield to the async runtime.
345
0
    pub fn recv_async(&self) -> RecvFut<'_, T> {
346
0
        RecvFut::new(OwnedOrRef::Ref(self))
347
0
    }
348
349
    /// Convert this receiver into a future that asynchronously receives a single message from the
350
    /// channel, returning an error if all senders have been dropped. If the channel is empty, this
351
    /// future will yield to the async runtime.
352
0
    pub fn into_recv_async<'a>(self) -> RecvFut<'a, T> {
353
0
        RecvFut::new(OwnedOrRef::Owned(self))
354
0
    }
355
356
    /// Create an asynchronous stream that uses this receiver to asynchronously receive messages
357
    /// from the channel. The receiver will continue to be usable after the stream has been dropped.
358
0
    pub fn stream(&self) -> RecvStream<'_, T> {
359
0
        RecvStream(RecvFut::new(OwnedOrRef::Ref(self)))
360
0
    }
361
362
    /// Convert this receiver into a stream that allows asynchronously receiving messages from the channel.
363
0
    pub fn into_stream<'a>(self) -> RecvStream<'a, T> {
364
0
        RecvStream(RecvFut::new(OwnedOrRef::Owned(self)))
365
0
    }
366
}
367
368
/// A future which allows asynchronously receiving a message.
369
///
370
/// Can be created via [`Receiver::recv_async`] or [`Receiver::into_recv_async`].
371
#[must_use = "futures/streams/sinks do nothing unless you `.await` or poll them"]
372
pub struct RecvFut<'a, T> {
373
    receiver: OwnedOrRef<'a, Receiver<T>>,
374
    hook: Option<Arc<Hook<T, AsyncSignal>>>,
375
}
376
377
impl<'a, T> RecvFut<'a, T> {
378
0
    fn new(receiver: OwnedOrRef<'a, Receiver<T>>) -> Self {
379
0
        Self {
380
0
            receiver,
381
0
            hook: None,
382
0
        }
383
0
    }
384
385
    /// Reset the hook, clearing it and removing it from the waiting receivers queue and waking
386
    /// another receiver if this receiver has been woken, so as not to cause any missed wakeups.
387
    /// This is called on drop and after a new item is received in `Stream::poll_next`.
388
0
    fn reset_hook(&mut self) {
389
0
        if let Some(hook) = self.hook.take() {
390
0
            let hook: Arc<Hook<T, dyn Signal>> = hook;
391
0
            let mut chan = wait_lock(&self.receiver.shared.chan);
392
            // We'd like to use `Arc::ptr_eq` here but it doesn't seem to work consistently with wide pointers?
393
0
            chan.waiting
394
0
                .retain(|s| s.signal().as_ptr() != hook.signal().as_ptr());
395
0
            if hook
396
0
                .signal()
397
0
                .as_any()
398
0
                .downcast_ref::<AsyncSignal>()
399
0
                .unwrap()
400
0
                .woken
401
0
                .load(Ordering::SeqCst)
402
0
            {
403
0
                // If this signal has been fired, but we're being dropped (and so not listening to it),
404
0
                // pass the signal on to another receiver
405
0
                chan.try_wake_receiver_if_pending();
406
0
            }
407
0
        }
408
0
    }
409
410
0
    fn poll_inner(
411
0
        self: Pin<&mut Self>,
412
0
        cx: &mut Context,
413
0
        stream: bool,
414
0
    ) -> Poll<Result<T, RecvError>> {
415
0
        if self.hook.is_some() {
416
0
            match self.receiver.shared.recv_sync(None) {
417
0
                Ok(msg) => return Poll::Ready(Ok(msg)),
418
                Err(TryRecvTimeoutError::Disconnected) => {
419
0
                    return Poll::Ready(Err(RecvError::Disconnected))
420
                }
421
0
                _ => (),
422
            }
423
424
0
            let hook = self.hook.as_ref().map(Arc::clone).unwrap();
425
0
            if hook.update_waker(cx.waker()) {
426
0
                // If the previous hook was awakened, we need to insert it back to the
427
0
                // queue, otherwise, it remains valid.
428
0
                wait_lock(&self.receiver.shared.chan)
429
0
                    .waiting
430
0
                    .push_back(hook);
431
0
            }
432
            // To avoid a missed wakeup, re-check disconnect status here because the channel might have
433
            // gotten shut down before we had a chance to push our hook
434
0
            if self.receiver.shared.is_disconnected() {
435
                // And now, to avoid a race condition between the first recv attempt and the disconnect check we
436
                // just performed, attempt to recv again just in case we missed something.
437
0
                Poll::Ready(
438
0
                    self.receiver
439
0
                        .shared
440
0
                        .recv_sync(None)
441
0
                        .map(Ok)
442
0
                        .unwrap_or(Err(RecvError::Disconnected)),
443
0
                )
444
            } else {
445
0
                Poll::Pending
446
            }
447
        } else {
448
0
            let mut_self = self.get_mut();
449
0
            let (shared, this_hook) = (&mut_self.receiver.shared, &mut mut_self.hook);
450
451
0
            shared
452
0
                .recv(
453
                    // should_block
454
                    true,
455
                    // make_signal
456
0
                    || Hook::trigger(AsyncSignal::new(cx, stream)),
457
                    // do_block
458
0
                    |hook| {
459
0
                        *this_hook = Some(hook);
460
0
                        Poll::Pending
461
0
                    },
462
                )
463
0
                .map(|r| {
464
0
                    r.map_err(|err| match err {
465
0
                        TryRecvTimeoutError::Disconnected => RecvError::Disconnected,
466
0
                        _ => unreachable!(),
467
0
                    })
468
0
                })
469
        }
470
0
    }
471
472
    /// See [`Receiver::is_disconnected`].
473
0
    pub fn is_disconnected(&self) -> bool {
474
0
        self.receiver.is_disconnected()
475
0
    }
476
477
    /// See [`Receiver::is_empty`].
478
0
    pub fn is_empty(&self) -> bool {
479
0
        self.receiver.is_empty()
480
0
    }
481
482
    /// See [`Receiver::is_full`].
483
0
    pub fn is_full(&self) -> bool {
484
0
        self.receiver.is_full()
485
0
    }
486
487
    /// See [`Receiver::len`].
488
0
    pub fn len(&self) -> usize {
489
0
        self.receiver.len()
490
0
    }
491
492
    /// See [`Receiver::capacity`].
493
0
    pub fn capacity(&self) -> Option<usize> {
494
0
        self.receiver.capacity()
495
0
    }
496
}
497
498
impl<'a, T> Debug for RecvFut<'a, T> {
499
0
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
500
0
        f.debug_struct("RecvFut").finish()
501
0
    }
502
}
503
504
impl<'a, T> Drop for RecvFut<'a, T> {
505
0
    fn drop(&mut self) {
506
0
        self.reset_hook();
507
0
    }
508
}
509
510
impl<'a, T> Future for RecvFut<'a, T> {
511
    type Output = Result<T, RecvError>;
512
513
0
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
514
0
        self.poll_inner(cx, false) // stream = false
515
0
    }
516
}
517
518
impl<'a, T> FusedFuture for RecvFut<'a, T> {
519
0
    fn is_terminated(&self) -> bool {
520
0
        self.receiver.shared.is_disconnected() && self.receiver.shared.is_empty()
521
0
    }
522
}
523
524
/// A stream which allows asynchronously receiving messages.
525
///
526
/// Can be created via [`Receiver::stream`] or [`Receiver::into_stream`].
527
pub struct RecvStream<'a, T>(RecvFut<'a, T>);
528
529
impl<'a, T> RecvStream<'a, T> {
530
    /// See [`Receiver::is_disconnected`].
531
0
    pub fn is_disconnected(&self) -> bool {
532
0
        self.0.is_disconnected()
533
0
    }
534
535
    /// See [`Receiver::is_empty`].
536
0
    pub fn is_empty(&self) -> bool {
537
0
        self.0.is_empty()
538
0
    }
539
540
    /// See [`Receiver::is_full`].
541
0
    pub fn is_full(&self) -> bool {
542
0
        self.0.is_full()
543
0
    }
544
545
    /// See [`Receiver::len`].
546
0
    pub fn len(&self) -> usize {
547
0
        self.0.len()
548
0
    }
549
550
    /// See [`Receiver::capacity`].
551
0
    pub fn capacity(&self) -> Option<usize> {
552
0
        self.0.capacity()
553
0
    }
554
555
    /// Returns whether the SendSinks are belong to the same channel.
556
0
    pub fn same_channel(&self, other: &Self) -> bool {
557
0
        self.0.receiver.same_channel(&*other.0.receiver)
558
0
    }
559
}
560
561
impl<'a, T> Debug for RecvStream<'a, T> {
562
0
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
563
0
        f.debug_struct("RecvStream").finish()
564
0
    }
565
}
566
567
impl<'a, T> Clone for RecvStream<'a, T> {
568
0
    fn clone(&self) -> RecvStream<'a, T> {
569
0
        RecvStream(RecvFut::new(self.0.receiver.clone()))
570
0
    }
571
}
572
573
impl<'a, T> Stream for RecvStream<'a, T> {
574
    type Item = T;
575
576
0
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
577
0
        match Pin::new(&mut self.0).poll_inner(cx, true) {
578
            // stream = true
579
0
            Poll::Pending => Poll::Pending,
580
0
            Poll::Ready(item) => {
581
0
                self.0.reset_hook();
582
0
                Poll::Ready(item.ok())
583
            }
584
        }
585
0
    }
586
}
587
588
impl<'a, T> FusedStream for RecvStream<'a, T> {
589
0
    fn is_terminated(&self) -> bool {
590
0
        self.0.is_terminated()
591
0
    }
592
}