Coverage Report

Created: 2024-05-21 06:19

/rust/registry/src/index.crates.io-6f17d22bba15001f/flume-0.11.0/src/async.rs
Line
Count
Source (jump to first uncovered line)
1
//! Futures and other types that allow asynchronous interaction with channels.
2
3
use std::{
4
    future::Future,
5
    pin::Pin,
6
    task::{Context, Poll, Waker},
7
    any::Any,
8
    ops::Deref,
9
};
10
use crate::*;
11
use futures_core::{stream::{Stream, FusedStream}, future::FusedFuture};
12
use futures_sink::Sink;
13
use spin1::Mutex as Spinlock;
14
15
struct AsyncSignal {
16
    waker: Spinlock<Waker>,
17
    woken: AtomicBool,
18
    stream: bool,
19
}
20
21
impl AsyncSignal {
22
0
    fn new(cx: &Context, stream: bool) -> Self {
23
0
        AsyncSignal {
24
0
            waker: Spinlock::new(cx.waker().clone()),
25
0
            woken: AtomicBool::new(false),
26
0
            stream,
27
0
        }
28
0
    }
29
}
30
31
impl Signal for AsyncSignal {
32
0
    fn fire(&self) -> bool {
33
0
        self.woken.store(true, Ordering::SeqCst);
34
0
        self.waker.lock().wake_by_ref();
35
0
        self.stream
36
0
    }
37
38
0
    fn as_any(&self) -> &(dyn Any + 'static) { self }
39
0
    fn as_ptr(&self) -> *const () { self as *const _ as *const () }
40
}
41
42
impl<T> Hook<T, AsyncSignal> {
43
    // Update the hook to point to the given Waker.
44
    // Returns whether the hook has been previously awakened
45
0
    fn update_waker(&self, cx_waker: &Waker) -> bool {
46
0
        let mut waker = self.1.waker.lock();
47
0
        let woken = self.1.woken.load(Ordering::SeqCst);
48
0
        if !waker.will_wake(cx_waker) {
49
0
            *waker = cx_waker.clone();
50
0
51
0
            // Avoid the edge case where the waker was woken just before the wakers were
52
0
            // swapped.
53
0
            if woken {
54
0
                cx_waker.wake_by_ref();
55
0
            }
56
0
        }
57
0
        woken
58
0
    }
59
}
60
61
0
/// Either an owned `T` or a borrowed `&'a T`, usable uniformly through `Deref`.
#[derive(Clone)]
enum OwnedOrRef<'a, T> {
    /// The value is stored inline and owned by this wrapper.
    Owned(T),
    /// The value is borrowed for `'a`.
    Ref(&'a T),
}

impl<'a, T> Deref for OwnedOrRef<'a, T> {
    type Target = T;

    /// Borrow the wrapped value, regardless of whether it is owned or referenced.
    fn deref(&self) -> &T {
        match self {
            // Matching through `&self` already binds `value` as `&T`; no extra borrow needed.
            OwnedOrRef::Owned(value) => value,
            OwnedOrRef::Ref(value) => value,
        }
    }
}
77
78
impl<T> Sender<T> {
79
    /// Asynchronously send a value into the channel, returning an error if all receivers have been
80
    /// dropped. If the channel is bounded and is full, the returned future will yield to the async
81
    /// runtime.
82
    ///
83
    /// In the current implementation, the returned future will not yield to the async runtime if the
84
    /// channel is unbounded. This may change in later versions.
85
0
    pub fn send_async(&self, item: T) -> SendFut<T> {
86
0
        SendFut {
87
0
            sender: OwnedOrRef::Ref(&self),
88
0
            hook: Some(SendState::NotYetSent(item)),
89
0
        }
90
0
    }
91
92
    /// Convert this sender into a future that asynchronously sends a single message into the channel,
93
    /// returning an error if all receivers have been dropped. If the channel is bounded and is full,
94
    /// this future will yield to the async runtime.
95
    ///
96
    /// In the current implementation, the returned future will not yield to the async runtime if the
97
    /// channel is unbounded. This may change in later versions.
98
0
    pub fn into_send_async<'a>(self, item: T) -> SendFut<'a, T> {
99
0
        SendFut {
100
0
            sender: OwnedOrRef::Owned(self),
101
0
            hook: Some(SendState::NotYetSent(item)),
102
0
        }
103
0
    }
104
105
    /// Create an asynchronous sink that uses this sender to asynchronously send messages into the
106
    /// channel. The sender will continue to be usable after the sink has been dropped.
107
    ///
108
    /// In the current implementation, the returned sink will not yield to the async runtime if the
109
    /// channel is unbounded. This may change in later versions.
110
0
    pub fn sink(&self) -> SendSink<'_, T> {
111
0
        SendSink(SendFut {
112
0
            sender: OwnedOrRef::Ref(&self),
113
0
            hook: None,
114
0
        })
115
0
    }
116
117
    /// Convert this sender into a sink that allows asynchronously sending messages into the channel.
118
    ///
119
    /// In the current implementation, the returned sink will not yield to the async runtime if the
120
    /// channel is unbounded. This may change in later versions.
121
0
    pub fn into_sink<'a>(self) -> SendSink<'a, T> {
122
0
        SendSink(SendFut {
123
0
            sender: OwnedOrRef::Owned(self),
124
0
            hook: None,
125
0
        })
126
0
    }
127
}
128
129
enum SendState<T> {
130
    NotYetSent(T),
131
    QueuedItem(Arc<Hook<T, AsyncSignal>>),
132
}
133
134
/// A future that sends a value into a channel.
135
///
136
/// Can be created via [`Sender::send_async`] or [`Sender::into_send_async`].
137
#[must_use = "futures/streams/sinks do nothing unless you `.await` or poll them"]
138
pub struct SendFut<'a, T> {
139
    sender: OwnedOrRef<'a, Sender<T>>,
