Coverage Report

Created: 2026-03-11 07:34

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/rust/registry/src/index.crates.io-1949cf8c6b5b557f/async-channel-2.5.0/src/lib.rs
Line
Count
Source
1
//! An async multi-producer multi-consumer channel, where each message can be received by only
2
//! one of all existing consumers.
3
//!
4
//! There are two kinds of channels:
5
//!
6
//! 1. [Bounded][`bounded()`] channel with limited capacity.
7
//! 2. [Unbounded][`unbounded()`] channel with unlimited capacity.
8
//!
9
//! A channel has the [`Sender`] and [`Receiver`] side. Both sides are cloneable and can be shared
10
//! among multiple threads.
11
//!
12
//! When all [`Sender`]s or all [`Receiver`]s are dropped, the channel becomes closed. When a
13
//! channel is closed, no more messages can be sent, but remaining messages can still be received.
14
//!
15
//! The channel can also be closed manually by calling [`Sender::close()`] or
16
//! [`Receiver::close()`].
17
//!
18
//! # Examples
19
//!
20
//! ```
21
//! # futures_lite::future::block_on(async {
22
//! let (s, r) = async_channel::unbounded();
23
//!
24
//! assert_eq!(s.send("Hello").await, Ok(()));
25
//! assert_eq!(r.recv().await, Ok("Hello"));
26
//! # });
27
//! ```
28
29
#![cfg_attr(not(feature = "std"), no_std)]
30
#![forbid(unsafe_code)]
31
#![warn(missing_docs, missing_debug_implementations, rust_2018_idioms)]
32
#![doc(
33
    html_favicon_url = "https://raw.githubusercontent.com/smol-rs/smol/master/assets/images/logo_fullsize_transparent.png"
34
)]
35
#![doc(
36
    html_logo_url = "https://raw.githubusercontent.com/smol-rs/smol/master/assets/images/logo_fullsize_transparent.png"
37
)]
38
39
#[cfg(not(feature = "portable-atomic"))]
40
extern crate alloc;
41
42
use core::fmt;
43
use core::future::Future;
44
use core::marker::PhantomPinned;
45
use core::pin::Pin;
46
use core::task::{Context, Poll};
47
48
#[cfg(not(feature = "portable-atomic"))]
49
use alloc::sync::Arc;
50
#[cfg(not(feature = "portable-atomic"))]
51
use core::sync::atomic::{AtomicUsize, Ordering};
52
53
#[cfg(feature = "portable-atomic")]
54
use portable_atomic::{AtomicUsize, Ordering};
55
#[cfg(feature = "portable-atomic")]
56
use portable_atomic_util::Arc;
57
58
use concurrent_queue::{ConcurrentQueue, ForcePushError, PopError, PushError};
59
use event_listener_strategy::{
60
    easy_wrapper,
61
    event_listener::{Event, EventListener},
62
    EventListenerFuture, Strategy,
63
};
64
use futures_core::ready;
65
use futures_core::stream::Stream;
66
use pin_project_lite::pin_project;
67
68
struct Channel<T> {
69
    /// Inner message queue.
70
    queue: ConcurrentQueue<T>,
71
72
    /// Send operations waiting while the channel is full.
73
    send_ops: Event,
74
75
    /// Receive operations waiting while the channel is empty and not closed.
76
    recv_ops: Event,
77
78
    /// Stream operations while the channel is empty and not closed.
79
    stream_ops: Event,
80
81
    /// Closed operations while the channel is not closed.
82
    closed_ops: Event,
83
84
    /// The number of currently active `Sender`s.
85
    sender_count: AtomicUsize,
86
87
    /// The number of currently active `Receivers`s.
88
    receiver_count: AtomicUsize,
89
}
90
91
impl<T> Channel<T> {
92
    /// Closes the channel and notifies all blocked operations.
93
    ///
94
    /// Returns `true` if this call has closed the channel and it was not closed already.
95
0
    fn close(&self) -> bool {
96
0
        if self.queue.close() {
97
            // Notify all send operations.
98
0
            self.send_ops.notify(usize::MAX);
99
100
            // Notify all receive and stream operations.
101
0
            self.recv_ops.notify(usize::MAX);
102
0
            self.stream_ops.notify(usize::MAX);
103
0
            self.closed_ops.notify(usize::MAX);
104
105
0
            true
106
        } else {
107
0
            false
108
        }
109
0
    }
Unexecuted instantiation: <async_channel::Channel<core::result::Result<surrealdb_core::exec::ValueBatch, surrealdb_core::expr::ControlFlow>>>::close
Unexecuted instantiation: <async_channel::Channel<surrealdb_types::notification::Notification>>::close
Unexecuted instantiation: <async_channel::Channel<_>>::close
110
}
111
112
/// Creates a bounded channel.
113
///
114
/// The created channel has space to hold at most `cap` messages at a time.
115
///
116
/// # Panics
117
///
118
/// Capacity must be a positive number. If `cap` is zero, this function will panic.
119
///
120
/// # Examples
121
///
122
/// ```
123
/// # futures_lite::future::block_on(async {
124
/// use async_channel::{bounded, TryRecvError, TrySendError};
125
///
126
/// let (s, r) = bounded(1);
127
///
128
/// assert_eq!(s.send(10).await, Ok(()));
129
/// assert_eq!(s.try_send(20), Err(TrySendError::Full(20)));
130
///
131
/// assert_eq!(r.recv().await, Ok(10));
132
/// assert_eq!(r.try_recv(), Err(TryRecvError::Empty));
133
/// # });
134
/// ```
135
0
pub fn bounded<T>(cap: usize) -> (Sender<T>, Receiver<T>) {
136
0
    assert!(cap > 0, "capacity cannot be zero");
137
138
0
    let channel = Arc::new(Channel {
139
0
        queue: ConcurrentQueue::bounded(cap),
140
0
        send_ops: Event::new(),
141
0
        recv_ops: Event::new(),
142
0
        stream_ops: Event::new(),
143
0
        closed_ops: Event::new(),
144
0
        sender_count: AtomicUsize::new(1),
145
0
        receiver_count: AtomicUsize::new(1),
146
0
    });
147
148
0
    let s = Sender {
149
0
        channel: channel.clone(),
150
0
    };
151
0
    let r = Receiver {
152
0
        listener: None,
153
0
        channel,
154
0
        _pin: PhantomPinned,
155
0
    };
156
0
    (s, r)
157
0
}
Unexecuted instantiation: async_channel::bounded::<core::result::Result<surrealdb_core::exec::ValueBatch, surrealdb_core::expr::ControlFlow>>
Unexecuted instantiation: async_channel::bounded::<surrealdb_types::notification::Notification>
Unexecuted instantiation: async_channel::bounded::<_>
158
159
/// Creates an unbounded channel.
160
///
161
/// The created channel can hold an unlimited number of messages.
162
///
163
/// # Examples
164
///
165
/// ```
166
/// # futures_lite::future::block_on(async {
167
/// use async_channel::{unbounded, TryRecvError};
168
///
169
/// let (s, r) = unbounded();
170
///
171
/// assert_eq!(s.send(10).await, Ok(()));
172
/// assert_eq!(s.send(20).await, Ok(()));
173
///
174
/// assert_eq!(r.recv().await, Ok(10));
175
/// assert_eq!(r.recv().await, Ok(20));
176
/// assert_eq!(r.try_recv(), Err(TryRecvError::Empty));
177
/// # });
178
/// ```
179
0
pub fn unbounded<T>() -> (Sender<T>, Receiver<T>) {
180
0
    let channel = Arc::new(Channel {
181
0
        queue: ConcurrentQueue::unbounded(),
182
0
        send_ops: Event::new(),
183
0
        recv_ops: Event::new(),
184
0
        stream_ops: Event::new(),
185
0
        closed_ops: Event::new(),
186
0
        sender_count: AtomicUsize::new(1),
187
0
        receiver_count: AtomicUsize::new(1),
188
0
    });
189
190
0
    let s = Sender {
191
0
        channel: channel.clone(),
192
0
    };
193
0
    let r = Receiver {
194
0
        listener: None,
195
0
        channel,
196
0
        _pin: PhantomPinned,
197
0
    };
198
0
    (s, r)
199
0
}
Unexecuted instantiation: async_channel::unbounded::<surrealdb_types::notification::Notification>
Unexecuted instantiation: async_channel::unbounded::<_>
200
201
/// The sending side of a channel.
202
///
203
/// Senders can be cloned and shared among threads. When all senders associated with a channel are
204
/// dropped, the channel becomes closed.
205
///
206
/// The channel can also be closed manually by calling [`Sender::close()`].
207
pub struct Sender<T> {
208
    /// Inner channel state.
209
    channel: Arc<Channel<T>>,
210
}
211
212
impl<T> Sender<T> {
213
    /// Attempts to send a message into the channel.
214
    ///
215
    /// If the channel is full or closed, this method returns an error.
216
    ///
217
    /// # Examples
218
    ///
219
    /// ```
220
    /// use async_channel::{bounded, TrySendError};
221
    ///
222
    /// let (s, r) = bounded(1);
223
    ///
224
    /// assert_eq!(s.try_send(1), Ok(()));
225
    /// assert_eq!(s.try_send(2), Err(TrySendError::Full(2)));
226
    ///
227
    /// drop(r);
228
    /// assert_eq!(s.try_send(3), Err(TrySendError::Closed(3)));
229
    /// ```
230
0
    pub fn try_send(&self, msg: T) -> Result<(), TrySendError<T>> {
231
0
        match self.channel.queue.push(msg) {
232
            Ok(()) => {
233
                // Notify a blocked receive operation. If the notified operation gets canceled,
234
                // it will notify another blocked receive operation.
235
0
                self.channel.recv_ops.notify_additional(1);
236
237
                // Notify all blocked streams.
238
0
                self.channel.stream_ops.notify(usize::MAX);
239
240
0
                Ok(())
241
            }
242
0
            Err(PushError::Full(msg)) => Err(TrySendError::Full(msg)),
243
0
            Err(PushError::Closed(msg)) => Err(TrySendError::Closed(msg)),
244
        }
245
0
    }
Unexecuted instantiation: <async_channel::Sender<core::result::Result<surrealdb_core::exec::ValueBatch, surrealdb_core::expr::ControlFlow>>>::try_send
Unexecuted instantiation: <async_channel::Sender<surrealdb_types::notification::Notification>>::try_send
Unexecuted instantiation: <async_channel::Sender<_>>::try_send
246
247
    /// Sends a message into the channel.
248
    ///
249
    /// If the channel is full, this method waits until there is space for a message.
250
    ///
251
    /// If the channel is closed, this method returns an error.
252
    ///
253
    /// # Examples
254
    ///
255
    /// ```
256
    /// # futures_lite::future::block_on(async {
257
    /// use async_channel::{unbounded, SendError};
258
    ///
259
    /// let (s, r) = unbounded();
260
    ///
261
    /// assert_eq!(s.send(1).await, Ok(()));
262
    /// drop(r);
263
    /// assert_eq!(s.send(2).await, Err(SendError(2)));
264
    /// # });
265
    /// ```
266
0
    pub fn send(&self, msg: T) -> Send<'_, T> {
267
0
        Send::_new(SendInner {
268
0
            sender: self,
269
0
            msg: Some(msg),
270
0
            listener: None,
271
0
            _pin: PhantomPinned,
272
0
        })
273
0
    }
