Coverage Report

Created: 2025-11-28 06:44

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/rust/registry/src/index.crates.io-1949cf8c6b5b557f/async-channel-2.3.1/src/lib.rs
Line
Count
Source
1
//! An async multi-producer multi-consumer channel, where each message can be received by only
2
//! one of all existing consumers.
3
//!
4
//! There are two kinds of channels:
5
//!
6
//! 1. [Bounded][`bounded()`] channel with limited capacity.
7
//! 2. [Unbounded][`unbounded()`] channel with unlimited capacity.
8
//!
9
//! A channel has the [`Sender`] and [`Receiver`] side. Both sides are cloneable and can be shared
10
//! among multiple threads.
11
//!
12
//! When all [`Sender`]s or all [`Receiver`]s are dropped, the channel becomes closed. When a
13
//! channel is closed, no more messages can be sent, but remaining messages can still be received.
14
//!
15
//! The channel can also be closed manually by calling [`Sender::close()`] or
16
//! [`Receiver::close()`].
17
//!
18
//! # Examples
19
//!
20
//! ```
21
//! # futures_lite::future::block_on(async {
22
//! let (s, r) = async_channel::unbounded();
23
//!
24
//! assert_eq!(s.send("Hello").await, Ok(()));
25
//! assert_eq!(r.recv().await, Ok("Hello"));
26
//! # });
27
//! ```
28
29
#![cfg_attr(not(feature = "std"), no_std)]
30
#![forbid(unsafe_code)]
31
#![warn(missing_docs, missing_debug_implementations, rust_2018_idioms)]
32
#![doc(
33
    html_favicon_url = "https://raw.githubusercontent.com/smol-rs/smol/master/assets/images/logo_fullsize_transparent.png"
34
)]
35
#![doc(
36
    html_logo_url = "https://raw.githubusercontent.com/smol-rs/smol/master/assets/images/logo_fullsize_transparent.png"