140
    // Only none after dropping
141
    hook: Option<SendState<T>>,
142
}
143
144
impl<T> std::marker::Unpin for SendFut<'_, T> {}
145
146
impl<'a, T> SendFut<'a, T> {
147
    /// Reset the hook, clearing it and removing it from the waiting sender's queue. This is called
148
    /// on drop and just before `start_send` in the `Sink` implementation.
149
0
    fn reset_hook(&mut self) {
150
0
        if let Some(SendState::QueuedItem(hook)) = self.hook.take() {
151
0
            let hook: Arc<Hook<T, dyn Signal>> = hook;
152
0
            wait_lock(&self.sender.shared.chan).sending
153
0
                .as_mut()
154
0
                .unwrap().1
155
0
                .retain(|s| s.signal().as_ptr() != hook.signal().as_ptr());
156
0
        }
157
0
    }
158
159
    /// See [`Sender::is_disconnected`].
160
0
    pub fn is_disconnected(&self) -> bool {
161
0
        self.sender.is_disconnected()
162
0
    }
163
164
    /// See [`Sender::is_empty`].
165
0
    pub fn is_empty(&self) -> bool {
166
0
        self.sender.is_empty()
167
0
    }
168
169
    /// See [`Sender::is_full`].
170
0
    pub fn is_full(&self) -> bool {
171
0
        self.sender.is_full()
172
0
    }
173
174
    /// See [`Sender::len`].
175
0
    pub fn len(&self) -> usize {
176
0
        self.sender.len()
177
0
    }
178
179
    /// See [`Sender::capacity`].
180
0
    pub fn capacity(&self) -> Option<usize> {
181
0
        self.sender.capacity()
182
0
    }
183
}
184
185
impl<'a, T> Drop for SendFut<'a, T> {
186
0
    fn drop(&mut self) {
187
0
        self.reset_hook()
188
0
    }
189
}
190
191
192
impl<'a, T> Future for SendFut<'a, T> {
193
    type Output = Result<(), SendError<T>>;
194
195
0
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
196
0
        if let Some(SendState::QueuedItem(hook)) = self.hook.as_ref() {
197
0
            if hook.is_empty() {
198
0
                Poll::Ready(Ok(()))
199
0
            } else if self.sender.shared.is_disconnected() {
200
0
                let item = hook.try_take();
201
0
                self.hook = None;
202
0
                match item {
203
0
                    Some(item) => Poll::Ready(Err(SendError(item))),
204
0
                    None => Poll::Ready(Ok(())),
205
                }
206
            } else {
207
0
                hook.update_waker(cx.waker());
208
0
                Poll::Pending
209
            }
210
0
        } else if let Some(SendState::NotYetSent(item)) = self.hook.take() {
211
0
            let this = self.get_mut();
212
0
            let (shared, this_hook) = (&this.sender.shared, &mut this.hook);
213
0
214
0
            shared.send(
215
0
                // item
216
0
                item,
217
0
                // should_block
218
0
                true,
219
0
                // make_signal
220
0
                |msg| Hook::slot(Some(msg), AsyncSignal::new(cx, false)),
221
0
                // do_block
222
0
                |hook| {
223
0
                    *this_hook = Some(SendState::QueuedItem(hook));
224
0
                    Poll::Pending
225
0
                },
226
0
            )
227
0
                .map(|r| r.map_err(|err| match err {
228
0
                    TrySendTimeoutError::Disconnected(msg) => SendError(msg),
229
0
                    _ => unreachable!(),
230
0
                }))
231
        } else { // Nothing to do
232
0
            Poll::Ready(Ok(()))
233
        }
234
0
    }