Unexecuted instantiation: <async_channel::Sender<core::result::Result<surrealdb_core::exec::ValueBatch, surrealdb_core::expr::ControlFlow>>>::send
Unexecuted instantiation: <async_channel::Sender<surrealdb_types::notification::Notification>>::send
Unexecuted instantiation: <async_channel::Sender<_>>::send
274
275
    /// Completes when all receivers have dropped.
276
    ///
277
    /// This allows the producers to get notified when interest in the produced values is canceled and immediately stop doing work.
278
    ///
279
    /// # Examples
280
    ///
281
    /// ```
282
    /// # futures_lite::future::block_on(async {
283
    /// use async_channel::{unbounded, SendError};
284
    ///
285
    /// let (s, r) = unbounded::<i32>();
286
    /// drop(r);
287
    /// s.closed().await;
288
    /// # });
289
    /// ```
290
0
    pub fn closed(&self) -> Closed<'_, T> {
291
0
        Closed::_new(ClosedInner {
292
0
            sender: self,
293
0
            listener: None,
294
0
            _pin: PhantomPinned,
295
0
        })
296
0
    }
297
298
    /// Sends a message into this channel using the blocking strategy.
299
    ///
300
    /// If the channel is full, this method will block until there is room.
301
    /// If the channel is closed, this method returns an error.
302
    ///
303
    /// # Blocking
304
    ///
305
    /// Rather than using asynchronous waiting, like the [`send`](Self::send) method,
306
    /// this method will block the current thread until the message is sent.
307
    ///
308
    /// This method should not be used in an asynchronous context. It is intended
309
    /// to be used such that a channel can be used in both asynchronous and synchronous contexts.
310
    /// Calling this method in an asynchronous context may result in deadlocks.
311
    ///
312
    /// # Examples
313
    ///
314
    /// ```
315
    /// use async_channel::{unbounded, SendError};
316
    ///
317
    /// let (s, r) = unbounded();
318
    ///
319
    /// assert_eq!(s.send_blocking(1), Ok(()));
320
    /// drop(r);
321
    /// assert_eq!(s.send_blocking(2), Err(SendError(2)));
322
    /// ```
323
    #[cfg(all(feature = "std", not(target_family = "wasm")))]
