Coverage Report

Created: 2026-03-31 07:09

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/rust/registry/src/index.crates.io-1949cf8c6b5b557f/crossbeam-channel-0.5.15/src/channel.rs
Line
Count
Source
1
//! The channel interface.
2
3
use std::fmt;
4
use std::iter::FusedIterator;
5
use std::mem;
6
use std::panic::{RefUnwindSafe, UnwindSafe};
7
use std::sync::Arc;
8
use std::time::{Duration, Instant};
9
10
use crate::context::Context;
11
use crate::counter;
12
use crate::err::{
13
    RecvError, RecvTimeoutError, SendError, SendTimeoutError, TryRecvError, TrySendError,
14
};
15
use crate::flavors;
16
use crate::select::{Operation, SelectHandle, Token};
17
18
/// Creates a channel of unbounded capacity.
19
///
20
/// This channel has a growable buffer that can hold any number of messages at a time.
21
///
22
/// # Examples
23
///
24
/// ```
25
/// use std::thread;
26
/// use crossbeam_channel::unbounded;
27
///
28
/// let (s, r) = unbounded();
29
///
30
/// // Computes the n-th Fibonacci number.
31
/// fn fib(n: i32) -> i32 {
32
///     if n <= 1 {
33
///         n
34
///     } else {
35
///         fib(n - 1) + fib(n - 2)
36
///     }
37
/// }
38
///
39
/// // Spawn an asynchronous computation.
40
/// thread::spawn(move || s.send(fib(20)).unwrap());
41
///
42
/// // Print the result of the computation.
43
/// println!("{}", r.recv().unwrap());
44
/// ```
45
0
pub fn unbounded<T>() -> (Sender<T>, Receiver<T>) {
46
0
    let (s, r) = counter::new(flavors::list::Channel::new());
47
0
    let s = Sender {
48
0
        flavor: SenderFlavor::List(s),
49
0
    };
50
0
    let r = Receiver {
51
0
        flavor: ReceiverFlavor::List(r),
52
0
    };
53
0
    (s, r)
54
0
}
55
56
/// Creates a channel of bounded capacity.
57
///
58
/// This channel has a buffer that can hold at most `cap` messages at a time.
59
///
60
/// A special case is zero-capacity channel, which cannot hold any messages. Instead, send and
61
/// receive operations must appear at the same time in order to pair up and pass the message over.
62
///
63
/// # Examples
64
///
65
/// A channel of capacity 1:
66
///
67
/// ```
68
/// use std::thread;
69
/// use std::time::Duration;
70
/// use crossbeam_channel::bounded;
71
///
72
/// let (s, r) = bounded(1);
73
///
74
/// // This call returns immediately because there is enough space in the channel.
75
/// s.send(1).unwrap();
76
///
77
/// thread::spawn(move || {
78
///     // This call blocks the current thread because the channel is full.
79
///     // It will be able to complete only after the first message is received.
80
///     s.send(2).unwrap();
81
/// });
82
///
83
/// thread::sleep(Duration::from_secs(1));
84
/// assert_eq!(r.recv(), Ok(1));
85
/// assert_eq!(r.recv(), Ok(2));
86
/// ```
87
///
88
/// A zero-capacity channel:
89
///
90
/// ```
91
/// use std::thread;
92
/// use std::time::Duration;
93
/// use crossbeam_channel::bounded;
94
///
95
/// let (s, r) = bounded(0);
96
///
97
/// thread::spawn(move || {
98
///     // This call blocks the current thread until a receive operation appears
99
///     // on the other side of the channel.
100
///     s.send(1).unwrap();
101
/// });
102
///
103
/// thread::sleep(Duration::from_secs(1));
104
/// assert_eq!(r.recv(), Ok(1));
105
/// ```
106
0
pub fn bounded<T>(cap: usize) -> (Sender<T>, Receiver<T>) {
107
0
    if cap == 0 {
108
0
        let (s, r) = counter::new(flavors::zero::Channel::new());
109
0
        let s = Sender {
110
0
            flavor: SenderFlavor::Zero(s),
111
0
        };
112
0
        let r = Receiver {
113
0
            flavor: ReceiverFlavor::Zero(r),
114
0
        };
115
0
        (s, r)
116
    } else {
117
0
        let (s, r) = counter::new(flavors::array::Channel::with_capacity(cap));
118
0
        let s = Sender {
119
0
            flavor: SenderFlavor::Array(s),
120
0
        };
121
0
        let r = Receiver {
122
0
            flavor: ReceiverFlavor::Array(r),
123
0
        };
124
0
        (s, r)
125
    }
126
0
}
127
128
/// Creates a receiver that delivers a message after a certain duration of time.
129
///
130
/// The channel is bounded with capacity of 1 and never gets disconnected. Exactly one message will
131
/// be sent into the channel after `duration` elapses. The message is the instant at which it is
132
/// sent.
133
///
134
/// # Examples
135
///
136
/// Using an `after` channel for timeouts:
137
///
138
/// ```
139
/// use std::time::Duration;
140
/// use crossbeam_channel::{after, select, unbounded};
141
///
142
/// let (s, r) = unbounded::<i32>();
143
/// let timeout = Duration::from_millis(100);
144
///
145
/// select! {
146
///     recv(r) -> msg => println!("received {:?}", msg),
147
///     recv(after(timeout)) -> _ => println!("timed out"),
148
/// }
149
/// ```
150
///
151
/// When the message gets sent:
152
///
153
/// ```
154
/// use std::thread;
155
/// use std::time::{Duration, Instant};
156
/// use crossbeam_channel::after;
157
///
158
/// // Converts a number of milliseconds into a `Duration`.
159
/// let ms = |ms| Duration::from_millis(ms);
160
///
161
/// // Returns `true` if `a` and `b` are very close `Instant`s.
162
/// let eq = |a, b| a + ms(60) > b && b + ms(60) > a;
163
///
164
/// let start = Instant::now();
165
/// let r = after(ms(100));
166
///
167
/// thread::sleep(ms(500));
168
///
169
/// // This message was sent 100 ms from the start and received 500 ms from the start.
170
/// assert!(eq(r.recv().unwrap(), start + ms(100)));
171
/// assert!(eq(Instant::now(), start + ms(500)));
172
/// ```
173
0
pub fn after(duration: Duration) -> Receiver<Instant> {
174
0
    match Instant::now().checked_add(duration) {
175
0
        Some(deadline) => Receiver {
176
0
            flavor: ReceiverFlavor::At(Arc::new(flavors::at::Channel::new_deadline(deadline))),
177
0
        },
178
0
        None => never(),
179
    }
180
0
}
181
182
/// Creates a receiver that delivers a message at a certain instant in time.
183
///
184
/// The channel is bounded with capacity of 1 and never gets disconnected. Exactly one message will
185
/// be sent into the channel at the moment in time `when`. The message is the instant at which it
186
/// is sent, which is the same as `when`. If `when` is in the past, the message will be delivered
187
/// instantly to the receiver.
188
///
189
/// # Examples
190
///
191
/// Using an `at` channel for timeouts:
192
///
193
/// ```
194
/// use std::time::{Instant, Duration};
195
/// use crossbeam_channel::{at, select, unbounded};
196
///
197
/// let (s, r) = unbounded::<i32>();
198
/// let deadline = Instant::now() + Duration::from_millis(500);
199
///
200
/// select! {
201
///     recv(r) -> msg => println!("received {:?}", msg),
202
///     recv(at(deadline)) -> _ => println!("timed out"),
203
/// }
204
/// ```
205
///
206
/// When the message gets sent:
207
///
208
/// ```
209
/// use std::time::{Duration, Instant};
210
/// use crossbeam_channel::at;
211
///
212
/// // Converts a number of milliseconds into a `Duration`.
213
/// let ms = |ms| Duration::from_millis(ms);
214
///
215
/// let start = Instant::now();
216
/// let end = start + ms(100);
217
///
218
/// let r = at(end);
219
///
220
/// // This message was sent 100 ms from the start
221
/// assert_eq!(r.recv().unwrap(), end);
222
/// assert!(Instant::now() > start + ms(100));
223
/// ```
224
0
pub fn at(when: Instant) -> Receiver<Instant> {
225
0
    Receiver {
226
0
        flavor: ReceiverFlavor::At(Arc::new(flavors::at::Channel::new_deadline(when))),
227
0
    }
228
0
}
229
230
/// Creates a receiver that never delivers messages.
231
///
232
/// The channel is bounded with capacity of 0 and never gets disconnected.
233
///
234
/// # Examples
235
///
236
/// Using a `never` channel to optionally add a timeout to [`select!`]:
237
///
238
/// [`select!`]: crate::select!
239
///
240
/// ```
241
/// use std::thread;
242
/// use std::time::Duration;
243
/// use crossbeam_channel::{after, select, never, unbounded};
244
///
245
/// let (s, r) = unbounded();
246
///
247
/// thread::spawn(move || {
248
///     thread::sleep(Duration::from_secs(1));
249
///     s.send(1).unwrap();
250
/// });
251
///
252
/// // Suppose this duration can be a `Some` or a `None`.
253
/// let duration = Some(Duration::from_millis(100));
254
///
255
/// // Create a channel that times out after the specified duration.
256
/// let timeout = duration
257
///     .map(|d| after(d))
258
///     .unwrap_or(never());
259
///
260
/// select! {
261
///     recv(r) -> msg => assert_eq!(msg, Ok(1)),
262
///     recv(timeout) -> _ => println!("timed out"),
263
/// }
264
/// ```
265
0
pub fn never<T>() -> Receiver<T> {
266
0
    Receiver {
267
0
        flavor: ReceiverFlavor::Never(flavors::never::Channel::new()),
268
0
    }
269
0
}
270
271
/// Creates a receiver that delivers messages periodically.
272
///
273
/// The channel is bounded with capacity of 1 and never gets disconnected. Messages will be
274
/// sent into the channel in intervals of `duration`. Each message is the instant at which it is
275
/// sent.
276
///
277
/// # Examples
278
///
279
/// Using a `tick` channel to periodically print elapsed time:
280
///
281
/// ```
282
/// use std::time::{Duration, Instant};
283
/// use crossbeam_channel::tick;
284
///
285
/// let start = Instant::now();
286
/// let ticker = tick(Duration::from_millis(100));
287
///
288
/// for _ in 0..5 {
289
///     ticker.recv().unwrap();
290
///     println!("elapsed: {:?}", start.elapsed());
291
/// }
292
/// ```
293
///
294
/// When messages get sent:
295
///
296
/// ```
297
/// use std::thread;
298
/// use std::time::{Duration, Instant};
299
/// use crossbeam_channel::tick;
300
///
301
/// // Converts a number of milliseconds into a `Duration`.
302
/// let ms = |ms| Duration::from_millis(ms);
303
///
304
/// // Returns `true` if `a` and `b` are very close `Instant`s.
305
/// let eq = |a, b| a + ms(65) > b && b + ms(65) > a;
306
///
307
/// let start = Instant::now();
308
/// let r = tick(ms(100));
309
///
310
/// // This message was sent 100 ms from the start and received 100 ms from the start.
311
/// assert!(eq(r.recv().unwrap(), start + ms(100)));
312
/// assert!(eq(Instant::now(), start + ms(100)));
313
///
314
/// thread::sleep(ms(500));
315
///
316
/// // This message was sent 200 ms from the start and received 600 ms from the start.
317
/// assert!(eq(r.recv().unwrap(), start + ms(200)));
318
/// assert!(eq(Instant::now(), start + ms(600)));
319
///
320
/// // This message was sent 700 ms from the start and received 700 ms from the start.
321
/// assert!(eq(r.recv().unwrap(), start + ms(700)));
322
/// assert!(eq(Instant::now(), start + ms(700)));
323
/// ```
324
0
pub fn tick(duration: Duration) -> Receiver<Instant> {
325
0
    match Instant::now().checked_add(duration) {
326
0
        Some(delivery_time) => Receiver {
327
0
            flavor: ReceiverFlavor::Tick(Arc::new(flavors::tick::Channel::new(
328
0
                delivery_time,
329
0
                duration,
330
0
            ))),
331
0
        },
332
0
        None => never(),
333
    }
334
0
}
335
336
/// The sending side of a channel.
337
///
338
/// # Examples
339
///
340
/// ```
341
/// use std::thread;
342
/// use crossbeam_channel::unbounded;
343
///
344
/// let (s1, r) = unbounded();
345
/// let s2 = s1.clone();
346
///
347
/// thread::spawn(move || s1.send(1).unwrap());
348
/// thread::spawn(move || s2.send(2).unwrap());
349
///
350
/// let msg1 = r.recv().unwrap();
351
/// let msg2 = r.recv().unwrap();
352
///
353
/// assert_eq!(msg1 + msg2, 3);
354
/// ```
355
pub struct Sender<T> {
356
    flavor: SenderFlavor<T>,
357
}
358
359
/// Sender flavors.
360
enum SenderFlavor<T> {
361
    /// Bounded channel based on a preallocated array.
362
    Array(counter::Sender<flavors::array::Channel<T>>),
363
364
    /// Unbounded channel implemented as a linked list.
365
    List(counter::Sender<flavors::list::Channel<T>>),
366
367
    /// Zero-capacity channel.
368
    Zero(counter::Sender<flavors::zero::Channel<T>>),
369
}
370
371
unsafe impl<T: Send> Send for Sender<T> {}
372
unsafe impl<T: Send> Sync for Sender<T> {}
373
374
impl<T> UnwindSafe for Sender<T> {}
375
impl<T> RefUnwindSafe for Sender<T> {}
376
377
impl<T> Sender<T> {
378
    /// Attempts to send a message into the channel without blocking.
379
    ///
380
    /// This method will either send a message into the channel immediately or return an error if
381
    /// the channel is full or disconnected. The returned error contains the original message.
382
    ///
383
    /// If called on a zero-capacity channel, this method will send the message only if there
384
    /// happens to be a receive operation on the other side of the channel at the same time.
385
    ///
386
    /// # Examples
387
    ///
388
    /// ```
389
    /// use crossbeam_channel::{bounded, TrySendError};
390
    ///
391
    /// let (s, r) = bounded(1);
392
    ///
393
    /// assert_eq!(s.try_send(1), Ok(()));
394
    /// assert_eq!(s.try_send(2), Err(TrySendError::Full(2)));
395
    ///
396
    /// drop(r);
397
    /// assert_eq!(s.try_send(3), Err(TrySendError::Disconnected(3)));
398
    /// ```
399
0
    pub fn try_send(&self, msg: T) -> Result<(), TrySendError<T>> {
400
0
        match &self.flavor {
401
0
            SenderFlavor::Array(chan) => chan.try_send(msg),
402
0
            SenderFlavor::List(chan) => chan.try_send(msg),
403
0
            SenderFlavor::Zero(chan) => chan.try_send(msg),
404
        }
405
0
    }
406
407
    /// Blocks the current thread until a message is sent or the channel is disconnected.
408
    ///
409
    /// If the channel is full and not disconnected, this call will block until the send operation
410
    /// can proceed. If the channel becomes disconnected, this call will wake up and return an
411
    /// error. The returned error contains the original message.
412
    ///
413
    /// If called on a zero-capacity channel, this method will wait for a receive operation to
414
    /// appear on the other side of the channel.
415
    ///
416
    /// # Examples
417
    ///
418
    /// ```
419
    /// use std::thread;
420
    /// use std::time::Duration;
421
    /// use crossbeam_channel::{bounded, SendError};
422
    ///
423
    /// let (s, r) = bounded(1);
424
    /// assert_eq!(s.send(1), Ok(()));
425
    ///
426
    /// thread::spawn(move || {
427
    ///     assert_eq!(r.recv(), Ok(1));
428
    ///     thread::sleep(Duration::from_secs(1));
429
    ///     drop(r);
430
    /// });
431
    ///
432
    /// assert_eq!(s.send(2), Ok(()));
433
    /// assert_eq!(s.send(3), Err(SendError(3)));
434
    /// ```
435
0
    pub fn send(&self, msg: T) -> Result<(), SendError<T>> {
436
0
        match &self.flavor {
437
0
            SenderFlavor::Array(chan) => chan.send(msg, None),
438
0
            SenderFlavor::List(chan) => chan.send(msg, None),
439
0
            SenderFlavor::Zero(chan) => chan.send(msg, None),
440
        }
441
0
        .map_err(|err| match err {
442
0
            SendTimeoutError::Disconnected(msg) => SendError(msg),
443
0
            SendTimeoutError::Timeout(_) => unreachable!(),
444
0
        })
445
0
    }
446
447
    /// Waits for a message to be sent into the channel, but only for a limited time.
448
    ///
449
    /// If the channel is full and not disconnected, this call will block until the send operation
450
    /// can proceed or the operation times out. If the channel becomes disconnected, this call will
451
    /// wake up and return an error. The returned error contains the original message.
452
    ///
453
    /// If called on a zero-capacity channel, this method will wait for a receive operation to
454
    /// appear on the other side of the channel.
455
    ///
456
    /// # Examples
457
    ///
458
    /// ```
459
    /// use std::thread;
460
    /// use std::time::Duration;
461
    /// use crossbeam_channel::{bounded, SendTimeoutError};
462
    ///
463
    /// let (s, r) = bounded(0);
464
    ///
465
    /// thread::spawn(move || {
466
    ///     thread::sleep(Duration::from_secs(1));
467
    ///     assert_eq!(r.recv(), Ok(2));
468
    ///     drop(r);
469
    /// });
470
    ///
471
    /// assert_eq!(
472
    ///     s.send_timeout(1, Duration::from_millis(500)),
473
    ///     Err(SendTimeoutError::Timeout(1)),
474
    /// );
475
    /// assert_eq!(
476
    ///     s.send_timeout(2, Duration::from_secs(1)),
477
    ///     Ok(()),
478
    /// );
479
    /// assert_eq!(
480
    ///     s.send_timeout(3, Duration::from_millis(500)),
481
    ///     Err(SendTimeoutError::Disconnected(3)),
482
    /// );
483
    /// ```
484
0
    pub fn send_timeout(&self, msg: T, timeout: Duration) -> Result<(), SendTimeoutError<T>> {
485
0
        match Instant::now().checked_add(timeout) {
486
0
            Some(deadline) => self.send_deadline(msg, deadline),
487
0
            None => self.send(msg).map_err(SendTimeoutError::from),
488
        }
489
0
    }
490
491
    /// Waits for a message to be sent into the channel, but only until a given deadline.
492
    ///
493
    /// If the channel is full and not disconnected, this call will block until the send operation
494
    /// can proceed or the operation times out. If the channel becomes disconnected, this call will
495
    /// wake up and return an error. The returned error contains the original message.
496
    ///
497
    /// If called on a zero-capacity channel, this method will wait for a receive operation to
498
    /// appear on the other side of the channel.
499
    ///
500
    /// # Examples
501
    ///
502
    /// ```
503
    /// use std::thread;
504
    /// use std::time::{Duration, Instant};
505
    /// use crossbeam_channel::{bounded, SendTimeoutError};
506
    ///
507
    /// let (s, r) = bounded(0);
508
    ///
509
    /// thread::spawn(move || {
510
    ///     thread::sleep(Duration::from_secs(1));
511
    ///     assert_eq!(r.recv(), Ok(2));
512
    ///     drop(r);
513
    /// });
514
    ///
515
    /// let now = Instant::now();
516
    ///
517
    /// assert_eq!(
518
    ///     s.send_deadline(1, now + Duration::from_millis(500)),
519
    ///     Err(SendTimeoutError::Timeout(1)),
520
    /// );
521
    /// assert_eq!(
522
    ///     s.send_deadline(2, now + Duration::from_millis(1500)),
523
    ///     Ok(()),
524
    /// );
525
    /// assert_eq!(
526
    ///     s.send_deadline(3, now + Duration::from_millis(2000)),
527
    ///     Err(SendTimeoutError::Disconnected(3)),
528
    /// );
529
    /// ```
530
0
    pub fn send_deadline(&self, msg: T, deadline: Instant) -> Result<(), SendTimeoutError<T>> {
531
0
        match &self.flavor {
532
0
            SenderFlavor::Array(chan) => chan.send(msg, Some(deadline)),
533
0
            SenderFlavor::List(chan) => chan.send(msg, Some(deadline)),
534
0
            SenderFlavor::Zero(chan) => chan.send(msg, Some(deadline)),
535
        }
536
0
    }
537
538
    /// Returns `true` if the channel is empty.
539
    ///
540
    /// Note: Zero-capacity channels are always empty.
541
    ///
542
    /// # Examples
543
    ///
544
    /// ```
545
    /// use crossbeam_channel::unbounded;
546
    ///
547
    /// let (s, r) = unbounded();
548
    /// assert!(s.is_empty());
549
    ///
550
    /// s.send(0).unwrap();
551
    /// assert!(!s.is_empty());
552
    /// ```
553
0
    pub fn is_empty(&self) -> bool {
554
0
        match &self.flavor {
555
0
            SenderFlavor::Array(chan) => chan.is_empty(),
556
0
            SenderFlavor::List(chan) => chan.is_empty(),
557
0
            SenderFlavor::Zero(chan) => chan.is_empty(),
558
        }
559
0
    }
560
561
    /// Returns `true` if the channel is full.
562
    ///
563
    /// Note: Zero-capacity channels are always full.
564
    ///
565
    /// # Examples
566
    ///
567
    /// ```
568
    /// use crossbeam_channel::bounded;
569
    ///
570
    /// let (s, r) = bounded(1);
571
    ///
572
    /// assert!(!s.is_full());
573
    /// s.send(0).unwrap();
574
    /// assert!(s.is_full());
575
    /// ```
576
0
    pub fn is_full(&self) -> bool {
577
0
        match &self.flavor {
578
0
            SenderFlavor::Array(chan) => chan.is_full(),
579
0
            SenderFlavor::List(chan) => chan.is_full(),
580
0
            SenderFlavor::Zero(chan) => chan.is_full(),
581
        }
582
0
    }
583
584
    /// Returns the number of messages in the channel.
585
    ///
586
    /// # Examples
587
    ///
588
    /// ```
589
    /// use crossbeam_channel::unbounded;
590
    ///
591
    /// let (s, r) = unbounded();
592
    /// assert_eq!(s.len(), 0);
593
    ///
594
    /// s.send(1).unwrap();
595
    /// s.send(2).unwrap();
596
    /// assert_eq!(s.len(), 2);
597
    /// ```
598
0
    pub fn len(&self) -> usize {
599
0
        match &self.flavor {
600
0
            SenderFlavor::Array(chan) => chan.len(),
601
0
            SenderFlavor::List(chan) => chan.len(),
602
0
            SenderFlavor::Zero(chan) => chan.len(),
603
        }
604
0
    }
605
606
    /// If the channel is bounded, returns its capacity.
607
    ///
608
    /// # Examples
609
    ///
610
    /// ```
611
    /// use crossbeam_channel::{bounded, unbounded};
612
    ///
613
    /// let (s, _) = unbounded::<i32>();
614
    /// assert_eq!(s.capacity(), None);
615
    ///
616
    /// let (s, _) = bounded::<i32>(5);
617
    /// assert_eq!(s.capacity(), Some(5));
618
    ///
619
    /// let (s, _) = bounded::<i32>(0);
620
    /// assert_eq!(s.capacity(), Some(0));
621
    /// ```
622
0
    pub fn capacity(&self) -> Option<usize> {
623
0
        match &self.flavor {
624
0
            SenderFlavor::Array(chan) => chan.capacity(),
625
0
            SenderFlavor::List(chan) => chan.capacity(),
626
0
            SenderFlavor::Zero(chan) => chan.capacity(),
627
        }
628
0
    }
629
630
    /// Returns `true` if senders belong to the same channel.
631
    ///
632
    /// # Examples
633
    ///
634
    /// ```rust
635
    /// use crossbeam_channel::unbounded;
636
    ///
637
    /// let (s, _) = unbounded::<usize>();
638
    ///
639
    /// let s2 = s.clone();
640
    /// assert!(s.same_channel(&s2));
641
    ///
642
    /// let (s3, _) = unbounded();
643
    /// assert!(!s.same_channel(&s3));
644
    /// ```
645
0
    pub fn same_channel(&self, other: &Sender<T>) -> bool {
646
0
        match (&self.flavor, &other.flavor) {
647
0
            (SenderFlavor::Array(ref a), SenderFlavor::Array(ref b)) => a == b,
648
0
            (SenderFlavor::List(ref a), SenderFlavor::List(ref b)) => a == b,
649
0
            (SenderFlavor::Zero(ref a), SenderFlavor::Zero(ref b)) => a == b,
650
0
            _ => false,
651
        }
652
0
    }
653
}
654
655
impl<T> Drop for Sender<T> {
656
0
    fn drop(&mut self) {
657
        unsafe {
658
0
            match &self.flavor {
659
0
                SenderFlavor::Array(chan) => chan.release(|c| c.disconnect()),
660
0
                SenderFlavor::List(chan) => chan.release(|c| c.disconnect_senders()),
661
0
                SenderFlavor::Zero(chan) => chan.release(|c| c.disconnect()),
662
            }
663
        }
664
0
    }
665
}
666
667
impl<T> Clone for Sender<T> {
668
0
    fn clone(&self) -> Self {
669
0
        let flavor = match &self.flavor {
670
0
            SenderFlavor::Array(chan) => SenderFlavor::Array(chan.acquire()),
671
0
            SenderFlavor::List(chan) => SenderFlavor::List(chan.acquire()),
672
0
            SenderFlavor::Zero(chan) => SenderFlavor::Zero(chan.acquire()),
673
        };
674
675
0
        Sender { flavor }
676
0
    }
677
}
678
679
impl<T> fmt::Debug for Sender<T> {
680
0
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
681
0
        f.pad("Sender { .. }")
682
0
    }