37
)]
38
39
extern crate alloc;
40
41
use core::fmt;
42
use core::future::Future;
43
use core::marker::PhantomPinned;
44
use core::pin::Pin;
45
use core::sync::atomic::{AtomicUsize, Ordering};
46
use core::task::{Context, Poll};
47
48
use alloc::sync::Arc;
49
50
use concurrent_queue::{ConcurrentQueue, ForcePushError, PopError, PushError};
51
use event_listener_strategy::{
52
    easy_wrapper,
53
    event_listener::{Event, EventListener},
54
    EventListenerFuture, Strategy,
55
};
56
use futures_core::ready;
57
use futures_core::stream::Stream;
58
use pin_project_lite::pin_project;
59
60
struct Channel<T> {
61
    /// Inner message queue.
62
    queue: ConcurrentQueue<T>,
63
64
    /// Send operations waiting while the channel is full.
65
    send_ops: Event,
66
67
    /// Receive operations waiting while the channel is empty and not closed.
68
    recv_ops: Event,
69
70
    /// Stream operations waiting while the channel is empty and not closed.
71
    stream_ops: Event,
72
73
    /// The number of currently active `Sender`s.
74
    sender_count: AtomicUsize,
75
76
    /// The number of currently active `Receiver`s.
77
    receiver_count: AtomicUsize,
78
}
79
80
impl<T> Channel<T> {
81
    /// Closes the channel and notifies all blocked operations.
82
    ///
83
    /// Returns `true` if this call has closed the channel and it was not closed already.
84
0
    fn close(&self) -> bool {
85
0
        if self.queue.close() {
86
            // Notify all send operations.
87
0
            self.send_ops.notify(usize::MAX);
88
89
            // Notify all receive and stream operations.
90
0
            self.recv_ops.notify(usize::MAX);
91
0
            self.stream_ops.notify(usize::MAX);
92
93
0
            true
94
        } else {
95
0
            false
96
        }
97
0
    }
Unexecuted instantiation: <async_channel::Channel<surrealdb_types::notification::Notification>>::close
Unexecuted instantiation: <async_channel::Channel<_>>::close
98
}
99
100
/// Creates a bounded channel.
101
///
102
/// The created channel has space to hold at most `cap` messages at a time.
103
///
104
/// # Panics
105
///
106
/// Capacity must be a positive number. If `cap` is zero, this function will panic.
107
///
108
/// # Examples
109
///
110
/// ```
111
/// # futures_lite::future::block_on(async {
112
/// use async_channel::{bounded, TryRecvError, TrySendError};
113
///
114
/// let (s, r) = bounded(1);
115
///
116
/// assert_eq!(s.send(10).await, Ok(()));
117
/// assert_eq!(s.try_send(20), Err(TrySendError::Full(20)));
118
///
119
/// assert_eq!(r.recv().await, Ok(10));
120
/// assert_eq!(r.try_recv(), Err(TryRecvError::Empty));
121
/// # });
122
/// ```
123
0
pub fn bounded<T>(cap: usize) -> (Sender<T>, Receiver<T>) {
124
0
    assert!(cap > 0, "capacity cannot be zero");
125
126
0
    let channel = Arc::new(Channel {
127
0
        queue: ConcurrentQueue::bounded(cap),
128
0
        send_ops: Event::new(),
129
0
        recv_ops: Event::new(),
130
0
        stream_ops: Event::new(),
131
0
        sender_count: AtomicUsize::new(1),
132
0
        receiver_count: AtomicUsize::new(1),
133
0
    });
134
135
0
    let s = Sender {
136
0
        channel: channel.clone(),
137
0
    };
138
0
    let r = Receiver {
139
0
        listener: None,
140
0
        channel,
141
0
        _pin: PhantomPinned,
142
0
    };
143
0
    (s, r)
144
0
}
Unexecuted instantiation: async_channel::bounded::<surrealdb_types::notification::Notification>
Unexecuted instantiation: async_channel::bounded::<_>
145
146
/// Creates an unbounded channel.
147
///
148
/// The created channel can hold an unlimited number of messages.
149
///
150
/// # Examples
151
///
152
/// ```
153
/// # futures_lite::future::block_on(async {
154
/// use async_channel::{unbounded, TryRecvError};
155
///
156
/// let (s, r) = unbounded();
157
///
158
/// assert_eq!(s.send(10).await, Ok(()));
159
/// assert_eq!(s.send(20).await, Ok(()));
160
///
161
/// assert_eq!(r.recv().await, Ok(10));
162
/// assert_eq!(r.recv().await, Ok(20));
163
/// assert_eq!(r.try_recv(), Err(TryRecvError::Empty));
164
/// # });
165
/// ```
166
0
pub fn unbounded<T>() -> (Sender<T>, Receiver<T>) {
167
0
    let channel = Arc::new(Channel {
168
0
        queue: ConcurrentQueue::unbounded(),
169
0
        send_ops: Event::new(),
170
0
        recv_ops: Event::new(),
171
0
        stream_ops: Event::new(),
172
0
        sender_count: AtomicUsize::new(1),
173
0
        receiver_count: AtomicUsize::new(1),
174
0
    });
175
176
0
    let s = Sender {
177
0
        channel: channel.clone(),
178
0
    };
179
0
    let r = Receiver {
180
0
        listener: None,
181
0
        channel,
182
0
        _pin: PhantomPinned,
183
0
    };
184
0
    (s, r)
185
0
}
Unexecuted instantiation: async_channel::unbounded::<surrealdb_types::notification::Notification>
Unexecuted instantiation: async_channel::unbounded::<_>
186
187
/// The sending side of a channel.
188
///
189
/// Senders can be cloned and shared among threads. When all senders associated with a channel are
190
/// dropped, the channel becomes closed.
191
///
192
/// The channel can also be closed manually by calling [`Sender::close()`].
193
pub struct Sender<T> {
194
    /// Inner channel state.
195
    channel: Arc<Channel<T>>,
196
}
197
198
impl<T> Sender<T> {
199
    /// Attempts to send a message into the channel.
200
    ///
201
    /// If the channel is full or closed, this method returns an error.
202
    ///
203
    /// # Examples
204
    ///
205
    /// ```
206
    /// use async_channel::{bounded, TrySendError};
207
    ///
208
    /// let (s, r) = bounded(1);
209
    ///
210
    /// assert_eq!(s.try_send(1), Ok(()));
211
    /// assert_eq!(s.try_send(2), Err(TrySendError::Full(2)));
212
    ///
213
    /// drop(r);
214
    /// assert_eq!(s.try_send(3), Err(TrySendError::Closed(3)));
215
    /// ```
216
0
    pub fn try_send(&self, msg: T) -> Result<(), TrySendError<T>> {
217
0
        match self.channel.queue.push(msg) {
218
            Ok(()) => {
219
                // Notify a blocked receive operation. If the notified operation gets canceled,
220
                // it will notify another blocked receive operation.
221
0
                self.channel.recv_ops.notify_additional(1);
222
223
                // Notify all blocked streams.
224
0
                self.channel.stream_ops.notify(usize::MAX);
225
226
0
                Ok(())
227
            }
228
0
            Err(PushError::Full(msg)) => Err(TrySendError::Full(msg)),
229
0
            Err(PushError::Closed(msg)) => Err(TrySendError::Closed(msg)),
230
        }
231
0
    }
Unexecuted instantiation: <async_channel::Sender<surrealdb_types::notification::Notification>>::try_send
Unexecuted instantiation: <async_channel::Sender<_>>::try_send
232
233
    /// Sends a message into the channel.
234
    ///
235
    /// If the channel is full, this method waits until there is space for a message.
236
    ///
237
    /// If the channel is closed, this method returns an error.
238
    ///
239
    /// # Examples
240
    ///
241
    /// ```
242
    /// # futures_lite::future::block_on(async {
243
    /// use async_channel::{unbounded, SendError};
244
    ///
245
    /// let (s, r) = unbounded();
246
    ///
247
    /// assert_eq!(s.send(1).await, Ok(()));
248
    /// drop(r);
249
    /// assert_eq!(s.send(2).await, Err(SendError(2)));
250
    /// # });
251
    /// ```
252
0
    pub fn send(&self, msg: T) -> Send<'_, T> {
253
0
        Send::_new(SendInner {
254
0
            sender: self,
255
0
            msg: Some(msg),
256
0
            listener: None,
257
0
            _pin: PhantomPinned,
258
0
        })
259
0
    }
Unexecuted instantiation: <async_channel::Sender<surrealdb_types::notification::Notification>>::send
Unexecuted instantiation: <async_channel::Sender<_>>::send
260
261
    /// Sends a message into this channel using the blocking strategy.
262
    ///
263
    /// If the channel is full, this method will block until there is room.
264
    /// If the channel is closed, this method returns an error.
265
    ///
266
    /// # Blocking
267
    ///
268
    /// Rather than using asynchronous waiting, like the [`send`](Self::send) method,
269
    /// this method will block the current thread until the message is sent.
270
    ///
271
    /// This method should not be used in an asynchronous context. It is intended
272
    /// to be used such that a channel can be used in both asynchronous and synchronous contexts.
273
    /// Calling this method in an asynchronous context may result in deadlocks.
274
    ///
275
    /// # Examples
276
    ///
277
    /// ```
278
    /// use async_channel::{unbounded, SendError};
279
    ///
280
    /// let (s, r) = unbounded();
281
    ///
282
    /// assert_eq!(s.send_blocking(1), Ok(()));
283
    /// drop(r);
284
    /// assert_eq!(s.send_blocking(2), Err(SendError(2)));
285
    /// ```
286
    #[cfg(all(feature = "std", not(target_family = "wasm")))]
287
0
    pub fn send_blocking(&self, msg: T) -> Result<(), SendError<T>> {
288
0
        self.send(msg).wait()
289
0
    }