324
0
    pub fn send_blocking(&self, msg: T) -> Result<(), SendError<T>> {
325
0
        self.send(msg).wait()
326
0
    }
327
328
    /// Forcefully push a message into this channel.
329
    ///
330
    /// If the channel is full, this method will replace an existing message in the
331
    /// channel and return it as `Ok(Some(value))`. If the channel is closed, this
332
    /// method will return an error.
333
    ///
334
    /// # Examples
335
    ///
336
    /// ```
337
    /// # futures_lite::future::block_on(async {
338
    /// use async_channel::{bounded, SendError};
339
    ///
340
    /// let (s, r) = bounded(3);
341
    ///
342
    /// assert_eq!(s.send(1).await, Ok(()));
343
    /// assert_eq!(s.send(2).await, Ok(()));
344
    /// assert_eq!(s.force_send(3), Ok(None));
345
    /// assert_eq!(s.force_send(4), Ok(Some(1)));
346
    ///
347
    /// assert_eq!(r.recv().await, Ok(2));
348
    /// assert_eq!(r.recv().await, Ok(3));
349
    /// assert_eq!(r.recv().await, Ok(4));
350
    /// # });
351
    /// ```
352
0
    pub fn force_send(&self, msg: T) -> Result<Option<T>, SendError<T>> {
353
0
        match self.channel.queue.force_push(msg) {
354
0
            Ok(backlog) => {
355
                // Notify a blocked receive operation. If the notified operation gets canceled,
356
                // it will notify another blocked receive operation.
357
0
                self.channel.recv_ops.notify_additional(1);
358
359
                // Notify all blocked streams.
360
0
                self.channel.stream_ops.notify(usize::MAX);
361
362
0
                Ok(backlog)
363
            }
364
365
0
            Err(ForcePushError(reject)) => Err(SendError(reject)),
366
        }
367
0
    }
368
369
    /// Closes the channel.
370
    ///
371
    /// Returns `true` if this call has closed the channel and it was not closed already.
372
    ///
373
    /// The remaining messages can still be received.
374
    ///
375
    /// # Examples
376
    ///
377
    /// ```
378
    /// # futures_lite::future::block_on(async {
379
    /// use async_channel::{unbounded, RecvError};
380
    ///
381
    /// let (s, r) = unbounded();
382
    /// assert_eq!(s.send(1).await, Ok(()));
383
    /// assert!(s.close());
384
    ///
385
    /// assert_eq!(r.recv().await, Ok(1));
386
    /// assert_eq!(r.recv().await, Err(RecvError));
387
    /// # });
388
    /// ```
389
0
    pub fn close(&self) -> bool {
390
0
        self.channel.close()
391
0
    }
392
393
    /// Returns `true` if the channel is closed.
394
    ///
395
    /// # Examples
396
    ///
397
    /// ```
398
    /// # futures_lite::future::block_on(async {
399
    /// use async_channel::{unbounded, RecvError};
400
    ///
401
    /// let (s, r) = unbounded::<()>();
402
    /// assert!(!s.is_closed());
403
    ///
404
    /// drop(r);
405
    /// assert!(s.is_closed());
406
    /// # });
407
    /// ```
408
0
    pub fn is_closed(&self) -> bool {
409
0
        self.channel.queue.is_closed()
410
0
    }
411
412
    /// Returns `true` if the channel is empty.
413
    ///
414
    /// # Examples
415
    ///
416
    /// ```
417
    /// # futures_lite::future::block_on(async {
418
    /// use async_channel::unbounded;
419
    ///
420
    /// let (s, r) = unbounded();
421
    ///
422
    /// assert!(s.is_empty());
423
    /// s.send(1).await;
424
    /// assert!(!s.is_empty());
425
    /// # });
426
    /// ```
427
0
    pub fn is_empty(&self) -> bool {
428
0
        self.channel.queue.is_empty()
429
0
    }
430
431
    /// Returns `true` if the channel is full.
432
    ///
433
    /// Unbounded channels are never full.
434
    ///
435
    /// # Examples
436
    ///
437
    /// ```
438
    /// # futures_lite::future::block_on(async {
439
    /// use async_channel::bounded;
440
    ///
441
    /// let (s, r) = bounded(1);
442
    ///
443
    /// assert!(!s.is_full());
444
    /// s.send(1).await;
445
    /// assert!(s.is_full());
446
    /// # });
447
    /// ```
448
0
    pub fn is_full(&self) -> bool {
449
0
        self.channel.queue.is_full()
450
0
    }
451
452
    /// Returns the number of messages in the channel.
453
    ///
454
    /// # Examples
455
    ///
456
    /// ```
457
    /// # futures_lite::future::block_on(async {
458
    /// use async_channel::unbounded;
459
    ///
460
    /// let (s, r) = unbounded();
461
    /// assert_eq!(s.len(), 0);
462
    ///
463
    /// s.send(1).await;
464
    /// s.send(2).await;
465
    /// assert_eq!(s.len(), 2);
466
    /// # });
467
    /// ```
468
0
    pub fn len(&self) -> usize {
469
0
        self.channel.queue.len()
470
0
    }
471
472
    /// Returns the channel capacity if it's bounded.
473
    ///
474
    /// # Examples
475
    ///
476
    /// ```
477
    /// use async_channel::{bounded, unbounded};
478
    ///
479
    /// let (s, r) = bounded::<i32>(5);
480
    /// assert_eq!(s.capacity(), Some(5));
481
    ///
482
    /// let (s, r) = unbounded::<i32>();
483
    /// assert_eq!(s.capacity(), None);
484
    /// ```
485
0
    pub fn capacity(&self) -> Option<usize> {
486
0
        self.channel.queue.capacity()
487
0
    }
488
489
    /// Returns the number of receivers for the channel.
490
    ///
491
    /// # Examples
492
    ///
493
    /// ```
494
    /// # futures_lite::future::block_on(async {
495
    /// use async_channel::unbounded;
496
    ///
497
    /// let (s, r) = unbounded::<()>();
498
    /// assert_eq!(s.receiver_count(), 1);
499
    ///
500
    /// let r2 = r.clone();
501
    /// assert_eq!(s.receiver_count(), 2);
502
    /// # });
503
    /// ```
504
0
    pub fn receiver_count(&self) -> usize {
505
0
        self.channel.receiver_count.load(Ordering::SeqCst)
506
0
    }
507
508
    /// Returns the number of senders for the channel.
509
    ///
510
    /// # Examples
511
    ///
512
    /// ```
513
    /// # futures_lite::future::block_on(async {
514
    /// use async_channel::unbounded;
515
    ///
516
    /// let (s, r) = unbounded::<()>();
517
    /// assert_eq!(s.sender_count(), 1);
518
    ///
519
    /// let s2 = s.clone();
520
    /// assert_eq!(s.sender_count(), 2);
521
    /// # });
522
    /// ```