683
}
684
685
/// The receiving side of a channel.
686
///
687
/// # Examples
688
///
689
/// ```
690
/// use std::thread;
691
/// use std::time::Duration;
692
/// use crossbeam_channel::unbounded;
693
///
694
/// let (s, r) = unbounded();
695
///
696
/// thread::spawn(move || {
697
///     let _ = s.send(1);
698
///     thread::sleep(Duration::from_secs(1));
699
///     let _ = s.send(2);
700
/// });
701
///
702
/// assert_eq!(r.recv(), Ok(1)); // Received immediately.
703
/// assert_eq!(r.recv(), Ok(2)); // Received after 1 second.
704
/// ```
705
pub struct Receiver<T> {
706
    flavor: ReceiverFlavor<T>,
707
}
708
709
/// Receiver flavors.
710
enum ReceiverFlavor<T> {
711
    /// Bounded channel based on a preallocated array.
712
    Array(counter::Receiver<flavors::array::Channel<T>>),
713
714
    /// Unbounded channel implemented as a linked list.
715
    List(counter::Receiver<flavors::list::Channel<T>>),
716
717
    /// Zero-capacity channel.
718
    Zero(counter::Receiver<flavors::zero::Channel<T>>),
719
720
    /// The after flavor.
721
    At(Arc<flavors::at::Channel>),
722
723
    /// The tick flavor.
724
    Tick(Arc<flavors::tick::Channel>),
725
726
    /// The never flavor.
727
    Never(flavors::never::Channel<T>),
728
}
729
730
unsafe impl<T: Send> Send for Receiver<T> {}
731
unsafe impl<T: Send> Sync for Receiver<T> {}
732
733
impl<T> UnwindSafe for Receiver<T> {}
734
impl<T> RefUnwindSafe for Receiver<T> {}
735
736
impl<T> Receiver<T> {
737
    /// Attempts to receive a message from the channel without blocking.
738
    ///
739
    /// This method will either receive a message from the channel immediately or return an error
740
    /// if the channel is empty.
741
    ///
742
    /// If called on a zero-capacity channel, this method will receive a message only if there
743
    /// happens to be a send operation on the other side of the channel at the same time.
744
    ///
745
    /// # Examples
746
    ///
747
    /// ```
748
    /// use crossbeam_channel::{unbounded, TryRecvError};
749
    ///
750
    /// let (s, r) = unbounded();
751
    /// assert_eq!(r.try_recv(), Err(TryRecvError::Empty));
752
    ///
753
    /// s.send(5).unwrap();
754
    /// drop(s);
755
    ///
756
    /// assert_eq!(r.try_recv(), Ok(5));
757
    /// assert_eq!(r.try_recv(), Err(TryRecvError::Disconnected));
758
    /// ```
759
0
    pub fn try_recv(&self) -> Result<T, TryRecvError> {
760
0
        match &self.flavor {
761
0
            ReceiverFlavor::Array(chan) => chan.try_recv(),
762
0
            ReceiverFlavor::List(chan) => chan.try_recv(),
763
0
            ReceiverFlavor::Zero(chan) => chan.try_recv(),
764
0
            ReceiverFlavor::At(chan) => {
765
0
                let msg = chan.try_recv();
766
                unsafe {
767
0
                    mem::transmute_copy::<Result<Instant, TryRecvError>, Result<T, TryRecvError>>(
768
0
                        &msg,
769
                    )
770
                }
771
            }
772
0
            ReceiverFlavor::Tick(chan) => {
773
0
                let msg = chan.try_recv();
774
                unsafe {
775
0
                    mem::transmute_copy::<Result<Instant, TryRecvError>, Result<T, TryRecvError>>(
776
0
                        &msg,
777
                    )
778
                }
779
            }
780
0
            ReceiverFlavor::Never(chan) => chan.try_recv(),
781
        }
782
0
    }
783
784
    /// Blocks the current thread until a message is received or the channel is empty and
785
    /// disconnected.
786
    ///
787
    /// If the channel is empty and not disconnected, this call will block until the receive
788
    /// operation can proceed. If the channel is empty and becomes disconnected, this call will
789
    /// wake up and return an error.
790
    ///
791
    /// If called on a zero-capacity channel, this method will wait for a send operation to appear
792
    /// on the other side of the channel.
793
    ///
794
    /// # Examples
795
    ///
796
    /// ```
797
    /// use std::thread;
798
    /// use std::time::Duration;
799
    /// use crossbeam_channel::{unbounded, RecvError};
800
    ///
801
    /// let (s, r) = unbounded();
802
    ///
803
    /// thread::spawn(move || {
804
    ///     thread::sleep(Duration::from_secs(1));
805
    ///     s.send(5).unwrap();
806
    ///     drop(s);
807
    /// });
808
    ///
809
    /// assert_eq!(r.recv(), Ok(5));
810
    /// assert_eq!(r.recv(), Err(RecvError));
811
    /// ```
812
0
    pub fn recv(&self) -> Result<T, RecvError> {
813
0
        match &self.flavor {
814
0
            ReceiverFlavor::Array(chan) => chan.recv(None),
815
0
            ReceiverFlavor::List(chan) => chan.recv(None),
816
0
            ReceiverFlavor::Zero(chan) => chan.recv(None),
817
0
            ReceiverFlavor::At(chan) => {
818
0
                let msg = chan.recv(None);
819
                unsafe {
820
0
                    mem::transmute_copy::<
821
0
                        Result<Instant, RecvTimeoutError>,
822
0
                        Result<T, RecvTimeoutError>,
823
0
                    >(&msg)
824
                }
825
            }
826
0
            ReceiverFlavor::Tick(chan) => {
827
0
                let msg = chan.recv(None);
828
                unsafe {
829
0
                    mem::transmute_copy::<
830
0
                        Result<Instant, RecvTimeoutError>,
831
0
                        Result<T, RecvTimeoutError>,
832
0
                    >(&msg)
833
                }
834
            }
835
0
            ReceiverFlavor::Never(chan) => chan.recv(None),
836
        }
837
0
        .map_err(|_| RecvError)
838
0
    }
839
840
    /// Waits for a message to be received from the channel, but only for a limited time.
841
    ///
842
    /// If the channel is empty and not disconnected, this call will block until the receive
843
    /// operation can proceed or the operation times out. If the channel is empty and becomes
844
    /// disconnected, this call will wake up and return an error.
845
    ///
846
    /// If called on a zero-capacity channel, this method will wait for a send operation to appear
847
    /// on the other side of the channel.
848
    ///
849
    /// # Examples
850
    ///
851
    /// ```
852
    /// use std::thread;
853
    /// use std::time::Duration;
854
    /// use crossbeam_channel::{unbounded, RecvTimeoutError};
855
    ///
856
    /// let (s, r) = unbounded();
857
    ///
858
    /// thread::spawn(move || {
859
    ///     thread::sleep(Duration::from_secs(1));
860
    ///     s.send(5).unwrap();
861
    ///     drop(s);
862
    /// });
863
    ///
864
    /// assert_eq!(
865
    ///     r.recv_timeout(Duration::from_millis(500)),
866
    ///     Err(RecvTimeoutError::Timeout),
867
    /// );
868
    /// assert_eq!(
869
    ///     r.recv_timeout(Duration::from_secs(1)),
870
    ///     Ok(5),
871
    /// );
872
    /// assert_eq!(
873
    ///     r.recv_timeout(Duration::from_secs(1)),
874
    ///     Err(RecvTimeoutError::Disconnected),
875
    /// );
876
    /// ```
877
0
    pub fn recv_timeout(&self, timeout: Duration) -> Result<T, RecvTimeoutError> {
878
0
        match Instant::now().checked_add(timeout) {
879
0
            Some(deadline) => self.recv_deadline(deadline),
880
0
            None => self.recv().map_err(RecvTimeoutError::from),
881
        }
882
0
    }
883
884
    /// Waits for a message to be received from the channel, but only before a given deadline.
885
    ///
886
    /// If the channel is empty and not disconnected, this call will block until the receive
887
    /// operation can proceed or the operation times out. If the channel is empty and becomes
888
    /// disconnected, this call will wake up and return an error.
889
    ///
890
    /// If called on a zero-capacity channel, this method will wait for a send operation to appear
891
    /// on the other side of the channel.
892
    ///
893
    /// # Examples
894
    ///
895
    /// ```
896
    /// use std::thread;
897
    /// use std::time::{Instant, Duration};
898
    /// use crossbeam_channel::{unbounded, RecvTimeoutError};
899
    ///
900
    /// let (s, r) = unbounded();
901
    ///
902
    /// thread::spawn(move || {
903
    ///     thread::sleep(Duration::from_secs(1));
904
    ///     s.send(5).unwrap();
905
    ///     drop(s);
906
    /// });
907
    ///
908
    /// let now = Instant::now();
909
    ///
910
    /// assert_eq!(
911
    ///     r.recv_deadline(now + Duration::from_millis(500)),
912
    ///     Err(RecvTimeoutError::Timeout),
913
    /// );
914
    /// assert_eq!(
915
    ///     r.recv_deadline(now + Duration::from_millis(1500)),
916
    ///     Ok(5),
917
    /// );
918
    /// assert_eq!(
919
    ///     r.recv_deadline(now + Duration::from_secs(5)),
920
    ///     Err(RecvTimeoutError::Disconnected),
921
    /// );
922
    /// ```
923
0
    pub fn recv_deadline(&self, deadline: Instant) -> Result<T, RecvTimeoutError> {
924
0
        match &self.flavor {
925
0
            ReceiverFlavor::Array(chan) => chan.recv(Some(deadline)),
926
0
            ReceiverFlavor::List(chan) => chan.recv(Some(deadline)),
927
0
            ReceiverFlavor::Zero(chan) => chan.recv(Some(deadline)),
928
0
            ReceiverFlavor::At(chan) => {
929
0
                let msg = chan.recv(Some(deadline));
930
                unsafe {
931
0
                    mem::transmute_copy::<
932
0
                        Result<Instant, RecvTimeoutError>,
933
0
                        Result<T, RecvTimeoutError>,
934
0
                    >(&msg)
935
                }
936
            }
937
0
            ReceiverFlavor::Tick(chan) => {
938
0
                let msg = chan.recv(Some(deadline));
939
                unsafe {
940
0
                    mem::transmute_copy::<
941
0
                        Result<Instant, RecvTimeoutError>,
942
0
                        Result<T, RecvTimeoutError>,
943
0
                    >(&msg)
944
                }
945
            }
946
0
            ReceiverFlavor::Never(chan) => chan.recv(Some(deadline)),
947
        }
948
0
    }
949
950
    /// Returns `true` if the channel is empty.
951
    ///
952
    /// Note: Zero-capacity channels are always empty.
953
    ///
954
    /// # Examples
955
    ///
956
    /// ```
957
    /// use crossbeam_channel::unbounded;
958
    ///
959
    /// let (s, r) = unbounded();
960
    ///
961
    /// assert!(r.is_empty());
962
    /// s.send(0).unwrap();
963
    /// assert!(!r.is_empty());
964
    /// ```
965
0
    pub fn is_empty(&self) -> bool {
966
0
        match &self.flavor {
967
0
            ReceiverFlavor::Array(chan) => chan.is_empty(),
968
0
            ReceiverFlavor::List(chan) => chan.is_empty(),
969
0
            ReceiverFlavor::Zero(chan) => chan.is_empty(),
970
0
            ReceiverFlavor::At(chan) => chan.is_empty(),
971
0
            ReceiverFlavor::Tick(chan) => chan.is_empty(),
972
0
            ReceiverFlavor::Never(chan) => chan.is_empty(),
973
        }
974
0
    }
975
976
    /// Returns `true` if the channel is full.
977
    ///
978
    /// Note: Zero-capacity channels are always full.
979
    ///
980
    /// # Examples
981
    ///
982
    /// ```
983
    /// use crossbeam_channel::bounded;
984
    ///
985
    /// let (s, r) = bounded(1);
986
    ///
987
    /// assert!(!r.is_full());
988
    /// s.send(0).unwrap();
989
    /// assert!(r.is_full());
990
    /// ```
991
0
    pub fn is_full(&self) -> bool {
992
0
        match &self.flavor {
993
0
            ReceiverFlavor::Array(chan) => chan.is_full(),
994
0
            ReceiverFlavor::List(chan) => chan.is_full(),
995
0
            ReceiverFlavor::Zero(chan) => chan.is_full(),
996
0
            ReceiverFlavor::At(chan) => chan.is_full(),
997
0
            ReceiverFlavor::Tick(chan) => chan.is_full(),
998
0
            ReceiverFlavor::Never(chan) => chan.is_full(),
999
        }
1000
0
    }
1001
1002
    /// Returns the number of messages in the channel.
1003
    ///
1004
    /// # Examples
1005
    ///
1006
    /// ```
1007
    /// use crossbeam_channel::unbounded;
1008
    ///
1009
    /// let (s, r) = unbounded();
1010
    /// assert_eq!(r.len(), 0);
1011
    ///
1012
    /// s.send(1).unwrap();
1013
    /// s.send(2).unwrap();
1014
    /// assert_eq!(r.len(), 2);
1015
    /// ```
1016
0
    pub fn len(&self) -> usize {
1017
0
        match &self.flavor {
1018
0
            ReceiverFlavor::Array(chan) => chan.len(),
1019
0
            ReceiverFlavor::List(chan) => chan.len(),
1020
0
            ReceiverFlavor::Zero(chan) => chan.len(),
1021
0
            ReceiverFlavor::At(chan) => chan.len(),
1022
0
            ReceiverFlavor::Tick(chan) => chan.len(),
1023
0
            ReceiverFlavor::Never(chan) => chan.len(),
1024
        }
1025
0
    }
1026
1027
    /// If the channel is bounded, returns its capacity.
1028
    ///
1029
    /// # Examples
1030
    ///
1031
    /// ```
1032
    /// use crossbeam_channel::{bounded, unbounded};
1033
    ///
1034
    /// let (_, r) = unbounded::<i32>();
1035
    /// assert_eq!(r.capacity(), None);
1036
    ///
1037
    /// let (_, r) = bounded::<i32>(5);
1038
    /// assert_eq!(r.capacity(), Some(5));
1039
    ///
1040
    /// let (_, r) = bounded::<i32>(0);
1041
    /// assert_eq!(r.capacity(), Some(0));
1042
    /// ```
1043
0
    pub fn capacity(&self) -> Option<usize> {
1044
0
        match &self.flavor {
1045
0
            ReceiverFlavor::Array(chan) => chan.capacity(),
1046
0
            ReceiverFlavor::List(chan) => chan.capacity(),
1047
0
            ReceiverFlavor::Zero(chan) => chan.capacity(),
1048
0
            ReceiverFlavor::At(chan) => chan.capacity(),
1049
0
            ReceiverFlavor::Tick(chan) => chan.capacity(),
1050
0
            ReceiverFlavor::Never(chan) => chan.capacity(),
1051
        }
1052
0
    }
1053
1054
    /// A blocking iterator over messages in the channel.
1055
    ///
1056
    /// Each call to [`next`] blocks waiting for the next message and then returns it. However, if
1057
    /// the channel becomes empty and disconnected, it returns [`None`] without blocking.
1058
    ///
1059
    /// [`next`]: Iterator::next
1060
    ///
1061
    /// # Examples
1062
    ///
1063
    /// ```
1064
    /// use std::thread;
1065
    /// use crossbeam_channel::unbounded;
1066
    ///
1067
    /// let (s, r) = unbounded();
1068
    ///
1069
    /// thread::spawn(move || {
1070
    ///     s.send(1).unwrap();
1071
    ///     s.send(2).unwrap();
1072
    ///     s.send(3).unwrap();
1073
    ///     drop(s); // Disconnect the channel.
1074
    /// });
1075
    ///
1076
    /// // Collect all messages from the channel.
1077
    /// // Note that the call to `collect` blocks until the sender is dropped.
1078
    /// let v: Vec<_> = r.iter().collect();
1079
    ///
1080
    /// assert_eq!(v, [1, 2, 3]);
1081
    /// ```
1082
0
    pub fn iter(&self) -> Iter<'_, T> {
1083
0
        Iter { receiver: self }
1084
0
    }