235
}
236
237
impl<'a, T> FusedFuture for SendFut<'a, T> {
    /// Reports the future as terminated exactly when the channel is disconnected.
    fn is_terminated(&self) -> bool {
        let shared = &self.sender.shared;
        shared.is_disconnected()
    }
}
242
243
/// A sink that allows sending values into a channel.
244
///
245
/// Can be created via [`Sender::sink`] or [`Sender::into_sink`].
246
pub struct SendSink<'a, T>(SendFut<'a, T>);
247
248
impl<'a, T> SendSink<'a, T> {
249
    /// Returns a clone of a sending half of the channel of this sink.
250
0
    pub fn sender(&self) -> &Sender<T> {
251
0
        &self.0.sender
252
0
    }
253
254
    /// See [`Sender::is_disconnected`].
255
0
    pub fn is_disconnected(&self) -> bool {
256
0
        self.0.is_disconnected()
257
0
    }
258
259
    /// See [`Sender::is_empty`].
260
0
    pub fn is_empty(&self) -> bool {
261
0
        self.0.is_empty()
262
0
    }
263
264
    /// See [`Sender::is_full`].
265
0
    pub fn is_full(&self) -> bool {
266
0
        self.0.is_full()
267
0
    }
268
269
    /// See [`Sender::len`].
270
0
    pub fn len(&self) -> usize {
271
0
        self.0.len()
272
0
    }
273
274
    /// See [`Sender::capacity`].
275
0
    pub fn capacity(&self) -> Option<usize> {
276
0
        self.0.capacity()
277
0
    }
278
279
    /// Returns whether the SendSinks are belong to the same channel.
280
0
    pub fn same_channel(&self, other: &Self) -> bool {
281
0
        self.sender().same_channel(other.sender())
282
0
    }
283
}
284
285
impl<'a, T> Sink<T> for SendSink<'a, T> {
286
    type Error = SendError<T>;
287
288
0
    fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
289
0
        Pin::new(&mut self.0).poll(cx)
290
0
    }
291
292
0
    fn start_send(mut self: Pin<&mut Self>, item: T) -> Result<(), Self::Error> {
293
0
        self.0.reset_hook();
294
0
        self.0.hook = Some(SendState::NotYetSent(item));
295
0
296
0
        Ok(())
297
0
    }
298
299
0
    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
300
0
        Pin::new(&mut self.0).poll(cx) // TODO: A different strategy here?
301
0
    }
302
303
0
    fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
304
0
        Pin::new(&mut self.0).poll(cx) // TODO: A different strategy here?
305
0
    }
306
}
307
308
impl<'a, T> Clone for SendSink<'a, T> {
309
0
    fn clone(&self) -> SendSink<'a, T> {
310
0
        SendSink(SendFut {
311
0
            sender: self.0.sender.clone(),
312
0
            hook: None,
313
0
        })
314
0
    }
315
}
316
317
impl<T> Receiver<T> {
318
    /// Asynchronously receive a value from the channel, returning an error if all senders have been
319
    /// dropped. If the channel is empty, the returned future will yield to the async runtime.
320
0
    pub fn recv_async(&self) -> RecvFut<'_, T> {
321
0
        RecvFut::new(OwnedOrRef::Ref(self))
322
0
    }
323
324
    /// Convert this receiver into a future that asynchronously receives a single message from the
325
    /// channel, returning an error if all senders have been dropped. If the channel is empty, this
326
    /// future will yield to the async runtime.
327
0
    pub fn into_recv_async<'a>(self) -> RecvFut<'a, T> {
328
0
        RecvFut::new(OwnedOrRef::Owned(self))
329
0
    }