523
0
    pub fn sender_count(&self) -> usize {
524
0
        self.channel.sender_count.load(Ordering::SeqCst)
525
0
    }
526
527
    /// Downgrade the sender to a weak reference.
528
0
    pub fn downgrade(&self) -> WeakSender<T> {
529
0
        WeakSender {
530
0
            channel: self.channel.clone(),
531
0
        }
532
0
    }
533
534
    /// Returns whether the senders belong to the same channel.
535
    ///
536
    /// # Examples
537
    ///
538
    /// ```
539
    /// # futures_lite::future::block_on(async {
540
    /// use async_channel::unbounded;
541
    ///
542
    /// let (s, r) = unbounded::<()>();
543
    /// let s2 = s.clone();
544
    ///
545
    /// assert!(s.same_channel(&s2));
546
    /// # });
547
    /// ```
548
0
    pub fn same_channel(&self, other: &Sender<T>) -> bool {
549
0
        Arc::ptr_eq(&self.channel, &other.channel)
550
0
    }
551
}
552
553
impl<T> Drop for Sender<T> {
554
0
    fn drop(&mut self) {
555
        // Decrement the sender count and close the channel if it drops down to zero.
556
0
        if self.channel.sender_count.fetch_sub(1, Ordering::AcqRel) == 1 {
557
0
            self.channel.close();
558
0
        }
559
0
    }
Unexecuted instantiation: <async_channel::Sender<core::result::Result<surrealdb_core::exec::ValueBatch, surrealdb_core::expr::ControlFlow>> as core::ops::drop::Drop>::drop
Unexecuted instantiation: <async_channel::Sender<surrealdb_types::notification::Notification> as core::ops::drop::Drop>::drop
Unexecuted instantiation: <async_channel::Sender<_> as core::ops::drop::Drop>::drop
560
}
561
562
impl<T> fmt::Debug for Sender<T> {
563
0
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
564
0
        write!(f, "Sender {{ .. }}")
565
0
    }
Unexecuted instantiation: <async_channel::Sender<surrealdb_types::notification::Notification> as core::fmt::Debug>::fmt
Unexecuted instantiation: <async_channel::Sender<_> as core::fmt::Debug>::fmt
566
}
567
568
impl<T> Clone for Sender<T> {
569
0
    fn clone(&self) -> Sender<T> {
570
0
        let count = self.channel.sender_count.fetch_add(1, Ordering::Relaxed);
571
572
        // Make sure the count never overflows, even if lots of sender clones are leaked.
573
0
        if count > usize::MAX / 2 {
574
0
            abort();
575
0
        }
576
577
0
        Sender {
578
0
            channel: self.channel.clone(),
579
0
        }
580
0
    }
Unexecuted instantiation: <async_channel::Sender<surrealdb_types::notification::Notification> as core::clone::Clone>::clone
Unexecuted instantiation: <async_channel::Sender<_> as core::clone::Clone>::clone
581
}
582
583
pin_project! {
584
    /// The receiving side of a channel.
585
    ///
586
    /// Receivers can be cloned and shared among threads. When all receivers associated with a channel
587
    /// are dropped, the channel becomes closed.
588
    ///
589
    /// The channel can also be closed manually by calling [`Receiver::close()`].
590
    ///
591
    /// Receivers implement the [`Stream`] trait.
592
    pub struct Receiver<T> {
593
        // Inner channel state.
594
        channel: Arc<Channel<T>>,
595
596
        // Listens for a send or close event to unblock this stream.
597
        listener: Option<EventListener>,
598
599
        // Keeping this type `!Unpin` enables future optimizations.
600
        #[pin]
601
        _pin: PhantomPinned
602
    }
603
604
    impl<T> PinnedDrop for Receiver<T> {
605
        fn drop(this: Pin<&mut Self>) {
606
            let this = this.project();
607
608
            // Decrement the receiver count and close the channel if it drops down to zero.
609
            if this.channel.receiver_count.fetch_sub(1, Ordering::AcqRel) == 1 {
610
                this.channel.close();
611
            }
612
        }
613
    }
614
}
615
616
impl<T> Receiver<T> {
617
    /// Attempts to receive a message from the channel.
618
    ///
619
    /// If the channel is empty, or empty and closed, this method returns an error.
620
    ///
621
    /// # Examples
622
    ///
623
    /// ```
624
    /// # futures_lite::future::block_on(async {
625
    /// use async_channel::{unbounded, TryRecvError};
626
    ///
627
    /// let (s, r) = unbounded();
628
    /// assert_eq!(s.send(1).await, Ok(()));
629
    ///
630
    /// assert_eq!(r.try_recv(), Ok(1));
631
    /// assert_eq!(r.try_recv(), Err(TryRecvError::Empty));
632
    ///
633
    /// drop(s);
634
    /// assert_eq!(r.try_recv(), Err(TryRecvError::Closed));
635
    /// # });
636
    /// ```
637
0
    pub fn try_recv(&self) -> Result<T, TryRecvError> {
638
0
        match self.channel.queue.pop() {
639
0
            Ok(msg) => {
640
                // Notify a blocked send operation. If the notified operation gets canceled, it
641
                // will notify another blocked send operation.
642
0
                self.channel.send_ops.notify_additional(1);
643
644
0
                Ok(msg)
645
            }
646
0
            Err(PopError::Empty) => Err(TryRecvError::Empty),
647
0
            Err(PopError::Closed) => Err(TryRecvError::Closed),
648
        }
649
0
    }
Unexecuted instantiation: <async_channel::Receiver<surrealdb_types::notification::Notification>>::try_recv
Unexecuted instantiation: <async_channel::Receiver<core::result::Result<surrealdb_core::exec::ValueBatch, surrealdb_core::expr::ControlFlow>>>::try_recv
Unexecuted instantiation: <async_channel::Receiver<_>>::try_recv
Unexecuted instantiation: <async_channel::Receiver<surrealdb_types::notification::Notification>>::try_recv
650
651
    /// Receives a message from the channel.
652
    ///
653
    /// If the channel is empty, this method waits until there is a message.
654
    ///
655
    /// If the channel is closed, this method receives a message or returns an error if there are
656
    /// no more messages.
657
    ///
658
    /// # Examples
659
    ///
660
    /// ```
661
    /// # futures_lite::future::block_on(async {
662
    /// use async_channel::{unbounded, RecvError};
663
    ///
664
    /// let (s, r) = unbounded();
665
    ///
666
    /// assert_eq!(s.send(1).await, Ok(()));
667
    /// drop(s);
668
    ///
669
    /// assert_eq!(r.recv().await, Ok(1));
670
    /// assert_eq!(r.recv().await, Err(RecvError));
671
    /// # });
672
    /// ```
673
0
    pub fn recv(&self) -> Recv<'_, T> {
674
0
        Recv::_new(RecvInner {
675
0
            receiver: self,
676
0
            listener: None,
677
0
            _pin: PhantomPinned,
678
0
        })
679
0
    }