1085
1086
    /// A non-blocking iterator over messages in the channel.
1087
    ///
1088
    /// Each call to [`next`] returns a message if there is one ready to be received. The iterator
1089
    /// never blocks waiting for the next message.
1090
    ///
1091
    /// [`next`]: Iterator::next
1092
    ///
1093
    /// # Examples
1094
    ///
1095
    /// ```
1096
    /// use std::thread;
1097
    /// use std::time::Duration;
1098
    /// use crossbeam_channel::unbounded;
1099
    ///
1100
    /// let (s, r) = unbounded::<i32>();
1101
    ///
1102
    /// thread::spawn(move || {
1103
    ///     s.send(1).unwrap();
1104
    ///     thread::sleep(Duration::from_secs(1));
1105
    ///     s.send(2).unwrap();
1106
    ///     thread::sleep(Duration::from_secs(2));
1107
    ///     s.send(3).unwrap();
1108
    /// });
1109
    ///
1110
    /// thread::sleep(Duration::from_secs(2));
1111
    ///
1112
    /// // Collect all messages from the channel without blocking.
1113
    /// // The third message hasn't been sent yet so we'll collect only the first two.
1114
    /// let v: Vec<_> = r.try_iter().collect();
1115
    ///
1116
    /// assert_eq!(v, [1, 2]);