290
291
    /// Forcefully push a message into this channel.
292
    ///
293
    /// If the channel is full, this method will replace an existing message in the
294
    /// channel and return it as `Ok(Some(value))`. If the channel is closed, this
295
    /// method will return an error.
296
    ///
297
    /// # Examples
298
    ///
299
    /// ```
300
    /// # futures_lite::future::block_on(async {
301
    /// use async_channel::{bounded, SendError};
302
    ///
303
    /// let (s, r) = bounded(3);
304
    ///
305
    /// assert_eq!(s.send(1).await, Ok(()));
306
    /// assert_eq!(s.send(2).await, Ok(()));
307
    /// assert_eq!(s.force_send(3), Ok(None));
308
    /// assert_eq!(s.force_send(4), Ok(Some(1)));
309
    ///
310
    /// assert_eq!(r.recv().await, Ok(2));
311
    /// assert_eq!(r.recv().await, Ok(3));
312
    /// assert_eq!(r.recv().await, Ok(4));
313
    /// # });
314
    /// ```
315
0
    pub fn force_send(&self, msg: T) -> Result<Option<T>, SendError<T>> {
316
0
        match self.channel.queue.force_push(msg) {
317
0
            Ok(backlog) => {
318
                // Notify a blocked receive operation. If the notified operation gets canceled,
319
                // it will notify another blocked receive operation.
320
0
                self.channel.recv_ops.notify_additional(1);
321
322
                // Notify all blocked streams.
323
0
                self.channel.stream_ops.notify(usize::MAX);
324
325
0
                Ok(backlog)
326
            }
327
328
0
            Err(ForcePushError(reject)) => Err(SendError(reject)),
329
        }
330
0
    }
331
332
    /// Closes the channel.
333
    ///
334
    /// Returns `true` if this call has closed the channel and it was not closed already.
335
    ///
336
    /// The remaining messages can still be received.
337
    ///
338
    /// # Examples
339
    ///
340
    /// ```
341
    /// # futures_lite::future::block_on(async {
342
    /// use async_channel::{unbounded, RecvError};
343
    ///
344
    /// let (s, r) = unbounded();
345
    /// assert_eq!(s.send(1).await, Ok(()));
346
    /// assert!(s.close());
347
    ///
348
    /// assert_eq!(r.recv().await, Ok(1));
349
    /// assert_eq!(r.recv().await, Err(RecvError));
350
    /// # });
351
    /// ```
352
0
    pub fn close(&self) -> bool {
353
0
        self.channel.close()
354
0
    }
355
356
    /// Returns `true` if the channel is closed.
357
    ///
358
    /// # Examples
359
    ///
360
    /// ```
361
    /// # futures_lite::future::block_on(async {
362
    /// use async_channel::{unbounded, RecvError};
363
    ///
364
    /// let (s, r) = unbounded::<()>();
365
    /// assert!(!s.is_closed());
366
    ///
367
    /// drop(r);
368
    /// assert!(s.is_closed());
369
    /// # });
370
    /// ```
371
0
    pub fn is_closed(&self) -> bool {
372
0
        self.channel.queue.is_closed()
373
0
    }
374
375
    /// Returns `true` if the channel is empty.
376
    ///
377
    /// # Examples
378
    ///
379
    /// ```
380
    /// # futures_lite::future::block_on(async {
381
    /// use async_channel::unbounded;
382
    ///
383
    /// let (s, r) = unbounded();
384
    ///
385
    /// assert!(s.is_empty());
386
    /// s.send(1).await;
387
    /// assert!(!s.is_empty());
388
    /// # });
389
    /// ```
390
0
    pub fn is_empty(&self) -> bool {
391
0
        self.channel.queue.is_empty()
392
0
    }
393
394
    /// Returns `true` if the channel is full.
395
    ///
396
    /// Unbounded channels are never full.
397
    ///
398
    /// # Examples
399
    ///
400
    /// ```
401
    /// # futures_lite::future::block_on(async {
402
    /// use async_channel::bounded;
403
    ///
404
    /// let (s, r) = bounded(1);
405
    ///
406
    /// assert!(!s.is_full());
407
    /// s.send(1).await;
408
    /// assert!(s.is_full());
409
    /// # });
410
    /// ```
411
0
    pub fn is_full(&self) -> bool {
412
0
        self.channel.queue.is_full()
413
0
    }
414
415
    /// Returns the number of messages in the channel.
416
    ///
417
    /// # Examples
418
    ///
419
    /// ```
420
    /// # futures_lite::future::block_on(async {
421
    /// use async_channel::unbounded;
422
    ///
423
    /// let (s, r) = unbounded();
424
    /// assert_eq!(s.len(), 0);
425
    ///
426
    /// s.send(1).await;
427
    /// s.send(2).await;
428
    /// assert_eq!(s.len(), 2);
429
    /// # });
430
    /// ```
431
0
    pub fn len(&self) -> usize {
432
0
        self.channel.queue.len()
433
0
    }
434
435
    /// Returns the channel capacity if it's bounded.
436
    ///
437
    /// # Examples
438
    ///
439
    /// ```
440
    /// use async_channel::{bounded, unbounded};
441
    ///
442
    /// let (s, r) = bounded::<i32>(5);
443
    /// assert_eq!(s.capacity(), Some(5));
444
    ///
445
    /// let (s, r) = unbounded::<i32>();
446
    /// assert_eq!(s.capacity(), None);
447
    /// ```
448
0
    pub fn capacity(&self) -> Option<usize> {
449
0
        self.channel.queue.capacity()
450
0
    }
451
452
    /// Returns the number of receivers for the channel.
453
    ///
454
    /// # Examples
455
    ///
456
    /// ```
457
    /// # futures_lite::future::block_on(async {
458
    /// use async_channel::unbounded;
459
    ///
460
    /// let (s, r) = unbounded::<()>();
461
    /// assert_eq!(s.receiver_count(), 1);