Unexecuted instantiation: <async_channel::Receiver<surrealdb_types::notification::Notification>>::recv
Unexecuted instantiation: <async_channel::Receiver<_>>::recv
Unexecuted instantiation: <async_channel::Receiver<surrealdb_types::notification::Notification>>::recv
680
681
    /// Receives a message from the channel using the blocking strategy.
682
    ///
683
    /// If the channel is empty, this method waits until there is a message.
684
    /// If the channel is closed, this method receives a message or returns an error if there are
685
    /// no more messages.
686
    ///
687
    /// # Blocking
688
    ///
689
    /// Rather than using asynchronous waiting, like the [`recv`](Self::recv) method,
690
    /// this method will block the current thread until the message is received.
691
    ///
692
    /// This method should not be used in an asynchronous context. It is intended
693
    /// to be used such that a channel can be used in both asynchronous and synchronous contexts.
694
    /// Calling this method in an asynchronous context may result in deadlocks.
695
    ///
696
    /// # Examples
697
    ///
698
    /// ```
699
    /// use async_channel::{unbounded, RecvError};
700
    ///
701
    /// let (s, r) = unbounded();
702
    ///
703
    /// assert_eq!(s.send_blocking(1), Ok(()));
704
    /// drop(s);
705
    ///
706
    /// assert_eq!(r.recv_blocking(), Ok(1));
707
    /// assert_eq!(r.recv_blocking(), Err(RecvError));
708
    /// ```
709
    #[cfg(all(feature = "std", not(target_family = "wasm")))]
710
0
    pub fn recv_blocking(&self) -> Result<T, RecvError> {
711
0
        self.recv().wait()
712
0
    }
713
714
    /// Closes the channel.
715
    ///
716
    /// Returns `true` if this call has closed the channel and it was not closed already.
717
    ///
718
    /// The remaining messages can still be received.
719
    ///
720
    /// # Examples
721
    ///
722
    /// ```
723
    /// # futures_lite::future::block_on(async {
724
    /// use async_channel::{unbounded, RecvError};
725
    ///
726
    /// let (s, r) = unbounded();
727
    /// assert_eq!(s.send(1).await, Ok(()));
728
    ///
729
    /// assert!(r.close());
730
    /// assert_eq!(r.recv().await, Ok(1));
731
    /// assert_eq!(r.recv().await, Err(RecvError));
732
    /// # });
733
    /// ```
734
0
    pub fn close(&self) -> bool {
735
0
        self.channel.close()
736
0
    }
737
738
    /// Returns `true` if the channel is closed.
739
    ///
740
    /// # Examples
741
    ///
742
    /// ```
743
    /// # futures_lite::future::block_on(async {
744
    /// use async_channel::{unbounded, RecvError};
745
    ///
746
    /// let (s, r) = unbounded::<()>();
747
    /// assert!(!r.is_closed());
748
    ///
749
    /// drop(s);
750
    /// assert!(r.is_closed());
751
    /// # });
752
    /// ```
753
0
    pub fn is_closed(&self) -> bool {
754
0
        self.channel.queue.is_closed()
755
0
    }
756
757
    /// Returns `true` if the channel is empty.
758
    ///
759
    /// # Examples
760
    ///
761
    /// ```
762
    /// # futures_lite::future::block_on(async {
763
    /// use async_channel::unbounded;
764
    ///
765
    /// let (s, r) = unbounded();
766
    ///
767
    /// assert!(s.is_empty());
768
    /// s.send(1).await;
769
    /// assert!(!s.is_empty());
770
    /// # });
771
    /// ```
772
0
    pub fn is_empty(&self) -> bool {
773
0
        self.channel.queue.is_empty()
774
0
    }
775
776
    /// Returns `true` if the channel is full.
777
    ///
778
    /// Unbounded channels are never full.
779
    ///
780
    /// # Examples
781
    ///
782
    /// ```
783
    /// # futures_lite::future::block_on(async {
784
    /// use async_channel::bounded;
785
    ///
786
    /// let (s, r) = bounded(1);
787
    ///
788
    /// assert!(!r.is_full());
789
    /// s.send(1).await;
790
    /// assert!(r.is_full());
791
    /// # });
792
    /// ```
793
0
    pub fn is_full(&self) -> bool {
794
0
        self.channel.queue.is_full()
795
0
    }
796
797
    /// Returns the number of messages in the channel.
798
    ///
799
    /// # Examples
800
    ///
801
    /// ```
802
    /// # futures_lite::future::block_on(async {
803
    /// use async_channel::unbounded;
804
    ///
805
    /// let (s, r) = unbounded();
806
    /// assert_eq!(r.len(), 0);
807
    ///
808
    /// s.send(1).await;
809
    /// s.send(2).await;
810
    /// assert_eq!(r.len(), 2);
811
    /// # });
812
    /// ```
813
0
    pub fn len(&self) -> usize {
814
0
        self.channel.queue.len()
815
0
    }
816
817
    /// Returns the channel capacity if it's bounded.
818
    ///
819
    /// # Examples
820
    ///
821
    /// ```
822
    /// use async_channel::{bounded, unbounded};
823
    ///
824
    /// let (s, r) = bounded::<i32>(5);
825
    /// assert_eq!(r.capacity(), Some(5));
826
    ///
827
    /// let (s, r) = unbounded::<i32>();
828
    /// assert_eq!(r.capacity(), None);
829
    /// ```
830
0
    pub fn capacity(&self) -> Option<usize> {
831
0
        self.channel.queue.capacity()
832
0
    }
833
834
    /// Returns the number of receivers for the channel.
835
    ///
836
    /// # Examples
837
    ///
838
    /// ```
839
    /// # futures_lite::future::block_on(async {
840
    /// use async_channel::unbounded;
841
    ///
842
    /// let (s, r) = unbounded::<()>();
843
    /// assert_eq!(r.receiver_count(), 1);
844
    ///
845
    /// let r2 = r.clone();
846
    /// assert_eq!(r.receiver_count(), 2);
847
    /// # });
848
    /// ```
849
0
    pub fn receiver_count(&self) -> usize {
850
0
        self.channel.receiver_count.load(Ordering::SeqCst)
851
0
    }
852
853
    /// Returns the number of senders for the channel.
854
    ///
855
    /// # Examples
856
    ///
857
    /// ```
858
    /// # futures_lite::future::block_on(async {
859
    /// use async_channel::unbounded;
860
    ///
861
    /// let (s, r) = unbounded::<()>();
862
    /// assert_eq!(r.sender_count(), 1);
863
    ///
864
    /// let s2 = s.clone();
865
    /// assert_eq!(r.sender_count(), 2);
866
    /// # });
867
    /// ```
868
0
    pub fn sender_count(&self) -> usize {
869
0
        self.channel.sender_count.load(Ordering::SeqCst)
870
0
    }