1117
    /// ```
1118
0
    pub fn try_iter(&self) -> TryIter<'_, T> {
1119
0
        TryIter { receiver: self }
1120
0
    }
1121
1122
    /// Returns `true` if receivers belong to the same channel.
1123
    ///
1124
    /// # Examples
1125
    ///
1126
    /// ```rust
1127
    /// use crossbeam_channel::unbounded;
1128
    ///
1129
    /// let (_, r) = unbounded::<usize>();
1130
    ///
1131
    /// let r2 = r.clone();
1132
    /// assert!(r.same_channel(&r2));
1133
    ///
1134
    /// let (_, r3) = unbounded();
1135
    /// assert!(!r.same_channel(&r3));
1136
    /// ```
1137
0
    pub fn same_channel(&self, other: &Receiver<T>) -> bool {
1138
0
        match (&self.flavor, &other.flavor) {
1139
0
            (ReceiverFlavor::Array(a), ReceiverFlavor::Array(b)) => a == b,
1140
0
            (ReceiverFlavor::List(a), ReceiverFlavor::List(b)) => a == b,
1141
0
            (ReceiverFlavor::Zero(a), ReceiverFlavor::Zero(b)) => a == b,
1142
0
            (ReceiverFlavor::At(a), ReceiverFlavor::At(b)) => Arc::ptr_eq(a, b),
1143
0
            (ReceiverFlavor::Tick(a), ReceiverFlavor::Tick(b)) => Arc::ptr_eq(a, b),
1144
0
            (ReceiverFlavor::Never(_), ReceiverFlavor::Never(_)) => true,
1145
0
            _ => false,
1146
        }
1147
0
    }