462
    ///
463
    /// let r2 = r.clone();
464
    /// assert_eq!(s.receiver_count(), 2);
465
    /// # });
466
    /// ```
467
0
    pub fn receiver_count(&self) -> usize {
468
0
        self.channel.receiver_count.load(Ordering::SeqCst)
469
0
    }
470
471
    /// Returns the number of senders for the channel.
472
    ///
473
    /// # Examples
474
    ///
475
    /// ```
476
    /// # futures_lite::future::block_on(async {
477
    /// use async_channel::unbounded;
478
    ///
479
    /// let (s, r) = unbounded::<()>();
480
    /// assert_eq!(s.sender_count(), 1);
481
    ///
482
    /// let s2 = s.clone();
483
    /// assert_eq!(s.sender_count(), 2);
484
    /// # });
485
    /// ```
486
0
    pub fn sender_count(&self) -> usize {
487
0
        self.channel.sender_count.load(Ordering::SeqCst)
488
0
    }
489
490
    /// Downgrade the sender to a weak reference.
491
0
    pub fn downgrade(&self) -> WeakSender<T> {
492
0
        WeakSender {
493
0
            channel: self.channel.clone(),
494
0
        }
495
0
    }
496
}
497
498
impl<T> Drop for Sender<T> {
499
0
    fn drop(&mut self) {
500
        // Decrement the sender count and close the channel if it drops down to zero.
501
0
        if self.channel.sender_count.fetch_sub(1, Ordering::AcqRel) == 1 {
502
0
            self.channel.close();
503
0
        }
504
0
    }
Unexecuted instantiation: <async_channel::Sender<surrealdb_types::notification::Notification> as core::ops::drop::Drop>::drop
Unexecuted instantiation: <async_channel::Sender<_> as core::ops::drop::Drop>::drop
505
}
506
507
impl<T> fmt::Debug for Sender<T> {
508
0
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
509
0
        write!(f, "Sender {{ .. }}")
510
0
    }
Unexecuted instantiation: <async_channel::Sender<surrealdb_types::notification::Notification> as core::fmt::Debug>::fmt
Unexecuted instantiation: <async_channel::Sender<_> as core::fmt::Debug>::fmt
511
}
512
513
impl<T> Clone for Sender<T> {
514
0
    fn clone(&self) -> Sender<T> {
515
0
        let count = self.channel.sender_count.fetch_add(1, Ordering::Relaxed);
516
517
        // Make sure the count never overflows, even if lots of sender clones are leaked.
518
0
        if count > usize::MAX / 2 {
519
0
            abort();
520
0
        }
521
522
0
        Sender {
523
0
            channel: self.channel.clone(),
524
0
        }
525
0
    }
Unexecuted instantiation: <async_channel::Sender<surrealdb_types::notification::Notification> as core::clone::Clone>::clone
Unexecuted instantiation: <async_channel::Sender<_> as core::clone::Clone>::clone
526
}
527
528
pin_project! {
529
    /// The receiving side of a channel.
530
    ///
531
    /// Receivers can be cloned and shared among threads. When all receivers associated with a channel
532
    /// are dropped, the channel becomes closed.
533
    ///
534
    /// The channel can also be closed manually by calling [`Receiver::close()`].
535
    ///
536
    /// Receivers implement the [`Stream`] trait.
537
    pub struct Receiver<T> {
538
        // Inner channel state.
539
        channel: Arc<Channel<T>>,
540
541
        // Listens for a send or close event to unblock this stream.
542
        listener: Option<EventListener>,
543
544
        // Keeping this type `!Unpin` enables future optimizations.
545
        #[pin]
546
        _pin: PhantomPinned
547
    }
548
549
    impl<T> PinnedDrop for Receiver<T> {
550
        fn drop(this: Pin<&mut Self>) {
551
            let this = this.project();
552
553
            // Decrement the receiver count and close the channel if it drops down to zero.
554
            if this.channel.receiver_count.fetch_sub(1, Ordering::AcqRel) == 1 {
555
                this.channel.close();
556
            }
557
        }
558
    }
559
}
560
561
impl<T> Receiver<T> {
562
    /// Attempts to receive a message from the channel.
563
    ///
564
    /// If the channel is empty, or empty and closed, this method returns an error.
565
    ///
566
    /// # Examples
567
    ///
568
    /// ```
569
    /// # futures_lite::future::block_on(async {
570
    /// use async_channel::{unbounded, TryRecvError};
571
    ///
572
    /// let (s, r) = unbounded();
573
    /// assert_eq!(s.send(1).await, Ok(()));
574
    ///
575
    /// assert_eq!(r.try_recv(), Ok(1));
576
    /// assert_eq!(r.try_recv(), Err(TryRecvError::Empty));
577
    ///
578
    /// drop(s);
579
    /// assert_eq!(r.try_recv(), Err(TryRecvError::Closed));
580
    /// # });
581
    /// ```
582
0
    pub fn try_recv(&self) -> Result<T, TryRecvError> {
583
0
        match self.channel.queue.pop() {
584
0
            Ok(msg) => {
585
                // Notify a blocked send operation. If the notified operation gets canceled, it
586
                // will notify another blocked send operation.
587
0
                self.channel.send_ops.notify_additional(1);
588
589
0
                Ok(msg)
590
            }
591
0
            Err(PopError::Empty) => Err(TryRecvError::Empty),
592
0
            Err(PopError::Closed) => Err(TryRecvError::Closed),
593
        }
594
0
    }
Unexecuted instantiation: <async_channel::Receiver<surrealdb_types::notification::Notification>>::try_recv
Unexecuted instantiation: <async_channel::Receiver<_>>::try_recv
595
596
    /// Receives a message from the channel.
597
    ///
598
    /// If the channel is empty, this method waits until there is a message.
599
    ///
600
    /// If the channel is closed, this method receives a message or returns an error if there are
601
    /// no more messages.
602
    ///
603
    /// # Examples
604
    ///
605
    /// ```
606
    /// # futures_lite::future::block_on(async {
607
    /// use async_channel::{unbounded, RecvError};
608
    ///
609
    /// let (s, r) = unbounded();
610
    ///
611
    /// assert_eq!(s.send(1).await, Ok(()));
612
    /// drop(s);
613
    ///
614
    /// assert_eq!(r.recv().await, Ok(1));
615
    /// assert_eq!(r.recv().await, Err(RecvError));
616
    /// # });
617
    /// ```
618
0
    pub fn recv(&self) -> Recv<'_, T> {
619
0
        Recv::_new(RecvInner {
620
0
            receiver: self,
621
0
            listener: None,
622
0
            _pin: PhantomPinned,
623
0
        })
624
0
    }