871
872
    /// Downgrade the receiver to a weak reference.
873
0
    pub fn downgrade(&self) -> WeakReceiver<T> {
874
0
        WeakReceiver {
875
0
            channel: self.channel.clone(),
876
0
        }
877
0
    }
878
879
    /// Returns whether the receivers belong to the same channel.
880
    ///
881
    /// # Examples
882
    ///
883
    /// ```
884
    /// # futures_lite::future::block_on(async {
885
    /// use async_channel::unbounded;
886
    ///
887
    /// let (s, r) = unbounded::<()>();
888
    /// let r2 = r.clone();
889
    ///
890
    /// assert!(r.same_channel(&r2));
891
    /// # });
892
    /// ```
893
0
    pub fn same_channel(&self, other: &Receiver<T>) -> bool {
894
0
        Arc::ptr_eq(&self.channel, &other.channel)
895
0
    }
896
}
897
898
impl<T> fmt::Debug for Receiver<T> {
899
0
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
900
0
        write!(f, "Receiver {{ .. }}")
901
0
    }
902
}
903
904
impl<T> Clone for Receiver<T> {
905
0
    fn clone(&self) -> Receiver<T> {
906
0
        let count = self.channel.receiver_count.fetch_add(1, Ordering::Relaxed);
907
908
        // Make sure the count never overflows, even if lots of receiver clones are leaked.
909
0
        if count > usize::MAX / 2 {
910
0
            abort();
911
0
        }
912
913
0
        Receiver {
914
0
            channel: self.channel.clone(),
915
0
            listener: None,
916
0
            _pin: PhantomPinned,
917
0
        }
918
0
    }
Unexecuted instantiation: <async_channel::Receiver<surrealdb_types::notification::Notification> as core::clone::Clone>::clone
Unexecuted instantiation: <async_channel::Receiver<_> as core::clone::Clone>::clone
919
}
920
921
impl<T> Stream for Receiver<T> {
922
    type Item = T;
923
924
0
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
925
        loop {
926
            // If this stream is listening for events, first wait for a notification.
927
            {
928
0
                let this = self.as_mut().project();
929
0
                if let Some(listener) = this.listener.as_mut() {
930
0
                    ready!(Pin::new(listener).poll(cx));
931
0
                    *this.listener = None;
932
0
                }
933
            }
934
935
            loop {
936
                // Attempt to receive a message.
937
0
                match self.try_recv() {
938
0
                    Ok(msg) => {
939
                        // The stream is not blocked on an event - drop the listener.
940
0
                        let this = self.as_mut().project();
941
0
                        *this.listener = None;
942
0
                        return Poll::Ready(Some(msg));
943
                    }
944
                    Err(TryRecvError::Closed) => {
945
                        // The stream is not blocked on an event - drop the listener.
946
0
                        let this = self.as_mut().project();
947
0
                        *this.listener = None;
948
0
                        return Poll::Ready(None);
949
                    }
950
0
                    Err(TryRecvError::Empty) => {}
951
                }
952
953
                // Receiving failed - now start listening for notifications or wait for one.
954
0
                let this = self.as_mut().project();
955
0
                if this.listener.is_some() {
956
                    // Go back to the outer loop to wait for a notification.
957
0
                    break;
958
0
                } else {
959
0
                    *this.listener = Some(this.channel.stream_ops.listen());
960
0
                }
961
            }
962
        }
963
0
    }
Unexecuted instantiation: <async_channel::Receiver<core::result::Result<surrealdb_core::exec::ValueBatch, surrealdb_core::expr::ControlFlow>> as futures_core::stream::Stream>::poll_next
Unexecuted instantiation: <async_channel::Receiver<_> as futures_core::stream::Stream>::poll_next
964
}
965
966
impl<T> futures_core::stream::FusedStream for Receiver<T> {
967
0
    fn is_terminated(&self) -> bool {
968
0
        self.channel.queue.is_closed() && self.channel.queue.is_empty()
969
0
    }
970
}
971
972
/// A [`Sender`] that does not prevent the channel from being closed.
973
///
974
/// This is created through the [`Sender::downgrade`] method. In order to use it, it needs
975
/// to be upgraded into a [`Sender`] through the `upgrade` method.
976
pub struct WeakSender<T> {
977
    channel: Arc<Channel<T>>,
978
}
979
980
impl<T> WeakSender<T> {
981
    /// Upgrade the [`WeakSender`] into a [`Sender`].
982
0
    pub fn upgrade(&self) -> Option<Sender<T>> {
983
0
        if self.channel.queue.is_closed() {
984
0
            None
985
        } else {
986
0
            match self.channel.sender_count.fetch_update(
987
0
                Ordering::Relaxed,
988
0
                Ordering::Relaxed,
989
0
                |count| if count == 0 { None } else { Some(count + 1) },
990
            ) {
991
0
                Err(_) => None,
992
0
                Ok(new_value) if new_value > usize::MAX / 2 => {
993
                    // Make sure the count never overflows, even if lots of sender clones are leaked.
994
0
                    abort();
995
                }
996
0
                Ok(_) => Some(Sender {
997
0
                    channel: self.channel.clone(),
998
0
                }),
999
            }
1000
        }
1001
0
    }
1002
}
1003
1004
impl<T> Clone for WeakSender<T> {
1005
0
    fn clone(&self) -> Self {
1006
0
        WeakSender {
1007
0
            channel: self.channel.clone(),
1008
0
        }
1009
0
    }
1010
}
1011
1012
impl<T> fmt::Debug for WeakSender<T> {
1013
0
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1014
0
        write!(f, "WeakSender {{ .. }}")
1015
0
    }
1016
}
1017
1018
/// A [`Receiver`] that does not prevent the channel from being closed.
1019
///
1020
/// This is created through the [`Receiver::downgrade`] method. In order to use it, it needs
1021
/// to be upgraded into a [`Receiver`] through the `upgrade` method.
1022
pub struct WeakReceiver<T> {
1023
    channel: Arc<Channel<T>>,
1024
}
1025
1026
impl<T> WeakReceiver<T> {
1027
    /// Upgrade the [`WeakReceiver`] into a [`Receiver`].
1028
0
    pub fn upgrade(&self) -> Option<Receiver<T>> {
1029
0
        if self.channel.queue.is_closed() {
1030
0
            None
1031
        } else {
1032
0
            match self.channel.receiver_count.fetch_update(
1033
0
                Ordering::Relaxed,
1034
0
                Ordering::Relaxed,
1035
0
                |count| if count == 0 { None } else { Some(count + 1) },
1036
            ) {
1037
0
                Err(_) => None,
1038
0
                Ok(new_value) if new_value > usize::MAX / 2 => {
1039
                    // Make sure the count never overflows, even if lots of receiver clones are leaked.
1040
0
                    abort();
1041
                }
1042
0
                Ok(_) => Some(Receiver {
1043
0
                    channel: self.channel.clone(),
1044
0
                    listener: None,
1045
0
                    _pin: PhantomPinned,
1046
0
                }),
1047
            }
1048
        }
1049
0
    }
1050
}
1051
1052
impl<T> Clone for WeakReceiver<T> {
1053
0
    fn clone(&self) -> Self {
1054
0
        WeakReceiver {
1055
0
            channel: self.channel.clone(),
1056
0
        }
1057
0
    }
1058
}
1059
1060
impl<T> fmt::Debug for WeakReceiver<T> {
1061
0
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1062
0
        write!(f, "WeakReceiver {{ .. }}")
1063
0
    }