1148
}
1149
1150
impl<T> Drop for Receiver<T> {
1151
0
    fn drop(&mut self) {
1152
        unsafe {
1153
0
            match &self.flavor {
1154
0
                ReceiverFlavor::Array(chan) => chan.release(|c| c.disconnect()),
1155
0
                ReceiverFlavor::List(chan) => chan.release(|c| c.disconnect_receivers()),
1156
0
                ReceiverFlavor::Zero(chan) => chan.release(|c| c.disconnect()),
1157
0
                ReceiverFlavor::At(_) => {}
1158
0
                ReceiverFlavor::Tick(_) => {}
1159
0
                ReceiverFlavor::Never(_) => {}
1160
            }
1161
        }
1162
0
    }
1163
}
1164
1165
impl<T> Clone for Receiver<T> {
1166
0
    fn clone(&self) -> Self {
1167
0
        let flavor = match &self.flavor {
1168
0
            ReceiverFlavor::Array(chan) => ReceiverFlavor::Array(chan.acquire()),
1169
0
            ReceiverFlavor::List(chan) => ReceiverFlavor::List(chan.acquire()),
1170
0
            ReceiverFlavor::Zero(chan) => ReceiverFlavor::Zero(chan.acquire()),
1171
0
            ReceiverFlavor::At(chan) => ReceiverFlavor::At(chan.clone()),
1172
0
            ReceiverFlavor::Tick(chan) => ReceiverFlavor::Tick(chan.clone()),
1173
0
            ReceiverFlavor::Never(_) => ReceiverFlavor::Never(flavors::never::Channel::new()),
1174
        };
1175
1176
0
        Receiver { flavor }
1177
0
    }
1178
}
1179
1180
impl<T> fmt::Debug for Receiver<T> {
1181
0
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1182
0
        f.pad("Receiver { .. }")
1183
0
    }