330
331
    /// Create an asynchronous stream that uses this receiver to asynchronously receive messages
332
    /// from the channel. The receiver will continue to be usable after the stream has been dropped.
333
0
    pub fn stream(&self) -> RecvStream<'_, T> {
334
0
        RecvStream(RecvFut::new(OwnedOrRef::Ref(self)))
335
0
    }
336
337
    /// Convert this receiver into a stream that allows asynchronously receiving messages from the channel.
338
0
    pub fn into_stream<'a>(self) -> RecvStream<'a, T> {
339
0
        RecvStream(RecvFut::new(OwnedOrRef::Owned(self)))
340
0
    }
341
}
342
343
/// A future which allows asynchronously receiving a message.
344
///
345
/// Can be created via [`Receiver::recv_async`] or [`Receiver::into_recv_async`].
346
#[must_use = "futures/streams/sinks do nothing unless you `.await` or poll them"]
347
pub struct RecvFut<'a, T> {
348
    receiver: OwnedOrRef<'a, Receiver<T>>,
349
    hook: Option<Arc<Hook<T, AsyncSignal>>>,
350
}
351
352
impl<'a, T> RecvFut<'a, T> {
353
0
    fn new(receiver: OwnedOrRef<'a, Receiver<T>>) -> Self {
354
0
        Self {
355
0
            receiver,
356
0
            hook: None,
357
0
        }
358
0
    }
359
360
    /// Reset the hook, clearing it and removing it from the waiting receivers queue and waking
361
    /// another receiver if this receiver has been woken, so as not to cause any missed wakeups.
362
    /// This is called on drop and after a new item is received in `Stream::poll_next`.
363
0
    fn reset_hook(&mut self) {
364
0
        if let Some(hook) = self.hook.take() {
365
0
            let hook: Arc<Hook<T, dyn Signal>> = hook;
366
0
            let mut chan = wait_lock(&self.receiver.shared.chan);
367
0
            // We'd like to use `Arc::ptr_eq` here but it doesn't seem to work consistently with wide pointers?
368
0
            chan.waiting.retain(|s| s.signal().as_ptr() != hook.signal().as_ptr());
369
0
            if hook.signal().as_any().downcast_ref::<AsyncSignal>().unwrap().woken.load(Ordering::SeqCst) {
370
0
                // If this signal has been fired, but we're being dropped (and so not listening to it),
371
0
                // pass the signal on to another receiver
372
0
                chan.try_wake_receiver_if_pending();
373
0
            }
374
0
        }
375
0
    }
376
377
0
    fn poll_inner(
378
0
        self: Pin<&mut Self>,
379
0
        cx: &mut Context,
380
0
        stream: bool,
381
0
    ) -> Poll<Result<T, RecvError>> {
382
0
        if self.hook.is_some() {
383
0
            match self.receiver.shared.recv_sync(None) {
384
0
                Ok(msg) => return Poll::Ready(Ok(msg)),
385
                Err(TryRecvTimeoutError::Disconnected) => {
386
0
                    return Poll::Ready(Err(RecvError::Disconnected))
387
                }
388
0
                _ => (),
389
0
            }
390
0
391
0
            let hook = self.hook.as_ref().map(Arc::clone).unwrap();
392
0
            if hook.update_waker(cx.waker()) {
393
0
                // If the previous hook was awakened, we need to insert it back to the
394
0
                // queue, otherwise, it remains valid.
395
0
                wait_lock(&self.receiver.shared.chan)
396
0
                    .waiting
397
0
                    .push_back(hook);
398
0
            }
399
            // To avoid a missed wakeup, re-check disconnect status here because the channel might have
400
            // gotten shut down before we had a chance to push our hook
401
0
            if self.receiver.shared.is_disconnected() {
402
                // And now, to avoid a race condition between the first recv attempt and the disconnect check we
403
                // just performed, attempt to recv again just in case we missed something.
404
0
                Poll::Ready(
405
0
                    self.receiver
406
0
                        .shared
407
0
                        .recv_sync(None)
408
0
                        .map(Ok)
409
0
                        .unwrap_or(Err(RecvError::Disconnected)),
410
0
                )
411
            } else {
412
0
                Poll::Pending
413
            }
414
        } else {
415
0
            let mut_self = self.get_mut();
416
0
            let (shared, this_hook) = (&mut_self.receiver.shared, &mut mut_self.hook);
417
0
418
0
            shared.recv(
419
0
                // should_block
420
0
                true,
421
0
                // make_signal
422
0
                || Hook::trigger(AsyncSignal::new(cx, stream)),
423
0
                // do_block
424
0
                |hook| {
425
0
                    *this_hook = Some(hook);
426
0
                    Poll::Pending
427
0
                },
428
0
            )
429
0
                .map(|r| r.map_err(|err| match err {
430
0
                    TryRecvTimeoutError::Disconnected => RecvError::Disconnected,
431
0
                    _ => unreachable!(),
432
0
                }))
433
        }
434
0
    }
435
436
    /// See [`Receiver::is_disconnected`].
437
0
    pub fn is_disconnected(&self) -> bool {
438
0
        self.receiver.is_disconnected()
439
0
    }
440
441
    /// See [`Receiver::is_empty`].
442
0
    pub fn is_empty(&self) -> bool {
443
0
        self.receiver.is_empty()
444
0
    }
445
446
    /// See [`Receiver::is_full`].
447
0
    pub fn is_full(&self) -> bool {
448
0
        self.receiver.is_full()
449
0
    }
450
451
    /// See [`Receiver::len`].
452
0
    pub fn len(&self) -> usize {
453
0
        self.receiver.len()
454
0
    }
455
456
    /// See [`Receiver::capacity`].
457
0
    pub fn capacity(&self) -> Option<usize> {
458
0
        self.receiver.capacity()
459
0
    }
460
}
461
462
impl<'a, T> Drop for RecvFut<'a, T> {
463
0
    fn drop(&mut self) {
464
0
        self.reset_hook();
465
0
    }
466
}
467
468
impl<'a, T> Future for RecvFut<'a, T> {
469
    type Output = Result<T, RecvError>;
470
471
0
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
472
0
        self.poll_inner(cx, false) // stream = false
473
0
    }