1064
}
1065
1066
/// An error returned from [`Sender::send()`].
1067
///
1068
/// Received because the channel is closed.
1069
#[derive(PartialEq, Eq, Clone, Copy)]
1070
pub struct SendError<T>(pub T);
1071
1072
impl<T> SendError<T> {
1073
    /// Unwraps the message that couldn't be sent.
1074
0
    pub fn into_inner(self) -> T {
1075
0
        self.0
1076
0
    }
1077
}
1078
1079
#[cfg(feature = "std")]
1080
impl<T> std::error::Error for SendError<T> {}
1081
1082
impl<T> fmt::Debug for SendError<T> {
1083
0
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1084
0
        write!(f, "SendError(..)")
1085
0
    }
1086
}
1087
1088
impl<T> fmt::Display for SendError<T> {
1089
0
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1090
0
        write!(f, "sending into a closed channel")
1091
0
    }
1092
}
1093
1094
/// An error returned from [`Sender::try_send()`].
1095
#[derive(PartialEq, Eq, Clone, Copy)]
1096
pub enum TrySendError<T> {
1097
    /// The channel is full but not closed.
1098
    Full(T),
1099
1100
    /// The channel is closed.
1101
    Closed(T),
1102
}
1103
1104
impl<T> TrySendError<T> {
1105
    /// Unwraps the message that couldn't be sent.
1106
0
    pub fn into_inner(self) -> T {
1107
0
        match self {
1108
0
            TrySendError::Full(t) => t,
1109
0
            TrySendError::Closed(t) => t,
1110
        }
1111
0
    }
1112
1113
    /// Returns `true` if the channel is full but not closed.
1114
0
    pub fn is_full(&self) -> bool {
1115
0
        match self {
1116
0
            TrySendError::Full(_) => true,
1117
0
            TrySendError::Closed(_) => false,
1118
        }
1119
0
    }
1120
1121
    /// Returns `true` if the channel is closed.
1122
0
    pub fn is_closed(&self) -> bool {
1123
0
        match self {
1124
0
            TrySendError::Full(_) => false,
1125
0
            TrySendError::Closed(_) => true,
1126
        }
1127
0
    }
1128
}
1129
1130
#[cfg(feature = "std")]
1131
impl<T> std::error::Error for TrySendError<T> {}
1132
1133
impl<T> fmt::Debug for TrySendError<T> {
1134
0
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1135
0
        match *self {
1136
0
            TrySendError::Full(..) => write!(f, "Full(..)"),
1137
0
            TrySendError::Closed(..) => write!(f, "Closed(..)"),
1138
        }
1139
0
    }
1140
}
1141
1142
impl<T> fmt::Display for TrySendError<T> {
1143
0
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1144
0
        match *self {
1145
0
            TrySendError::Full(..) => write!(f, "sending into a full channel"),
1146
0
            TrySendError::Closed(..) => write!(f, "sending into a closed channel"),
1147
        }
1148
0
    }
1149
}
1150
1151
/// An error returned from [`Receiver::recv()`].
1152
///
1153
/// Received because the channel is empty and closed.
1154
#[derive(PartialEq, Eq, Clone, Copy, Debug)]
1155
pub struct RecvError;
1156
1157
#[cfg(feature = "std")]
1158
impl std::error::Error for RecvError {}
1159
1160
impl fmt::Display for RecvError {
1161
0
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1162
0
        write!(f, "receiving from an empty and closed channel")
1163
0
    }
1164
}
1165
1166
/// An error returned from [`Receiver::try_recv()`].
1167
#[derive(PartialEq, Eq, Clone, Copy, Debug)]
1168
pub enum TryRecvError {
1169
    /// The channel is empty but not closed.
1170
    Empty,
1171
1172
    /// The channel is empty and closed.
1173
    Closed,
1174
}
1175
1176
impl TryRecvError {
1177
    /// Returns `true` if the channel is empty but not closed.
1178
0
    pub fn is_empty(&self) -> bool {
1179
0
        match self {
1180
0
            TryRecvError::Empty => true,
1181
0
            TryRecvError::Closed => false,
1182
        }
1183
0
    }
1184
1185
    /// Returns `true` if the channel is empty and closed.
1186
0
    pub fn is_closed(&self) -> bool {
1187
0
        match self {
1188
0
            TryRecvError::Empty => false,
1189
0
            TryRecvError::Closed => true,
1190
        }
1191
0
    }
1192
}
1193
1194
#[cfg(feature = "std")]
1195
impl std::error::Error for TryRecvError {}
1196
1197
impl fmt::Display for TryRecvError {
1198
0
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1199
0
        match *self {
1200
0
            TryRecvError::Empty => write!(f, "receiving from an empty channel"),
1201
0
            TryRecvError::Closed => write!(f, "receiving from an empty and closed channel"),
1202
        }
1203
0
    }
1204
}
1205
1206
easy_wrapper! {
1207
    /// A future returned by [`Sender::send()`].
1208
    #[derive(Debug)]
1209
    #[must_use = "futures do nothing unless you `.await` or poll them"]
1210
    pub struct Send<'a, T>(SendInner<'a, T> => Result<(), SendError<T>>);
1211
    #[cfg(all(feature = "std", not(target_family = "wasm")))]
1212
    pub(crate) wait();
1213
}
1214
1215
pin_project! {
1216
    #[derive(Debug)]
1217
    #[project(!Unpin)]
1218
    struct SendInner<'a, T> {
1219
        // Reference to the original sender.
1220
        sender: &'a Sender<T>,
1221
1222
        // The message to send.
1223
        msg: Option<T>,
1224
1225
        // Listener waiting on the channel.
1226
        listener: Option<EventListener>,
1227
1228
        // Keeping this type `!Unpin` enables future optimizations.
1229
        #[pin]
1230
        _pin: PhantomPinned
1231
    }
1232
}
1233
1234
impl<T> EventListenerFuture for SendInner<'_, T> {
1235
    type Output = Result<(), SendError<T>>;
1236
1237
    /// Run this future with the given `Strategy`.