1184
}
1185
1186
impl<'a, T> IntoIterator for &'a Receiver<T> {
1187
    type Item = T;
1188
    type IntoIter = Iter<'a, T>;
1189
1190
0
    fn into_iter(self) -> Self::IntoIter {
1191
0
        self.iter()
1192
0
    }
1193
}
1194
1195
impl<T> IntoIterator for Receiver<T> {
1196
    type Item = T;
1197
    type IntoIter = IntoIter<T>;
1198
1199
0
    fn into_iter(self) -> Self::IntoIter {
1200
0
        IntoIter { receiver: self }
1201
0
    }
1202
}
1203
1204
/// A blocking iterator over messages in a channel.
1205
///
1206
/// Each call to [`next`] blocks waiting for the next message and then returns it. However, if the
1207
/// channel becomes empty and disconnected, it returns [`None`] without blocking.
1208
///
1209
/// [`next`]: Iterator::next
1210
///
1211
/// # Examples
1212
///
1213
/// ```
1214
/// use std::thread;
1215
/// use crossbeam_channel::unbounded;
1216
///
1217
/// let (s, r) = unbounded();
1218
///
1219
/// thread::spawn(move || {
1220
///     s.send(1).unwrap();
1221
///     s.send(2).unwrap();
1222
///     s.send(3).unwrap();
1223
///     drop(s); // Disconnect the channel.
1224
/// });
1225
///
1226
/// // Collect all messages from the channel.
1227
/// // Note that the call to `collect` blocks until the sender is dropped.
1228
/// let v: Vec<_> = r.iter().collect();
1229
///
1230
/// assert_eq!(v, [1, 2, 3]);
1231
/// ```
1232
pub struct Iter<'a, T> {
1233
    receiver: &'a Receiver<T>,