Unexecuted instantiation: <async_channel::Receiver<surrealdb_types::notification::Notification>>::recv
Unexecuted instantiation: <async_channel::Receiver<_>>::recv
625
626
    /// Receives a message from the channel using the blocking strategy.
627
    ///
628
    /// If the channel is empty, this method waits until there is a message.
629
    /// If the channel is closed, this method receives a message or returns an error if there are
630
    /// no more messages.
631
    ///
632
    /// # Blocking
633
    ///
634
    /// Rather than using asynchronous waiting, like the [`recv`](Self::recv) method,
635
    /// this method will block the current thread until a message is received.
636
    ///
637
    /// This method should not be used in an asynchronous context. It is intended
638
    /// to be used such that a channel can be used in both asynchronous and synchronous contexts.
639
    /// Calling this method in an asynchronous context may result in deadlocks.
640
    ///
641
    /// # Examples
642
    ///
643
    /// ```
644
    /// use async_channel::{unbounded, RecvError};
645
    ///
646
    /// let (s, r) = unbounded();
647
    ///
648
    /// assert_eq!(s.send_blocking(1), Ok(()));
649
    /// drop(s);
650
    ///
651
    /// assert_eq!(r.recv_blocking(), Ok(1));
652
    /// assert_eq!(r.recv_blocking(), Err(RecvError));
653
    /// ```
654
    #[cfg(all(feature = "std", not(target_family = "wasm")))]
655
0
    pub fn recv_blocking(&self) -> Result<T, RecvError> {
656
0
        self.recv().wait()
657
0
    }
658
659
    /// Closes the channel.
660
    ///
661
    /// Returns `true` if this call has closed the channel and it was not closed already.
662
    ///
663
    /// The remaining messages can still be received.
664
    ///
665
    /// # Examples
666
    ///
667
    /// ```
668
    /// # futures_lite::future::block_on(async {
669
    /// use async_channel::{unbounded, RecvError};
670
    ///
671
    /// let (s, r) = unbounded();
672
    /// assert_eq!(s.send(1).await, Ok(()));
673
    ///
674
    /// assert!(r.close());
675
    /// assert_eq!(r.recv().await, Ok(1));
676
    /// assert_eq!(r.recv().await, Err(RecvError));
677
    /// # });
678
    /// ```
679
0
    pub fn close(&self) -> bool {
680
0
        self.channel.close()
681
0
    }
682
683
    /// Returns `true` if the channel is closed.
684
    ///
685
    /// # Examples
686
    ///
687
    /// ```
688
    /// # futures_lite::future::block_on(async {
689
    /// use async_channel::{unbounded, RecvError};
690
    ///
691
    /// let (s, r) = unbounded::<()>();
692
    /// assert!(!r.is_closed());
693
    ///
694
    /// drop(s);
695
    /// assert!(r.is_closed());
696
    /// # });
697
    /// ```
698
0
    pub fn is_closed(&self) -> bool {
699
0
        self.channel.queue.is_closed()
700
0
    }
701
702
    /// Returns `true` if the channel is empty.
703
    ///
704
    /// # Examples
705
    ///
706
    /// ```
707
    /// # futures_lite::future::block_on(async {
708
    /// use async_channel::unbounded;
709
    ///
710
    /// let (s, r) = unbounded();
711
    ///
712
    /// assert!(r.is_empty());
713
    /// s.send(1).await;
714
    /// assert!(!r.is_empty());
715
    /// # });
716
    /// ```
717
0
    pub fn is_empty(&self) -> bool {
718
0
        self.channel.queue.is_empty()
719
0
    }
720
721
    /// Returns `true` if the channel is full.
722
    ///
723
    /// Unbounded channels are never full.
724
    ///
725
    /// # Examples
726
    ///
727
    /// ```
728
    /// # futures_lite::future::block_on(async {
729
    /// use async_channel::bounded;
730
    ///
731
    /// let (s, r) = bounded(1);
732
    ///
733
    /// assert!(!r.is_full());
734
    /// s.send(1).await;
735
    /// assert!(r.is_full());
736
    /// # });
737
    /// ```
738
0
    pub fn is_full(&self) -> bool {
739
0
        self.channel.queue.is_full()
740
0
    }
741
742
    /// Returns the number of messages in the channel.
743
    ///
744
    /// # Examples
745
    ///
746
    /// ```
747
    /// # futures_lite::future::block_on(async {
748
    /// use async_channel::unbounded;
749
    ///
750
    /// let (s, r) = unbounded();
751
    /// assert_eq!(r.len(), 0);
752
    ///
753
    /// s.send(1).await;
754
    /// s.send(2).await;
755
    /// assert_eq!(r.len(), 2);
756
    /// # });
757
    /// ```
758
0
    pub fn len(&self) -> usize {
759
0
        self.channel.queue.len()
760
0
    }
761
762
    /// Returns the channel capacity if it's bounded.
763
    ///
764
    /// # Examples
765
    ///
766
    /// ```
767
    /// use async_channel::{bounded, unbounded};
768
    ///
769
    /// let (s, r) = bounded::<i32>(5);
770
    /// assert_eq!(r.capacity(), Some(5));