1238
0
    fn poll_with_strategy<'x, S: Strategy<'x>>(
1239
0
        self: Pin<&mut Self>,
1240
0
        strategy: &mut S,
1241
0
        context: &mut S::Context,
1242
0
    ) -> Poll<Result<(), SendError<T>>> {
1243
0
        let this = self.project();
1244
1245
        loop {
1246
0
            let msg = this.msg.take().unwrap();
1247
            // Attempt to send a message.
1248
0
            match this.sender.try_send(msg) {
1249
0
                Ok(()) => return Poll::Ready(Ok(())),
1250
0
                Err(TrySendError::Closed(msg)) => return Poll::Ready(Err(SendError(msg))),
1251
0
                Err(TrySendError::Full(m)) => *this.msg = Some(m),
1252
            }
1253
1254
            // Sending failed - now start listening for notifications or wait for one.
1255
0
            if this.listener.is_some() {
1256
                // Poll using the given strategy
1257
0
                ready!(S::poll(strategy, &mut *this.listener, context));
1258
0
            } else {
1259
0
                *this.listener = Some(this.sender.channel.send_ops.listen());
1260
0
            }
1261
        }
1262
0
    }
Unexecuted instantiation: <async_channel::SendInner<core::result::Result<surrealdb_core::exec::ValueBatch, surrealdb_core::expr::ControlFlow>> as event_listener_strategy::EventListenerFuture>::poll_with_strategy::<event_listener_strategy::NonBlocking>
Unexecuted instantiation: <async_channel::SendInner<surrealdb_types::notification::Notification> as event_listener_strategy::EventListenerFuture>::poll_with_strategy::<event_listener_strategy::NonBlocking>
Unexecuted instantiation: <async_channel::SendInner<_> as event_listener_strategy::EventListenerFuture>::poll_with_strategy::<_>
1263
}
1264
1265
easy_wrapper! {
1266
    /// A future returned by [`Receiver::recv()`].
1267
    #[derive(Debug)]
1268
    #[must_use = "futures do nothing unless you `.await` or poll them"]
1269
    pub struct Recv<'a, T>(RecvInner<'a, T> => Result<T, RecvError>);
1270
    #[cfg(all(feature = "std", not(target_family = "wasm")))]
1271
    pub(crate) wait();
1272
}
1273
1274
pin_project! {
1275
    #[derive(Debug)]
1276
    #[project(!Unpin)]
1277
    struct RecvInner<'a, T> {
1278
        // Reference to the receiver.
1279
        receiver: &'a Receiver<T>,
1280
1281
        // Listener waiting on the channel.
1282
        listener: Option<EventListener>,
1283
1284
        // Keeping this type `!Unpin` enables future optimizations.
1285
        #[pin]
1286
        _pin: PhantomPinned
1287
    }
1288
}
1289
1290
impl<T> EventListenerFuture for RecvInner<'_, T> {
1291
    type Output = Result<T, RecvError>;
1292
1293
    /// Run this future with the given `Strategy`.
1294
0
    fn poll_with_strategy<'x, S: Strategy<'x>>(
1295
0
        self: Pin<&mut Self>,
1296
0
        strategy: &mut S,
1297
0
        cx: &mut S::Context,
1298
0
    ) -> Poll<Result<T, RecvError>> {
1299
0
        let this = self.project();
1300
1301
        loop {
1302
            // Attempt to receive a message.
1303
0
            match this.receiver.try_recv() {
1304
0
                Ok(msg) => return Poll::Ready(Ok(msg)),
1305
0
                Err(TryRecvError::Closed) => return Poll::Ready(Err(RecvError)),
1306
0
                Err(TryRecvError::Empty) => {}
1307
            }
1308
1309
            // Receiving failed - now start listening for notifications or wait for one.
1310
0
            if this.listener.is_some() {
1311
                // Poll using the given strategy
1312
0
                ready!(S::poll(strategy, &mut *this.listener, cx));
1313
0
            } else {
1314
0
                *this.listener = Some(this.receiver.channel.recv_ops.listen());
1315
0
            }
1316
        }
1317
0
    }
Unexecuted instantiation: <async_channel::RecvInner<surrealdb_types::notification::Notification> as event_listener_strategy::EventListenerFuture>::poll_with_strategy::<event_listener_strategy::NonBlocking>
Unexecuted instantiation: <async_channel::RecvInner<_> as event_listener_strategy::EventListenerFuture>::poll_with_strategy::<_>
Unexecuted instantiation: <async_channel::RecvInner<surrealdb_types::notification::Notification> as event_listener_strategy::EventListenerFuture>::poll_with_strategy::<event_listener_strategy::NonBlocking>
1318
}
1319
1320
easy_wrapper! {
1321
    /// A future returned by [`Sender::closed()`].
1322
    #[derive(Debug)]
1323
    #[must_use = "futures do nothing unless you `.await` or poll them"]
1324
    pub struct Closed<'a, T>(ClosedInner<'a, T> => ());
1325
    #[cfg(all(feature = "std", not(target_family = "wasm")))]
1326
    pub(crate) wait();
1327
}
1328
1329
pin_project! {
1330
    #[derive(Debug)]
1331
    #[project(!Unpin)]
1332
    struct ClosedInner<'a, T> {
1333
        // Reference to the sender.
1334
        sender: &'a Sender<T>,
1335
1336
        // Listener waiting on the channel.
1337
        listener: Option<EventListener>,
1338
1339
        // Keeping this type `!Unpin` enables future optimizations.
1340
        #[pin]
1341
        _pin: PhantomPinned
1342
    }
1343
}
1344
1345
impl<'a, T> EventListenerFuture for ClosedInner<'a, T> {
1346
    type Output = ();
1347
1348
    /// Run this future with the given `Strategy`.
1349
0
    fn poll_with_strategy<'x, S: Strategy<'x>>(
1350
0
        self: Pin<&mut Self>,
1351
0
        strategy: &mut S,
1352
0
        cx: &mut S::Context,
1353
0
    ) -> Poll<()> {
1354
0
        let this = self.project();
1355
1356
        loop {
1357
            // Check if the channel is closed.
1358
0
            if this.sender.is_closed() {
1359
0
                return Poll::Ready(());
1360
0
            }
1361
1362
            // Not closed - now start listening for notifications or wait for one.
1363
0
            if this.listener.is_some() {
1364
                // Poll using the given strategy
1365
0
                ready!(S::poll(strategy, &mut *this.listener, cx));
1366
0
            } else {
1367
0
                *this.listener = Some(this.sender.channel.closed_ops.listen());
1368
0
            }
1369
        }
1370
0
    }
1371
}
1372
1373
#[cfg(feature = "std")]
1374
use std::process::abort;
1375
1376
#[cfg(not(feature = "std"))]
1377
fn abort() -> ! {
1378
    struct PanicOnDrop;
1379
1380
    impl Drop for PanicOnDrop {
1381
        fn drop(&mut self) {
1382
            panic!("Panic while panicking to abort");
1383
        }
1384
    }
1385
1386
    let _bomb = PanicOnDrop;
1387
    panic!("Panic while panicking to abort")
1388
}