1234
}
1235
1236
impl<T> FusedIterator for Iter<'_, T> {}
1237
1238
impl<T> Iterator for Iter<'_, T> {
1239
    type Item = T;
1240
1241
0
    fn next(&mut self) -> Option<Self::Item> {
1242
0
        self.receiver.recv().ok()
1243
0
    }
1244
}
1245
1246
impl<T> fmt::Debug for Iter<'_, T> {
1247
0
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1248
0
        f.pad("Iter { .. }")
1249
0
    }
1250
}
1251
1252
/// A non-blocking iterator over messages in a channel.
1253
///
1254
/// Each call to [`next`] returns a message if there is one ready to be received. The iterator
1255
/// never blocks waiting for the next message.
1256
///
1257
/// [`next`]: Iterator::next
1258
///
1259
/// # Examples
1260
///
1261
/// ```
1262
/// use std::thread;
1263
/// use std::time::Duration;
1264
/// use crossbeam_channel::unbounded;
1265
///
1266
/// let (s, r) = unbounded::<i32>();
1267
///
1268
/// thread::spawn(move || {
1269
///     s.send(1).unwrap();
1270
///     thread::sleep(Duration::from_secs(1));
1271
///     s.send(2).unwrap();
1272
///     thread::sleep(Duration::from_secs(2));
1273
///     s.send(3).unwrap();
1274
/// });
1275
///
1276
/// thread::sleep(Duration::from_secs(2));
1277
///
1278
/// // Collect all messages from the channel without blocking.
1279
/// // The third message hasn't been sent yet so we'll collect only the first two.
1280
/// let v: Vec<_> = r.try_iter().collect();
1281
///
1282
/// assert_eq!(v, [1, 2]);
1283
/// ```
1284
pub struct TryIter<'a, T> {
1285
    receiver: &'a Receiver<T>,
1286
}
1287
1288
impl<T> Iterator for TryIter<'_, T> {
1289
    type Item = T;
1290
1291
0
    fn next(&mut self) -> Option<Self::Item> {
1292
0
        self.receiver.try_recv().ok()
1293
0
    }
1294
}
1295
1296
impl<T> fmt::Debug for TryIter<'_, T> {
1297
0
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1298
0
        f.pad("TryIter { .. }")
1299
0
    }
1300
}
1301
1302
/// A blocking iterator over messages in a channel.
1303
///
1304
/// Each call to [`next`] blocks waiting for the next message and then returns it. However, if the
1305
/// channel becomes empty and disconnected, it returns [`None`] without blocking.
1306
///
1307
/// [`next`]: Iterator::next
1308
///
1309
/// # Examples
1310
///
1311
/// ```
1312
/// use std::thread;
1313
/// use crossbeam_channel::unbounded;
1314
///
1315
/// let (s, r) = unbounded();
1316
///
1317
/// thread::spawn(move || {
1318
///     s.send(1).unwrap();
1319
///     s.send(2).unwrap();
1320
///     s.send(3).unwrap();
1321
///     drop(s); // Disconnect the channel.
1322
/// });
1323
///
1324
/// // Collect all messages from the channel.
1325
/// // Note that the call to `collect` blocks until the sender is dropped.
1326
/// let v: Vec<_> = r.into_iter().collect();
1327
///
1328
/// assert_eq!(v, [1, 2, 3]);
1329
/// ```
1330
pub struct IntoIter<T> {
1331
    receiver: Receiver<T>,
1332
}
1333
1334
impl<T> FusedIterator for IntoIter<T> {}
1335
1336
impl<T> Iterator for IntoIter<T> {
1337
    type Item = T;
1338
1339
0
    fn next(&mut self) -> Option<Self::Item> {
1340
0
        self.receiver.recv().ok()
1341
0
    }
1342
}
1343
1344
impl<T> fmt::Debug for IntoIter<T> {
1345
0
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1346
0
        f.pad("IntoIter { .. }")
1347
0
    }