771
    ///
772
    /// let (s, r) = unbounded::<i32>();
773
    /// assert_eq!(r.capacity(), None);
774
    /// ```
775
0
    pub fn capacity(&self) -> Option<usize> {
776
0
        self.channel.queue.capacity()
777
0
    }
778
779
    /// Returns the number of receivers for the channel.
780
    ///
781
    /// # Examples
782
    ///
783
    /// ```
784
    /// # futures_lite::future::block_on(async {
785
    /// use async_channel::unbounded;
786
    ///
787
    /// let (s, r) = unbounded::<()>();
788
    /// assert_eq!(r.receiver_count(), 1);
789
    ///
790
    /// let r2 = r.clone();
791
    /// assert_eq!(r.receiver_count(), 2);
792
    /// # });
793
    /// ```
794
0
    pub fn receiver_count(&self) -> usize {
795
0
        self.channel.receiver_count.load(Ordering::SeqCst)
796
0
    }
797
798
    /// Returns the number of senders for the channel.
799
    ///
800
    /// # Examples
801
    ///
802
    /// ```
803
    /// # futures_lite::future::block_on(async {
804
    /// use async_channel::unbounded;
805
    ///
806
    /// let (s, r) = unbounded::<()>();
807
    /// assert_eq!(r.sender_count(), 1);
808
    ///
809
    /// let s2 = s.clone();
810
    /// assert_eq!(r.sender_count(), 2);
811
    /// # });
812
    /// ```
813
0
    pub fn sender_count(&self) -> usize {
814
0
        self.channel.sender_count.load(Ordering::SeqCst)
815
0
    }
816
817
    /// Downgrade the receiver to a weak reference.
818
0
    pub fn downgrade(&self) -> WeakReceiver<T> {
819
0
        WeakReceiver {
820
0
            channel: self.channel.clone(),
821
0
        }
822
0
    }
823
}
824
825
impl<T> fmt::Debug for Receiver<T> {
826
0
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
827
0
        write!(f, "Receiver {{ .. }}")
828
0
    }
829
}
830
831
impl<T> Clone for Receiver<T> {
832
0
    fn clone(&self) -> Receiver<T> {
833
0
        let count = self.channel.receiver_count.fetch_add(1, Ordering::Relaxed);
834
835
        // Make sure the count never overflows, even if lots of receiver clones are leaked.
836
0
        if count > usize::MAX / 2 {
837
0
            abort();
838
0
        }
839
840
0
        Receiver {
841
0
            channel: self.channel.clone(),
842
0
            listener: None,
843
0
            _pin: PhantomPinned,
844
0
        }
845
0
    }
Unexecuted instantiation: <async_channel::Receiver<surrealdb_types::notification::Notification> as core::clone::Clone>::clone
Unexecuted instantiation: <async_channel::Receiver<_> as core::clone::Clone>::clone
846
}
847
848
impl<T> Stream for Receiver<T> {
849
    type Item = T;
850
851
0
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
852
        loop {
853
            // If this stream is listening for events, first wait for a notification.
854
            {
855
0
                let this = self.as_mut().project();
856
0
                if let Some(listener) = this.listener.as_mut() {
857
0
                    ready!(Pin::new(listener).poll(cx));
858
0
                    *this.listener = None;
859
0
                }
860
            }
861
862
            loop {
863
                // Attempt to receive a message.
864
0
                match self.try_recv() {
865
0
                    Ok(msg) => {
866
                        // The stream is not blocked on an event - drop the listener.
867
0
                        let this = self.as_mut().project();
868
0
                        *this.listener = None;
869
0
                        return Poll::Ready(Some(msg));
870
                    }
871
                    Err(TryRecvError::Closed) => {
872
                        // The stream is not blocked on an event - drop the listener.
873
0
                        let this = self.as_mut().project();
874
0
                        *this.listener = None;
875
0
                        return Poll::Ready(None);
876
                    }
877
0
                    Err(TryRecvError::Empty) => {}
878
                }
879
880
                // Receiving failed - now start listening for notifications or wait for one.
881
0
                let this = self.as_mut().project();
882
0
                if this.listener.is_some() {
883
                    // Go back to the outer loop to wait for a notification.
884
0
                    break;
885
0
                } else {
886
0
                    *this.listener = Some(this.channel.stream_ops.listen());
887
0
                }
888
            }
889
        }
890
0
    }
891
}
892
893
impl<T> futures_core::stream::FusedStream for Receiver<T> {
894
0
    fn is_terminated(&self) -> bool {
895
0
        self.channel.queue.is_closed() && self.channel.queue.is_empty()
896
0
    }
897
}
898
899
/// A [`Sender`] that does not prevent the channel from being closed.
900
///
901
/// This is created through the [`Sender::downgrade`] method. In order to use it, it needs
902
/// to be upgraded into a [`Sender`] through the `upgrade` method.
903
pub struct WeakSender<T> {
904
    channel: Arc<Channel<T>>,
905
}
906
907
impl<T> WeakSender<T> {
908
    /// Upgrade the [`WeakSender`] into a [`Sender`].
909
0
    pub fn upgrade(&self) -> Option<Sender<T>> {
910
0
        if self.channel.queue.is_closed() {
911
0
            None
912
        } else {
913
0
            match self.channel.sender_count.fetch_update(
914
0
                Ordering::Relaxed,
915
0
                Ordering::Relaxed,
916
0
                |count| if count == 0 { None } else { Some(count + 1) },
917
            ) {
918
0
                Err(_) => None,
919
0
                Ok(new_value) if new_value > usize::MAX / 2 => {
920
                    // Make sure the count never overflows, even if lots of sender clones are leaked.
921
0
                    abort();
922
                }
923
0
                Ok(_) => Some(Sender {
924
0
                    channel: self.channel.clone(),
925
0
                }),
926
            }
927
        }
928
0
    }
929
}
930
931
impl<T> Clone for WeakSender<T> {
932
0
    fn clone(&self) -> Self {
933
0
        WeakSender {
934
0
            channel: self.channel.clone(),
935
0
        }
936
0
    }
937
}
938
939
impl<T> fmt::Debug for WeakSender<T> {
940
0
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
941
0
        write!(f, "WeakSender {{ .. }}")
942
0
    }