474
}
475
476
impl<'a, T> FusedFuture for RecvFut<'a, T> {
    /// Terminated once the channel is both disconnected and empty.
    fn is_terminated(&self) -> bool {
        let shared = &self.receiver.shared;
        shared.is_disconnected() && shared.is_empty()
    }
}
481
482
/// A stream which allows asynchronously receiving messages.
483
///
484
/// Can be created via [`Receiver::stream`] or [`Receiver::into_stream`].
485
pub struct RecvStream<'a, T>(RecvFut<'a, T>);
486
487
impl<'a, T> RecvStream<'a, T> {
488
    /// See [`Receiver::is_disconnected`].
489
0
    pub fn is_disconnected(&self) -> bool {
490
0
        self.0.is_disconnected()
491
0
    }
492
493
    /// See [`Receiver::is_empty`].
494
0
    pub fn is_empty(&self) -> bool {
495
0
        self.0.is_empty()
496
0
    }
497
498
    /// See [`Receiver::is_full`].
499
0
    pub fn is_full(&self) -> bool {
500
0
        self.0.is_full()
501
0
    }
502
503
    /// See [`Receiver::len`].
504
0
    pub fn len(&self) -> usize {
505
0
        self.0.len()
506
0
    }
507
508
    /// See [`Receiver::capacity`].
509
0
    pub fn capacity(&self) -> Option<usize> {
510
0
        self.0.capacity()
511
0
    }
512
513
    /// Returns whether the SendSinks are belong to the same channel.
514
0
    pub fn same_channel(&self, other: &Self) -> bool {
515
0
        self.0.receiver.same_channel(&*other.0.receiver)
516
0
    }
517
}
518
519
impl<'a, T> Clone for RecvStream<'a, T> {
520
0
    fn clone(&self) -> RecvStream<'a, T> {
521
0
        RecvStream(RecvFut::new(self.0.receiver.clone()))
522
0
    }
523
}
524
525
impl<'a, T> Stream for RecvStream<'a, T> {
526
    type Item = T;
527
528
0
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
529
0
        match Pin::new(&mut self.0).poll_inner(cx, true) { // stream = true
530
0
            Poll::Pending => Poll::Pending,
531
0
            Poll::Ready(item) => {
532
0
                self.0.reset_hook();
533
0
                Poll::Ready(item.ok())
534
            }
535
        }
536
0
    }
537
}
538
539
impl<'a, T> FusedStream for RecvStream<'a, T> {
540
0
    fn is_terminated(&self) -> bool {
541
0
        self.0.is_terminated()
542
0
    }
543
}