1348
}
1349
1350
impl<T> SelectHandle for Sender<T> {
1351
0
    fn try_select(&self, token: &mut Token) -> bool {
1352
0
        match &self.flavor {
1353
0
            SenderFlavor::Array(chan) => chan.sender().try_select(token),
1354
0
            SenderFlavor::List(chan) => chan.sender().try_select(token),
1355
0
            SenderFlavor::Zero(chan) => chan.sender().try_select(token),
1356
        }
1357
0
    }
1358
1359
0
    fn deadline(&self) -> Option<Instant> {
1360
0
        None
1361
0
    }
1362
1363
0
    fn register(&self, oper: Operation, cx: &Context) -> bool {
1364
0
        match &self.flavor {
1365
0
            SenderFlavor::Array(chan) => chan.sender().register(oper, cx),
1366
0
            SenderFlavor::List(chan) => chan.sender().register(oper, cx),
1367
0
            SenderFlavor::Zero(chan) => chan.sender().register(oper, cx),
1368
        }
1369
0
    }
1370
1371
0
    fn unregister(&self, oper: Operation) {
1372
0
        match &self.flavor {
1373
0
            SenderFlavor::Array(chan) => chan.sender().unregister(oper),
1374
0
            SenderFlavor::List(chan) => chan.sender().unregister(oper),
1375
0
            SenderFlavor::Zero(chan) => chan.sender().unregister(oper),
1376
        }
1377
0
    }
1378
1379
0
    fn accept(&self, token: &mut Token, cx: &Context) -> bool {
1380
0
        match &self.flavor {
1381
0
            SenderFlavor::Array(chan) => chan.sender().accept(token, cx),
1382
0
            SenderFlavor::List(chan) => chan.sender().accept(token, cx),
1383
0
            SenderFlavor::Zero(chan) => chan.sender().accept(token, cx),
1384
        }
1385
0
    }
1386
1387
0
    fn is_ready(&self) -> bool {
1388
0
        match &self.flavor {
1389
0
            SenderFlavor::Array(chan) => chan.sender().is_ready(),
1390
0
            SenderFlavor::List(chan) => chan.sender().is_ready(),
1391
0
            SenderFlavor::Zero(chan) => chan.sender().is_ready(),
1392
        }
1393
0
    }
1394
1395
0
    fn watch(&self, oper: Operation, cx: &Context) -> bool {
1396
0
        match &self.flavor {
1397
0
            SenderFlavor::Array(chan) => chan.sender().watch(oper, cx),
1398
0
            SenderFlavor::List(chan) => chan.sender().watch(oper, cx),
1399
0
            SenderFlavor::Zero(chan) => chan.sender().watch(oper, cx),
1400
        }
1401
0
    }
1402
1403
0
    fn unwatch(&self, oper: Operation) {
1404
0
        match &self.flavor {
1405
0
            SenderFlavor::Array(chan) => chan.sender().unwatch(oper),
1406
0
            SenderFlavor::List(chan) => chan.sender().unwatch(oper),
1407
0
            SenderFlavor::Zero(chan) => chan.sender().unwatch(oper),
1408
        }
1409
0
    }
1410
}
1411
1412
impl<T> SelectHandle for Receiver<T> {
1413
0
    fn try_select(&self, token: &mut Token) -> bool {
1414
0
        match &self.flavor {
1415
0
            ReceiverFlavor::Array(chan) => chan.receiver().try_select(token),
1416
0
            ReceiverFlavor::List(chan) => chan.receiver().try_select(token),
1417
0
            ReceiverFlavor::Zero(chan) => chan.receiver().try_select(token),
1418
0
            ReceiverFlavor::At(chan) => chan.try_select(token),
1419
0
            ReceiverFlavor::Tick(chan) => chan.try_select(token),
1420
0
            ReceiverFlavor::Never(chan) => chan.try_select(token),
1421
        }
1422
0
    }
1423
1424
0
    fn deadline(&self) -> Option<Instant> {
1425
0
        match &self.flavor {
1426
0
            ReceiverFlavor::Array(_) => None,
1427
0
            ReceiverFlavor::List(_) => None,
1428
0
            ReceiverFlavor::Zero(_) => None,
1429
0
            ReceiverFlavor::At(chan) => chan.deadline(),
1430
0
            ReceiverFlavor::Tick(chan) => chan.deadline(),
1431
0
            ReceiverFlavor::Never(chan) => chan.deadline(),
1432
        }
1433
0
    }
1434
1435
0
    fn register(&self, oper: Operation, cx: &Context) -> bool {
1436
0
        match &self.flavor {
1437
0
            ReceiverFlavor::Array(chan) => chan.receiver().register(oper, cx),
1438
0
            ReceiverFlavor::List(chan) => chan.receiver().register(oper, cx),
1439
0
            ReceiverFlavor::Zero(chan) => chan.receiver().register(oper, cx),
1440
0
            ReceiverFlavor::At(chan) => chan.register(oper, cx),
1441
0
            ReceiverFlavor::Tick(chan) => chan.register(oper, cx),
1442
0
            ReceiverFlavor::Never(chan) => chan.register(oper, cx),
1443
        }
1444
0
    }
1445
1446
0
    fn unregister(&self, oper: Operation) {
1447
0
        match &self.flavor {
1448
0
            ReceiverFlavor::Array(chan) => chan.receiver().unregister(oper),
1449
0
            ReceiverFlavor::List(chan) => chan.receiver().unregister(oper),
1450
0
            ReceiverFlavor::Zero(chan) => chan.receiver().unregister(oper),
1451
0
            ReceiverFlavor::At(chan) => chan.unregister(oper),
1452
0
            ReceiverFlavor::Tick(chan) => chan.unregister(oper),
1453
0
            ReceiverFlavor::Never(chan) => chan.unregister(oper),
1454
        }
1455
0
    }
1456
1457
0
    fn accept(&self, token: &mut Token, cx: &Context) -> bool {
1458
0
        match &self.flavor {
1459
0
            ReceiverFlavor::Array(chan) => chan.receiver().accept(token, cx),
1460
0
            ReceiverFlavor::List(chan) => chan.receiver().accept(token, cx),
1461
0
            ReceiverFlavor::Zero(chan) => chan.receiver().accept(token, cx),
1462
0
            ReceiverFlavor::At(chan) => chan.accept(token, cx),
1463
0
            ReceiverFlavor::Tick(chan) => chan.accept(token, cx),
1464
0
            ReceiverFlavor::Never(chan) => chan.accept(token, cx),
1465
        }
1466
0
    }
1467
1468
0
    fn is_ready(&self) -> bool {
1469
0
        match &self.flavor {
1470
0
            ReceiverFlavor::Array(chan) => chan.receiver().is_ready(),
1471
0
            ReceiverFlavor::List(chan) => chan.receiver().is_ready(),
1472
0
            ReceiverFlavor::Zero(chan) => chan.receiver().is_ready(),
1473
0
            ReceiverFlavor::At(chan) => chan.is_ready(),
1474
0
            ReceiverFlavor::Tick(chan) => chan.is_ready(),
1475
0
            ReceiverFlavor::Never(chan) => chan.is_ready(),
1476
        }
1477
0
    }
1478
1479
0
    fn watch(&self, oper: Operation, cx: &Context) -> bool {
1480
0
        match &self.flavor {
1481
0
            ReceiverFlavor::Array(chan) => chan.receiver().watch(oper, cx),
1482
0
            ReceiverFlavor::List(chan) => chan.receiver().watch(oper, cx),
1483
0
            ReceiverFlavor::Zero(chan) => chan.receiver().watch(oper, cx),
1484
0
            ReceiverFlavor::At(chan) => chan.watch(oper, cx),
1485
0
            ReceiverFlavor::Tick(chan) => chan.watch(oper, cx),
1486
0
            ReceiverFlavor::Never(chan) => chan.watch(oper, cx),
1487
        }
1488
0
    }
1489
1490
0
    fn unwatch(&self, oper: Operation) {
1491
0
        match &self.flavor {
1492
0
            ReceiverFlavor::Array(chan) => chan.receiver().unwatch(oper),
1493
0
            ReceiverFlavor::List(chan) => chan.receiver().unwatch(oper),
1494
0
            ReceiverFlavor::Zero(chan) => chan.receiver().unwatch(oper),
1495
0
            ReceiverFlavor::At(chan) => chan.unwatch(oper),
1496
0
            ReceiverFlavor::Tick(chan) => chan.unwatch(oper),
1497
0
            ReceiverFlavor::Never(chan) => chan.unwatch(oper),
1498
        }
1499
0
    }
1500
}
1501
1502
/// Writes a message into the channel.
1503
0
pub(crate) unsafe fn write<T>(s: &Sender<T>, token: &mut Token, msg: T) -> Result<(), T> {
1504
0
    match &s.flavor {
1505
0
        SenderFlavor::Array(chan) => chan.write(token, msg),
1506
0
        SenderFlavor::List(chan) => chan.write(token, msg),
1507
0
        SenderFlavor::Zero(chan) => chan.write(token, msg),
1508
    }
1509
0
}
1510
1511
/// Reads a message from the channel.
1512
0
pub(crate) unsafe fn read<T>(r: &Receiver<T>, token: &mut Token) -> Result<T, ()> {
1513
0
    match &r.flavor {
1514
0
        ReceiverFlavor::Array(chan) => chan.read(token),
1515
0
        ReceiverFlavor::List(chan) => chan.read(token),
1516
0
        ReceiverFlavor::Zero(chan) => chan.read(token),
1517
0
        ReceiverFlavor::At(chan) => {
1518
0
            mem::transmute_copy::<Result<Instant, ()>, Result<T, ()>>(&chan.read(token))
1519
        }
1520
0
        ReceiverFlavor::Tick(chan) => {
1521
0
            mem::transmute_copy::<Result<Instant, ()>, Result<T, ()>>(&chan.read(token))
1522
        }
1523
0
        ReceiverFlavor::Never(chan) => chan.read(token),
1524
    }
1525
0
}