943
}
944
945
/// A [`Receiver`] that prevents the channel from not being closed.
946
///
947
/// This is created through the [`Receiver::downgrade`] method. In order to use it, it needs
948
/// to be upgraded into a [`Receiver`] through the `upgrade` method.
949
pub struct WeakReceiver<T> {
950
    channel: Arc<Channel<T>>,
951
}
952
953
impl<T> WeakReceiver<T> {
954
    /// Upgrade the [`WeakReceiver`] into a [`Receiver`].
955
0
    pub fn upgrade(&self) -> Option<Receiver<T>> {
956
0
        if self.channel.queue.is_closed() {
957
0
            None
958
        } else {
959
0
            match self.channel.receiver_count.fetch_update(
960
0
                Ordering::Relaxed,
961
0
                Ordering::Relaxed,
962
0
                |count| if count == 0 { None } else { Some(count + 1) },
963
            ) {
964
0
                Err(_) => None,
965
0
                Ok(new_value) if new_value > usize::MAX / 2 => {
966
                    // Make sure the count never overflows, even if lots of receiver clones are leaked.
967
0
                    abort();
968
                }
969
0
                Ok(_) => Some(Receiver {
970
0
                    channel: self.channel.clone(),
971
0
                    listener: None,
972
0
                    _pin: PhantomPinned,
973
0
                }),
974
            }
975
        }
976
0
    }
977
}
978
979
impl<T> Clone for WeakReceiver<T> {
980
0
    fn clone(&self) -> Self {
981
0
        WeakReceiver {
982
0
            channel: self.channel.clone(),
983
0
        }
984
0
    }
985
}
986
987
impl<T> fmt::Debug for WeakReceiver<T> {
988
0
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
989
0
        write!(f, "WeakReceiver {{ .. }}")
990
0
    }
991
}
992
993
/// An error returned from [`Sender::send()`].
///
/// Produced when the channel is closed; it carries the message that was not sent.
#[derive(PartialEq, Eq, Clone, Copy)]
pub struct SendError<T>(pub T);

impl<T> SendError<T> {
    /// Consumes the error and hands back the message that couldn't be sent.
    pub fn into_inner(self) -> T {
        let SendError(unsent) = self;
        unsent
    }
}

#[cfg(feature = "std")]
impl<T> std::error::Error for SendError<T> {}

impl<T> fmt::Debug for SendError<T> {
    // The payload is deliberately elided so that `T` does not need to be `Debug`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("SendError(..)")
    }
}

impl<T> fmt::Display for SendError<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("sending into a closed channel")
    }
}
1020
1021
/// An error returned from [`Sender::try_send()`].
///
/// Both variants carry the message that was not sent.
#[derive(PartialEq, Eq, Clone, Copy)]
pub enum TrySendError<T> {
    /// The channel is full but not closed.
    Full(T),

    /// The channel is closed.
    Closed(T),
}

impl<T> TrySendError<T> {
    /// Consumes the error and hands back the message that couldn't be sent.
    pub fn into_inner(self) -> T {
        match self {
            TrySendError::Full(unsent) | TrySendError::Closed(unsent) => unsent,
        }
    }

    /// Returns `true` if the channel is full but not closed.
    pub fn is_full(&self) -> bool {
        matches!(self, TrySendError::Full(_))
    }

    /// Returns `true` if the channel is closed.
    pub fn is_closed(&self) -> bool {
        matches!(self, TrySendError::Closed(_))
    }
}

#[cfg(feature = "std")]
impl<T> std::error::Error for TrySendError<T> {}

impl<T> fmt::Debug for TrySendError<T> {
    // The payload is deliberately elided so that `T` does not need to be `Debug`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let label = match self {
            TrySendError::Full(_) => "Full(..)",
            TrySendError::Closed(_) => "Closed(..)",
        };
        f.write_str(label)
    }
}

impl<T> fmt::Display for TrySendError<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let description = match self {
            TrySendError::Full(_) => "sending into a full channel",
            TrySendError::Closed(_) => "sending into a closed channel",
        };
        f.write_str(description)
    }
}
1077
1078
/// An error returned from [`Receiver::recv()`].
///
/// Produced when the channel is both empty and closed.
#[derive(PartialEq, Eq, Clone, Copy, Debug)]
pub struct RecvError;

#[cfg(feature = "std")]
impl std::error::Error for RecvError {}

impl fmt::Display for RecvError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("receiving from an empty and closed channel")
    }
}
1092
1093
/// An error returned from [`Receiver::try_recv()`].
#[derive(PartialEq, Eq, Clone, Copy, Debug)]
pub enum TryRecvError {
    /// The channel is empty but not closed.
    Empty,

    /// The channel is empty and closed.
    Closed,
}

impl TryRecvError {
    /// Returns `true` if the channel is empty but not closed.
    pub fn is_empty(&self) -> bool {
        matches!(self, TryRecvError::Empty)
    }

    /// Returns `true` if the channel is empty and closed.
    pub fn is_closed(&self) -> bool {
        matches!(self, TryRecvError::Closed)
    }
}

#[cfg(feature = "std")]
impl std::error::Error for TryRecvError {}

impl fmt::Display for TryRecvError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let description = match self {
            TryRecvError::Empty => "receiving from an empty channel",
            TryRecvError::Closed => "receiving from an empty and closed channel",
        };
        f.write_str(description)
    }
}
1132
1133
easy_wrapper! {
1134
    /// A future returned by [`Sender::send()`].
1135
    #[derive(Debug)]
1136
    #[must_use = "futures do nothing unless you `.await` or poll them"]
1137
    pub struct Send<'a, T>(SendInner<'a, T> => Result<(), SendError<T>>);
1138
    #[cfg(all(feature = "std", not(target_family = "wasm")))]
1139
    pub(crate) wait();
1140
}
1141
1142
pin_project! {
1143
    #[derive(Debug)]
1144
    #[project(!Unpin)]
1145
    struct SendInner<'a, T> {
1146
        // Reference to the original sender.
1147
        sender: &'a Sender<T>,
1148
1149
        // The message to send.
1150
        msg: Option<T>,
1151
1152
        // Listener waiting on the channel.
1153
        listener: Option<EventListener>,
1154
1155
        // Keeping this type `!Unpin` enables future optimizations.
1156
        #[pin]
1157
        _pin: PhantomPinned
1158
    }
1159
}
1160
1161
impl<'a, T> EventListenerFuture for SendInner<'a, T> {
1162
    type Output = Result<(), SendError<T>>;
1163
1164
    /// Run this future with the given `Strategy`.
1165
0
    fn poll_with_strategy<'x, S: Strategy<'x>>(
1166
0
        self: Pin<&mut Self>,
1167
0
        strategy: &mut S,
1168
0
        context: &mut S::Context,
1169
0
    ) -> Poll<Result<(), SendError<T>>> {
1170
0
        let this = self.project();
1171
1172
        loop {
1173
0
            let msg = this.msg.take().unwrap();
1174
            // Attempt to send a message.
1175
0
            match this.sender.try_send(msg) {
1176
0
                Ok(()) => return Poll::Ready(Ok(())),
1177
0
                Err(TrySendError::Closed(msg)) => return Poll::Ready(Err(SendError(msg))),
1178
0
                Err(TrySendError::Full(m)) => *this.msg = Some(m),
1179
            }
1180
1181
            // Sending failed - now start listening for notifications or wait for one.
1182
0
            if this.listener.is_some() {
1183
                // Poll using the given strategy
1184
0
                ready!(S::poll(strategy, &mut *this.listener, context));
1185
0
            } else {
1186
0
                *this.listener = Some(this.sender.channel.send_ops.listen());
1187
0
            }
1188
        }
1189
0
    }
Unexecuted instantiation: <async_channel::SendInner<surrealdb_types::notification::Notification> as event_listener_strategy::EventListenerFuture>::poll_with_strategy::<event_listener_strategy::NonBlocking>
Unexecuted instantiation: <async_channel::SendInner<_> as event_listener_strategy::EventListenerFuture>::poll_with_strategy::<_>
1190
}
1191
1192
easy_wrapper! {
1193
    /// A future returned by [`Receiver::recv()`].
1194
    #[derive(Debug)]
1195
    #[must_use = "futures do nothing unless you `.await` or poll them"]
1196
    pub struct Recv<'a, T>(RecvInner<'a, T> => Result<T, RecvError>);
1197
    #[cfg(all(feature = "std", not(target_family = "wasm")))]
1198
    pub(crate) wait();
1199
}
1200
1201
pin_project! {
1202
    #[derive(Debug)]
1203
    #[project(!Unpin)]
1204
    struct RecvInner<'a, T> {
1205
        // Reference to the receiver.
1206
        receiver: &'a Receiver<T>,
1207
1208
        // Listener waiting on the channel.
1209
        listener: Option<EventListener>,
1210
1211
        // Keeping this type `!Unpin` enables future optimizations.
1212
        #[pin]
1213
        _pin: PhantomPinned
1214
    }
1215
}
1216
1217
impl<'a, T> EventListenerFuture for RecvInner<'a, T> {
1218
    type Output = Result<T, RecvError>;
1219
1220
    /// Run this future with the given `Strategy`.
1221
0
    fn poll_with_strategy<'x, S: Strategy<'x>>(
1222
0
        self: Pin<&mut Self>,
1223
0
        strategy: &mut S,
1224
0
        cx: &mut S::Context,
1225
0
    ) -> Poll<Result<T, RecvError>> {
1226
0
        let this = self.project();
1227
1228
        loop {
1229
            // Attempt to receive a message.
1230
0
            match this.receiver.try_recv() {
1231
0
                Ok(msg) => return Poll::Ready(Ok(msg)),
1232
0
                Err(TryRecvError::Closed) => return Poll::Ready(Err(RecvError)),
1233
0
                Err(TryRecvError::Empty) => {}
1234
            }
1235
1236
            // Receiving failed - now start listening for notifications or wait for one.
1237
0
            if this.listener.is_some() {
1238
                // Poll using the given strategy
1239
0
                ready!(S::poll(strategy, &mut *this.listener, cx));
1240
0
            } else {
1241
0
                *this.listener = Some(this.receiver.channel.recv_ops.listen());
1242
0
            }
1243
        }
1244
0
    }
Unexecuted instantiation: <async_channel::RecvInner<surrealdb_types::notification::Notification> as event_listener_strategy::EventListenerFuture>::poll_with_strategy::<event_listener_strategy::NonBlocking>
Unexecuted instantiation: <async_channel::RecvInner<_> as event_listener_strategy::EventListenerFuture>::poll_with_strategy::<_>
1245
}
1246
1247
#[cfg(feature = "std")]
1248
use std::process::abort;
1249
1250
#[cfg(not(feature = "std"))]
fn abort() -> ! {
    // Fallback used when the `std` feature is off: a guard value whose destructor
    // panics is kept alive while this function panics, so dropping the guard
    // during unwinding raises a second panic. As the messages say, the intent is
    // to panic while panicking in order to abort.
    struct AbortOnUnwind;

    impl Drop for AbortOnUnwind {
        fn drop(&mut self) {
            panic!("Panic while panicking to abort");
        }
    }

    // Must be a named binding so the guard lives until the panic below unwinds.
    let _guard = AbortOnUnwind;
    panic!("Panic while panicking to abort")
}