/rust/registry/src/index.crates.io-1949cf8c6b5b557f/crossbeam-channel-0.5.15/src/channel.rs
Line | Count | Source |
1 | | //! The channel interface. |
2 | | |
3 | | use std::fmt; |
4 | | use std::iter::FusedIterator; |
5 | | use std::mem; |
6 | | use std::panic::{RefUnwindSafe, UnwindSafe}; |
7 | | use std::sync::Arc; |
8 | | use std::time::{Duration, Instant}; |
9 | | |
10 | | use crate::context::Context; |
11 | | use crate::counter; |
12 | | use crate::err::{ |
13 | | RecvError, RecvTimeoutError, SendError, SendTimeoutError, TryRecvError, TrySendError, |
14 | | }; |
15 | | use crate::flavors; |
16 | | use crate::select::{Operation, SelectHandle, Token}; |
17 | | |
18 | | /// Creates a channel of unbounded capacity. |
19 | | /// |
20 | | /// This channel has a growable buffer that can hold any number of messages at a time. |
21 | | /// |
22 | | /// # Examples |
23 | | /// |
24 | | /// ``` |
25 | | /// use std::thread; |
26 | | /// use crossbeam_channel::unbounded; |
27 | | /// |
28 | | /// let (s, r) = unbounded(); |
29 | | /// |
30 | | /// // Computes the n-th Fibonacci number. |
31 | | /// fn fib(n: i32) -> i32 { |
32 | | /// if n <= 1 { |
33 | | /// n |
34 | | /// } else { |
35 | | /// fib(n - 1) + fib(n - 2) |
36 | | /// } |
37 | | /// } |
38 | | /// |
39 | | /// // Spawn an asynchronous computation. |
40 | | /// thread::spawn(move || s.send(fib(20)).unwrap()); |
41 | | /// |
42 | | /// // Print the result of the computation. |
43 | | /// println!("{}", r.recv().unwrap()); |
44 | | /// ``` |
45 | 0 | pub fn unbounded<T>() -> (Sender<T>, Receiver<T>) { |
46 | 0 | let (s, r) = counter::new(flavors::list::Channel::new()); |
47 | 0 | let s = Sender { |
48 | 0 | flavor: SenderFlavor::List(s), |
49 | 0 | }; |
50 | 0 | let r = Receiver { |
51 | 0 | flavor: ReceiverFlavor::List(r), |
52 | 0 | }; |
53 | 0 | (s, r) |
54 | 0 | } |
55 | | |
56 | | /// Creates a channel of bounded capacity. |
57 | | /// |
58 | | /// This channel has a buffer that can hold at most `cap` messages at a time. |
59 | | /// |
60 | | /// A special case is zero-capacity channel, which cannot hold any messages. Instead, send and |
61 | | /// receive operations must appear at the same time in order to pair up and pass the message over. |
62 | | /// |
63 | | /// # Examples |
64 | | /// |
65 | | /// A channel of capacity 1: |
66 | | /// |
67 | | /// ``` |
68 | | /// use std::thread; |
69 | | /// use std::time::Duration; |
70 | | /// use crossbeam_channel::bounded; |
71 | | /// |
72 | | /// let (s, r) = bounded(1); |
73 | | /// |
74 | | /// // This call returns immediately because there is enough space in the channel. |
75 | | /// s.send(1).unwrap(); |
76 | | /// |
77 | | /// thread::spawn(move || { |
78 | | /// // This call blocks the current thread because the channel is full. |
79 | | /// // It will be able to complete only after the first message is received. |
80 | | /// s.send(2).unwrap(); |
81 | | /// }); |
82 | | /// |
83 | | /// thread::sleep(Duration::from_secs(1)); |
84 | | /// assert_eq!(r.recv(), Ok(1)); |
85 | | /// assert_eq!(r.recv(), Ok(2)); |
86 | | /// ``` |
87 | | /// |
88 | | /// A zero-capacity channel: |
89 | | /// |
90 | | /// ``` |
91 | | /// use std::thread; |
92 | | /// use std::time::Duration; |
93 | | /// use crossbeam_channel::bounded; |
94 | | /// |
95 | | /// let (s, r) = bounded(0); |
96 | | /// |
97 | | /// thread::spawn(move || { |
98 | | /// // This call blocks the current thread until a receive operation appears |
99 | | /// // on the other side of the channel. |
100 | | /// s.send(1).unwrap(); |
101 | | /// }); |
102 | | /// |
103 | | /// thread::sleep(Duration::from_secs(1)); |
104 | | /// assert_eq!(r.recv(), Ok(1)); |
105 | | /// ``` |
106 | 0 | pub fn bounded<T>(cap: usize) -> (Sender<T>, Receiver<T>) { |
107 | 0 | if cap == 0 { |
108 | 0 | let (s, r) = counter::new(flavors::zero::Channel::new()); |
109 | 0 | let s = Sender { |
110 | 0 | flavor: SenderFlavor::Zero(s), |
111 | 0 | }; |
112 | 0 | let r = Receiver { |
113 | 0 | flavor: ReceiverFlavor::Zero(r), |
114 | 0 | }; |
115 | 0 | (s, r) |
116 | | } else { |
117 | 0 | let (s, r) = counter::new(flavors::array::Channel::with_capacity(cap)); |
118 | 0 | let s = Sender { |
119 | 0 | flavor: SenderFlavor::Array(s), |
120 | 0 | }; |
121 | 0 | let r = Receiver { |
122 | 0 | flavor: ReceiverFlavor::Array(r), |
123 | 0 | }; |
124 | 0 | (s, r) |
125 | | } |
126 | 0 | } |
127 | | |
128 | | /// Creates a receiver that delivers a message after a certain duration of time. |
129 | | /// |
130 | | /// The channel is bounded with capacity of 1 and never gets disconnected. Exactly one message will |
131 | | /// be sent into the channel after `duration` elapses. The message is the instant at which it is |
132 | | /// sent. |
133 | | /// |
134 | | /// # Examples |
135 | | /// |
136 | | /// Using an `after` channel for timeouts: |
137 | | /// |
138 | | /// ``` |
139 | | /// use std::time::Duration; |
140 | | /// use crossbeam_channel::{after, select, unbounded}; |
141 | | /// |
142 | | /// let (s, r) = unbounded::<i32>(); |
143 | | /// let timeout = Duration::from_millis(100); |
144 | | /// |
145 | | /// select! { |
146 | | /// recv(r) -> msg => println!("received {:?}", msg), |
147 | | /// recv(after(timeout)) -> _ => println!("timed out"), |
148 | | /// } |
149 | | /// ``` |
150 | | /// |
151 | | /// When the message gets sent: |
152 | | /// |
153 | | /// ``` |
154 | | /// use std::thread; |
155 | | /// use std::time::{Duration, Instant}; |
156 | | /// use crossbeam_channel::after; |
157 | | /// |
158 | | /// // Converts a number of milliseconds into a `Duration`. |
159 | | /// let ms = |ms| Duration::from_millis(ms); |
160 | | /// |
161 | | /// // Returns `true` if `a` and `b` are very close `Instant`s. |
162 | | /// let eq = |a, b| a + ms(60) > b && b + ms(60) > a; |
163 | | /// |
164 | | /// let start = Instant::now(); |
165 | | /// let r = after(ms(100)); |
166 | | /// |
167 | | /// thread::sleep(ms(500)); |
168 | | /// |
169 | | /// // This message was sent 100 ms from the start and received 500 ms from the start. |
170 | | /// assert!(eq(r.recv().unwrap(), start + ms(100))); |
171 | | /// assert!(eq(Instant::now(), start + ms(500))); |
172 | | /// ``` |
173 | 0 | pub fn after(duration: Duration) -> Receiver<Instant> { |
174 | 0 | match Instant::now().checked_add(duration) { |
175 | 0 | Some(deadline) => Receiver { |
176 | 0 | flavor: ReceiverFlavor::At(Arc::new(flavors::at::Channel::new_deadline(deadline))), |
177 | 0 | }, |
178 | 0 | None => never(), |
179 | | } |
180 | 0 | } |
181 | | |
182 | | /// Creates a receiver that delivers a message at a certain instant in time. |
183 | | /// |
184 | | /// The channel is bounded with capacity of 1 and never gets disconnected. Exactly one message will |
185 | | /// be sent into the channel at the moment in time `when`. The message is the instant at which it |
186 | | /// is sent, which is the same as `when`. If `when` is in the past, the message will be delivered |
187 | | /// instantly to the receiver. |
188 | | /// |
189 | | /// # Examples |
190 | | /// |
191 | | /// Using an `at` channel for timeouts: |
192 | | /// |
193 | | /// ``` |
194 | | /// use std::time::{Instant, Duration}; |
195 | | /// use crossbeam_channel::{at, select, unbounded}; |
196 | | /// |
197 | | /// let (s, r) = unbounded::<i32>(); |
198 | | /// let deadline = Instant::now() + Duration::from_millis(500); |
199 | | /// |
200 | | /// select! { |
201 | | /// recv(r) -> msg => println!("received {:?}", msg), |
202 | | /// recv(at(deadline)) -> _ => println!("timed out"), |
203 | | /// } |
204 | | /// ``` |
205 | | /// |
206 | | /// When the message gets sent: |
207 | | /// |
208 | | /// ``` |
209 | | /// use std::time::{Duration, Instant}; |
210 | | /// use crossbeam_channel::at; |
211 | | /// |
212 | | /// // Converts a number of milliseconds into a `Duration`. |
213 | | /// let ms = |ms| Duration::from_millis(ms); |
214 | | /// |
215 | | /// let start = Instant::now(); |
216 | | /// let end = start + ms(100); |
217 | | /// |
218 | | /// let r = at(end); |
219 | | /// |
220 | | /// // This message was sent 100 ms from the start |
221 | | /// assert_eq!(r.recv().unwrap(), end); |
222 | | /// assert!(Instant::now() > start + ms(100)); |
223 | | /// ``` |
224 | 0 | pub fn at(when: Instant) -> Receiver<Instant> { |
225 | 0 | Receiver { |
226 | 0 | flavor: ReceiverFlavor::At(Arc::new(flavors::at::Channel::new_deadline(when))), |
227 | 0 | } |
228 | 0 | } |
229 | | |
230 | | /// Creates a receiver that never delivers messages. |
231 | | /// |
232 | | /// The channel is bounded with capacity of 0 and never gets disconnected. |
233 | | /// |
234 | | /// # Examples |
235 | | /// |
236 | | /// Using a `never` channel to optionally add a timeout to [`select!`]: |
237 | | /// |
238 | | /// [`select!`]: crate::select! |
239 | | /// |
240 | | /// ``` |
241 | | /// use std::thread; |
242 | | /// use std::time::Duration; |
243 | | /// use crossbeam_channel::{after, select, never, unbounded}; |
244 | | /// |
245 | | /// let (s, r) = unbounded(); |
246 | | /// |
247 | | /// thread::spawn(move || { |
248 | | /// thread::sleep(Duration::from_secs(1)); |
249 | | /// s.send(1).unwrap(); |
250 | | /// }); |
251 | | /// |
252 | | /// // Suppose this duration can be a `Some` or a `None`. |
253 | | /// let duration = Some(Duration::from_millis(100)); |
254 | | /// |
255 | | /// // Create a channel that times out after the specified duration. |
256 | | /// let timeout = duration |
257 | | /// .map(|d| after(d)) |
258 | | /// .unwrap_or(never()); |
259 | | /// |
260 | | /// select! { |
261 | | /// recv(r) -> msg => assert_eq!(msg, Ok(1)), |
262 | | /// recv(timeout) -> _ => println!("timed out"), |
263 | | /// } |
264 | | /// ``` |
265 | 0 | pub fn never<T>() -> Receiver<T> { |
266 | 0 | Receiver { |
267 | 0 | flavor: ReceiverFlavor::Never(flavors::never::Channel::new()), |
268 | 0 | } |
269 | 0 | } |
270 | | |
271 | | /// Creates a receiver that delivers messages periodically. |
272 | | /// |
273 | | /// The channel is bounded with capacity of 1 and never gets disconnected. Messages will be |
274 | | /// sent into the channel in intervals of `duration`. Each message is the instant at which it is |
275 | | /// sent. |
276 | | /// |
277 | | /// # Examples |
278 | | /// |
279 | | /// Using a `tick` channel to periodically print elapsed time: |
280 | | /// |
281 | | /// ``` |
282 | | /// use std::time::{Duration, Instant}; |
283 | | /// use crossbeam_channel::tick; |
284 | | /// |
285 | | /// let start = Instant::now(); |
286 | | /// let ticker = tick(Duration::from_millis(100)); |
287 | | /// |
288 | | /// for _ in 0..5 { |
289 | | /// ticker.recv().unwrap(); |
290 | | /// println!("elapsed: {:?}", start.elapsed()); |
291 | | /// } |
292 | | /// ``` |
293 | | /// |
294 | | /// When messages get sent: |
295 | | /// |
296 | | /// ``` |
297 | | /// use std::thread; |
298 | | /// use std::time::{Duration, Instant}; |
299 | | /// use crossbeam_channel::tick; |
300 | | /// |
301 | | /// // Converts a number of milliseconds into a `Duration`. |
302 | | /// let ms = |ms| Duration::from_millis(ms); |
303 | | /// |
304 | | /// // Returns `true` if `a` and `b` are very close `Instant`s. |
305 | | /// let eq = |a, b| a + ms(65) > b && b + ms(65) > a; |
306 | | /// |
307 | | /// let start = Instant::now(); |
308 | | /// let r = tick(ms(100)); |
309 | | /// |
310 | | /// // This message was sent 100 ms from the start and received 100 ms from the start. |
311 | | /// assert!(eq(r.recv().unwrap(), start + ms(100))); |
312 | | /// assert!(eq(Instant::now(), start + ms(100))); |
313 | | /// |
314 | | /// thread::sleep(ms(500)); |
315 | | /// |
316 | | /// // This message was sent 200 ms from the start and received 600 ms from the start. |
317 | | /// assert!(eq(r.recv().unwrap(), start + ms(200))); |
318 | | /// assert!(eq(Instant::now(), start + ms(600))); |
319 | | /// |
320 | | /// // This message was sent 700 ms from the start and received 700 ms from the start. |
321 | | /// assert!(eq(r.recv().unwrap(), start + ms(700))); |
322 | | /// assert!(eq(Instant::now(), start + ms(700))); |
323 | | /// ``` |
324 | 0 | pub fn tick(duration: Duration) -> Receiver<Instant> { |
325 | 0 | match Instant::now().checked_add(duration) { |
326 | 0 | Some(delivery_time) => Receiver { |
327 | 0 | flavor: ReceiverFlavor::Tick(Arc::new(flavors::tick::Channel::new( |
328 | 0 | delivery_time, |
329 | 0 | duration, |
330 | 0 | ))), |
331 | 0 | }, |
332 | 0 | None => never(), |
333 | | } |
334 | 0 | } |
335 | | |
336 | | /// The sending side of a channel. |
337 | | /// |
338 | | /// # Examples |
339 | | /// |
340 | | /// ``` |
341 | | /// use std::thread; |
342 | | /// use crossbeam_channel::unbounded; |
343 | | /// |
344 | | /// let (s1, r) = unbounded(); |
345 | | /// let s2 = s1.clone(); |
346 | | /// |
347 | | /// thread::spawn(move || s1.send(1).unwrap()); |
348 | | /// thread::spawn(move || s2.send(2).unwrap()); |
349 | | /// |
350 | | /// let msg1 = r.recv().unwrap(); |
351 | | /// let msg2 = r.recv().unwrap(); |
352 | | /// |
353 | | /// assert_eq!(msg1 + msg2, 3); |
354 | | /// ``` |
355 | | pub struct Sender<T> { |
356 | | flavor: SenderFlavor<T>, |
357 | | } |
358 | | |
359 | | /// Sender flavors. |
360 | | enum SenderFlavor<T> { |
361 | | /// Bounded channel based on a preallocated array. |
362 | | Array(counter::Sender<flavors::array::Channel<T>>), |
363 | | |
364 | | /// Unbounded channel implemented as a linked list. |
365 | | List(counter::Sender<flavors::list::Channel<T>>), |
366 | | |
367 | | /// Zero-capacity channel. |
368 | | Zero(counter::Sender<flavors::zero::Channel<T>>), |
369 | | } |
370 | | |
371 | | unsafe impl<T: Send> Send for Sender<T> {} |
372 | | unsafe impl<T: Send> Sync for Sender<T> {} |
373 | | |
374 | | impl<T> UnwindSafe for Sender<T> {} |
375 | | impl<T> RefUnwindSafe for Sender<T> {} |
376 | | |
377 | | impl<T> Sender<T> { |
378 | | /// Attempts to send a message into the channel without blocking. |
379 | | /// |
380 | | /// This method will either send a message into the channel immediately or return an error if |
381 | | /// the channel is full or disconnected. The returned error contains the original message. |
382 | | /// |
383 | | /// If called on a zero-capacity channel, this method will send the message only if there |
384 | | /// happens to be a receive operation on the other side of the channel at the same time. |
385 | | /// |
386 | | /// # Examples |
387 | | /// |
388 | | /// ``` |
389 | | /// use crossbeam_channel::{bounded, TrySendError}; |
390 | | /// |
391 | | /// let (s, r) = bounded(1); |
392 | | /// |
393 | | /// assert_eq!(s.try_send(1), Ok(())); |
394 | | /// assert_eq!(s.try_send(2), Err(TrySendError::Full(2))); |
395 | | /// |
396 | | /// drop(r); |
397 | | /// assert_eq!(s.try_send(3), Err(TrySendError::Disconnected(3))); |
398 | | /// ``` |
399 | 0 | pub fn try_send(&self, msg: T) -> Result<(), TrySendError<T>> { |
400 | 0 | match &self.flavor { |
401 | 0 | SenderFlavor::Array(chan) => chan.try_send(msg), |
402 | 0 | SenderFlavor::List(chan) => chan.try_send(msg), |
403 | 0 | SenderFlavor::Zero(chan) => chan.try_send(msg), |
404 | | } |
405 | 0 | } |
406 | | |
407 | | /// Blocks the current thread until a message is sent or the channel is disconnected. |
408 | | /// |
409 | | /// If the channel is full and not disconnected, this call will block until the send operation |
410 | | /// can proceed. If the channel becomes disconnected, this call will wake up and return an |
411 | | /// error. The returned error contains the original message. |
412 | | /// |
413 | | /// If called on a zero-capacity channel, this method will wait for a receive operation to |
414 | | /// appear on the other side of the channel. |
415 | | /// |
416 | | /// # Examples |
417 | | /// |
418 | | /// ``` |
419 | | /// use std::thread; |
420 | | /// use std::time::Duration; |
421 | | /// use crossbeam_channel::{bounded, SendError}; |
422 | | /// |
423 | | /// let (s, r) = bounded(1); |
424 | | /// assert_eq!(s.send(1), Ok(())); |
425 | | /// |
426 | | /// thread::spawn(move || { |
427 | | /// assert_eq!(r.recv(), Ok(1)); |
428 | | /// thread::sleep(Duration::from_secs(1)); |
429 | | /// drop(r); |
430 | | /// }); |
431 | | /// |
432 | | /// assert_eq!(s.send(2), Ok(())); |
433 | | /// assert_eq!(s.send(3), Err(SendError(3))); |
434 | | /// ``` |
435 | 0 | pub fn send(&self, msg: T) -> Result<(), SendError<T>> { |
436 | 0 | match &self.flavor { |
437 | 0 | SenderFlavor::Array(chan) => chan.send(msg, None), |
438 | 0 | SenderFlavor::List(chan) => chan.send(msg, None), |
439 | 0 | SenderFlavor::Zero(chan) => chan.send(msg, None), |
440 | | } |
441 | 0 | .map_err(|err| match err { |
442 | 0 | SendTimeoutError::Disconnected(msg) => SendError(msg), |
443 | 0 | SendTimeoutError::Timeout(_) => unreachable!(), |
444 | 0 | }) |
445 | 0 | } |
446 | | |
447 | | /// Waits for a message to be sent into the channel, but only for a limited time. |
448 | | /// |
449 | | /// If the channel is full and not disconnected, this call will block until the send operation |
450 | | /// can proceed or the operation times out. If the channel becomes disconnected, this call will |
451 | | /// wake up and return an error. The returned error contains the original message. |
452 | | /// |
453 | | /// If called on a zero-capacity channel, this method will wait for a receive operation to |
454 | | /// appear on the other side of the channel. |
455 | | /// |
456 | | /// # Examples |
457 | | /// |
458 | | /// ``` |
459 | | /// use std::thread; |
460 | | /// use std::time::Duration; |
461 | | /// use crossbeam_channel::{bounded, SendTimeoutError}; |
462 | | /// |
463 | | /// let (s, r) = bounded(0); |
464 | | /// |
465 | | /// thread::spawn(move || { |
466 | | /// thread::sleep(Duration::from_secs(1)); |
467 | | /// assert_eq!(r.recv(), Ok(2)); |
468 | | /// drop(r); |
469 | | /// }); |
470 | | /// |
471 | | /// assert_eq!( |
472 | | /// s.send_timeout(1, Duration::from_millis(500)), |
473 | | /// Err(SendTimeoutError::Timeout(1)), |
474 | | /// ); |
475 | | /// assert_eq!( |
476 | | /// s.send_timeout(2, Duration::from_secs(1)), |
477 | | /// Ok(()), |
478 | | /// ); |
479 | | /// assert_eq!( |
480 | | /// s.send_timeout(3, Duration::from_millis(500)), |
481 | | /// Err(SendTimeoutError::Disconnected(3)), |
482 | | /// ); |
483 | | /// ``` |
484 | 0 | pub fn send_timeout(&self, msg: T, timeout: Duration) -> Result<(), SendTimeoutError<T>> { |
485 | 0 | match Instant::now().checked_add(timeout) { |
486 | 0 | Some(deadline) => self.send_deadline(msg, deadline), |
487 | 0 | None => self.send(msg).map_err(SendTimeoutError::from), |
488 | | } |
489 | 0 | } |
490 | | |
491 | | /// Waits for a message to be sent into the channel, but only until a given deadline. |
492 | | /// |
493 | | /// If the channel is full and not disconnected, this call will block until the send operation |
494 | | /// can proceed or the operation times out. If the channel becomes disconnected, this call will |
495 | | /// wake up and return an error. The returned error contains the original message. |
496 | | /// |
497 | | /// If called on a zero-capacity channel, this method will wait for a receive operation to |
498 | | /// appear on the other side of the channel. |
499 | | /// |
500 | | /// # Examples |
501 | | /// |
502 | | /// ``` |
503 | | /// use std::thread; |
504 | | /// use std::time::{Duration, Instant}; |
505 | | /// use crossbeam_channel::{bounded, SendTimeoutError}; |
506 | | /// |
507 | | /// let (s, r) = bounded(0); |
508 | | /// |
509 | | /// thread::spawn(move || { |
510 | | /// thread::sleep(Duration::from_secs(1)); |
511 | | /// assert_eq!(r.recv(), Ok(2)); |
512 | | /// drop(r); |
513 | | /// }); |
514 | | /// |
515 | | /// let now = Instant::now(); |
516 | | /// |
517 | | /// assert_eq!( |
518 | | /// s.send_deadline(1, now + Duration::from_millis(500)), |
519 | | /// Err(SendTimeoutError::Timeout(1)), |
520 | | /// ); |
521 | | /// assert_eq!( |
522 | | /// s.send_deadline(2, now + Duration::from_millis(1500)), |
523 | | /// Ok(()), |
524 | | /// ); |
525 | | /// assert_eq!( |
526 | | /// s.send_deadline(3, now + Duration::from_millis(2000)), |
527 | | /// Err(SendTimeoutError::Disconnected(3)), |
528 | | /// ); |
529 | | /// ``` |
530 | 0 | pub fn send_deadline(&self, msg: T, deadline: Instant) -> Result<(), SendTimeoutError<T>> { |
531 | 0 | match &self.flavor { |
532 | 0 | SenderFlavor::Array(chan) => chan.send(msg, Some(deadline)), |
533 | 0 | SenderFlavor::List(chan) => chan.send(msg, Some(deadline)), |
534 | 0 | SenderFlavor::Zero(chan) => chan.send(msg, Some(deadline)), |
535 | | } |
536 | 0 | } |
537 | | |
538 | | /// Returns `true` if the channel is empty. |
539 | | /// |
540 | | /// Note: Zero-capacity channels are always empty. |
541 | | /// |
542 | | /// # Examples |
543 | | /// |
544 | | /// ``` |
545 | | /// use crossbeam_channel::unbounded; |
546 | | /// |
547 | | /// let (s, r) = unbounded(); |
548 | | /// assert!(s.is_empty()); |
549 | | /// |
550 | | /// s.send(0).unwrap(); |
551 | | /// assert!(!s.is_empty()); |
552 | | /// ``` |
553 | 0 | pub fn is_empty(&self) -> bool { |
554 | 0 | match &self.flavor { |
555 | 0 | SenderFlavor::Array(chan) => chan.is_empty(), |
556 | 0 | SenderFlavor::List(chan) => chan.is_empty(), |
557 | 0 | SenderFlavor::Zero(chan) => chan.is_empty(), |
558 | | } |
559 | 0 | } |
560 | | |
561 | | /// Returns `true` if the channel is full. |
562 | | /// |
563 | | /// Note: Zero-capacity channels are always full. |
564 | | /// |
565 | | /// # Examples |
566 | | /// |
567 | | /// ``` |
568 | | /// use crossbeam_channel::bounded; |
569 | | /// |
570 | | /// let (s, r) = bounded(1); |
571 | | /// |
572 | | /// assert!(!s.is_full()); |
573 | | /// s.send(0).unwrap(); |
574 | | /// assert!(s.is_full()); |
575 | | /// ``` |
576 | 0 | pub fn is_full(&self) -> bool { |
577 | 0 | match &self.flavor { |
578 | 0 | SenderFlavor::Array(chan) => chan.is_full(), |
579 | 0 | SenderFlavor::List(chan) => chan.is_full(), |
580 | 0 | SenderFlavor::Zero(chan) => chan.is_full(), |
581 | | } |
582 | 0 | } |
583 | | |
584 | | /// Returns the number of messages in the channel. |
585 | | /// |
586 | | /// # Examples |
587 | | /// |
588 | | /// ``` |
589 | | /// use crossbeam_channel::unbounded; |
590 | | /// |
591 | | /// let (s, r) = unbounded(); |
592 | | /// assert_eq!(s.len(), 0); |
593 | | /// |
594 | | /// s.send(1).unwrap(); |
595 | | /// s.send(2).unwrap(); |
596 | | /// assert_eq!(s.len(), 2); |
597 | | /// ``` |
598 | 0 | pub fn len(&self) -> usize { |
599 | 0 | match &self.flavor { |
600 | 0 | SenderFlavor::Array(chan) => chan.len(), |
601 | 0 | SenderFlavor::List(chan) => chan.len(), |
602 | 0 | SenderFlavor::Zero(chan) => chan.len(), |
603 | | } |
604 | 0 | } |
605 | | |
606 | | /// If the channel is bounded, returns its capacity. |
607 | | /// |
608 | | /// # Examples |
609 | | /// |
610 | | /// ``` |
611 | | /// use crossbeam_channel::{bounded, unbounded}; |
612 | | /// |
613 | | /// let (s, _) = unbounded::<i32>(); |
614 | | /// assert_eq!(s.capacity(), None); |
615 | | /// |
616 | | /// let (s, _) = bounded::<i32>(5); |
617 | | /// assert_eq!(s.capacity(), Some(5)); |
618 | | /// |
619 | | /// let (s, _) = bounded::<i32>(0); |
620 | | /// assert_eq!(s.capacity(), Some(0)); |
621 | | /// ``` |
622 | 0 | pub fn capacity(&self) -> Option<usize> { |
623 | 0 | match &self.flavor { |
624 | 0 | SenderFlavor::Array(chan) => chan.capacity(), |
625 | 0 | SenderFlavor::List(chan) => chan.capacity(), |
626 | 0 | SenderFlavor::Zero(chan) => chan.capacity(), |
627 | | } |
628 | 0 | } |
629 | | |
630 | | /// Returns `true` if senders belong to the same channel. |
631 | | /// |
632 | | /// # Examples |
633 | | /// |
634 | | /// ```rust |
635 | | /// use crossbeam_channel::unbounded; |
636 | | /// |
637 | | /// let (s, _) = unbounded::<usize>(); |
638 | | /// |
639 | | /// let s2 = s.clone(); |
640 | | /// assert!(s.same_channel(&s2)); |
641 | | /// |
642 | | /// let (s3, _) = unbounded(); |
643 | | /// assert!(!s.same_channel(&s3)); |
644 | | /// ``` |
645 | 0 | pub fn same_channel(&self, other: &Sender<T>) -> bool { |
646 | 0 | match (&self.flavor, &other.flavor) { |
647 | 0 | (SenderFlavor::Array(ref a), SenderFlavor::Array(ref b)) => a == b, |
648 | 0 | (SenderFlavor::List(ref a), SenderFlavor::List(ref b)) => a == b, |
649 | 0 | (SenderFlavor::Zero(ref a), SenderFlavor::Zero(ref b)) => a == b, |
650 | 0 | _ => false, |
651 | | } |
652 | 0 | } |
653 | | } |
654 | | |
655 | | impl<T> Drop for Sender<T> { |
656 | 0 | fn drop(&mut self) { |
657 | | unsafe { |
658 | 0 | match &self.flavor { |
659 | 0 | SenderFlavor::Array(chan) => chan.release(|c| c.disconnect()), |
660 | 0 | SenderFlavor::List(chan) => chan.release(|c| c.disconnect_senders()), |
661 | 0 | SenderFlavor::Zero(chan) => chan.release(|c| c.disconnect()), |
662 | | } |
663 | | } |
664 | 0 | } |
665 | | } |
666 | | |
667 | | impl<T> Clone for Sender<T> { |
668 | 0 | fn clone(&self) -> Self { |
669 | 0 | let flavor = match &self.flavor { |
670 | 0 | SenderFlavor::Array(chan) => SenderFlavor::Array(chan.acquire()), |
671 | 0 | SenderFlavor::List(chan) => SenderFlavor::List(chan.acquire()), |
672 | 0 | SenderFlavor::Zero(chan) => SenderFlavor::Zero(chan.acquire()), |
673 | | }; |
674 | | |
675 | 0 | Sender { flavor } |
676 | 0 | } |
677 | | } |
678 | | |
679 | | impl<T> fmt::Debug for Sender<T> { |
680 | 0 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
681 | 0 | f.pad("Sender { .. }") |
682 | 0 | } |
683 | | } |
684 | | |
685 | | /// The receiving side of a channel. |
686 | | /// |
687 | | /// # Examples |
688 | | /// |
689 | | /// ``` |
690 | | /// use std::thread; |
691 | | /// use std::time::Duration; |
692 | | /// use crossbeam_channel::unbounded; |
693 | | /// |
694 | | /// let (s, r) = unbounded(); |
695 | | /// |
696 | | /// thread::spawn(move || { |
697 | | /// let _ = s.send(1); |
698 | | /// thread::sleep(Duration::from_secs(1)); |
699 | | /// let _ = s.send(2); |
700 | | /// }); |
701 | | /// |
702 | | /// assert_eq!(r.recv(), Ok(1)); // Received immediately. |
703 | | /// assert_eq!(r.recv(), Ok(2)); // Received after 1 second. |
704 | | /// ``` |
705 | | pub struct Receiver<T> { |
706 | | flavor: ReceiverFlavor<T>, |
707 | | } |
708 | | |
709 | | /// Receiver flavors. |
710 | | enum ReceiverFlavor<T> { |
711 | | /// Bounded channel based on a preallocated array. |
712 | | Array(counter::Receiver<flavors::array::Channel<T>>), |
713 | | |
714 | | /// Unbounded channel implemented as a linked list. |
715 | | List(counter::Receiver<flavors::list::Channel<T>>), |
716 | | |
717 | | /// Zero-capacity channel. |
718 | | Zero(counter::Receiver<flavors::zero::Channel<T>>), |
719 | | |
720 | | /// The after flavor. |
721 | | At(Arc<flavors::at::Channel>), |
722 | | |
723 | | /// The tick flavor. |
724 | | Tick(Arc<flavors::tick::Channel>), |
725 | | |
726 | | /// The never flavor. |
727 | | Never(flavors::never::Channel<T>), |
728 | | } |
729 | | |
730 | | unsafe impl<T: Send> Send for Receiver<T> {} |
731 | | unsafe impl<T: Send> Sync for Receiver<T> {} |
732 | | |
733 | | impl<T> UnwindSafe for Receiver<T> {} |
734 | | impl<T> RefUnwindSafe for Receiver<T> {} |
735 | | |
736 | | impl<T> Receiver<T> { |
737 | | /// Attempts to receive a message from the channel without blocking. |
738 | | /// |
739 | | /// This method will either receive a message from the channel immediately or return an error |
740 | | /// if the channel is empty. |
741 | | /// |
742 | | /// If called on a zero-capacity channel, this method will receive a message only if there |
743 | | /// happens to be a send operation on the other side of the channel at the same time. |
744 | | /// |
745 | | /// # Examples |
746 | | /// |
747 | | /// ``` |
748 | | /// use crossbeam_channel::{unbounded, TryRecvError}; |
749 | | /// |
750 | | /// let (s, r) = unbounded(); |
751 | | /// assert_eq!(r.try_recv(), Err(TryRecvError::Empty)); |
752 | | /// |
753 | | /// s.send(5).unwrap(); |
754 | | /// drop(s); |
755 | | /// |
756 | | /// assert_eq!(r.try_recv(), Ok(5)); |
757 | | /// assert_eq!(r.try_recv(), Err(TryRecvError::Disconnected)); |
758 | | /// ``` |
759 | 0 | pub fn try_recv(&self) -> Result<T, TryRecvError> { |
760 | 0 | match &self.flavor { |
761 | 0 | ReceiverFlavor::Array(chan) => chan.try_recv(), |
762 | 0 | ReceiverFlavor::List(chan) => chan.try_recv(), |
763 | 0 | ReceiverFlavor::Zero(chan) => chan.try_recv(), |
764 | 0 | ReceiverFlavor::At(chan) => { |
765 | 0 | let msg = chan.try_recv(); |
766 | | unsafe { |
767 | 0 | mem::transmute_copy::<Result<Instant, TryRecvError>, Result<T, TryRecvError>>( |
768 | 0 | &msg, |
769 | | ) |
770 | | } |
771 | | } |
772 | 0 | ReceiverFlavor::Tick(chan) => { |
773 | 0 | let msg = chan.try_recv(); |
774 | | unsafe { |
775 | 0 | mem::transmute_copy::<Result<Instant, TryRecvError>, Result<T, TryRecvError>>( |
776 | 0 | &msg, |
777 | | ) |
778 | | } |
779 | | } |
780 | 0 | ReceiverFlavor::Never(chan) => chan.try_recv(), |
781 | | } |
782 | 0 | } |
783 | | |
784 | | /// Blocks the current thread until a message is received or the channel is empty and |
785 | | /// disconnected. |
786 | | /// |
787 | | /// If the channel is empty and not disconnected, this call will block until the receive |
788 | | /// operation can proceed. If the channel is empty and becomes disconnected, this call will |
789 | | /// wake up and return an error. |
790 | | /// |
791 | | /// If called on a zero-capacity channel, this method will wait for a send operation to appear |
792 | | /// on the other side of the channel. |
793 | | /// |
794 | | /// # Examples |
795 | | /// |
796 | | /// ``` |
797 | | /// use std::thread; |
798 | | /// use std::time::Duration; |
799 | | /// use crossbeam_channel::{unbounded, RecvError}; |
800 | | /// |
801 | | /// let (s, r) = unbounded(); |
802 | | /// |
803 | | /// thread::spawn(move || { |
804 | | /// thread::sleep(Duration::from_secs(1)); |
805 | | /// s.send(5).unwrap(); |
806 | | /// drop(s); |
807 | | /// }); |
808 | | /// |
809 | | /// assert_eq!(r.recv(), Ok(5)); |
810 | | /// assert_eq!(r.recv(), Err(RecvError)); |
811 | | /// ``` |
812 | 0 | pub fn recv(&self) -> Result<T, RecvError> { |
813 | 0 | match &self.flavor { |
814 | 0 | ReceiverFlavor::Array(chan) => chan.recv(None), |
815 | 0 | ReceiverFlavor::List(chan) => chan.recv(None), |
816 | 0 | ReceiverFlavor::Zero(chan) => chan.recv(None), |
817 | 0 | ReceiverFlavor::At(chan) => { |
818 | 0 | let msg = chan.recv(None); |
819 | | unsafe { |
820 | 0 | mem::transmute_copy::< |
821 | 0 | Result<Instant, RecvTimeoutError>, |
822 | 0 | Result<T, RecvTimeoutError>, |
823 | 0 | >(&msg) |
824 | | } |
825 | | } |
826 | 0 | ReceiverFlavor::Tick(chan) => { |
827 | 0 | let msg = chan.recv(None); |
828 | | unsafe { |
829 | 0 | mem::transmute_copy::< |
830 | 0 | Result<Instant, RecvTimeoutError>, |
831 | 0 | Result<T, RecvTimeoutError>, |
832 | 0 | >(&msg) |
833 | | } |
834 | | } |
835 | 0 | ReceiverFlavor::Never(chan) => chan.recv(None), |
836 | | } |
837 | 0 | .map_err(|_| RecvError) |
838 | 0 | } |
839 | | |
840 | | /// Waits for a message to be received from the channel, but only for a limited time. |
841 | | /// |
842 | | /// If the channel is empty and not disconnected, this call will block until the receive |
843 | | /// operation can proceed or the operation times out. If the channel is empty and becomes |
844 | | /// disconnected, this call will wake up and return an error. |
845 | | /// |
846 | | /// If called on a zero-capacity channel, this method will wait for a send operation to appear |
847 | | /// on the other side of the channel. |
848 | | /// |
849 | | /// # Examples |
850 | | /// |
851 | | /// ``` |
852 | | /// use std::thread; |
853 | | /// use std::time::Duration; |
854 | | /// use crossbeam_channel::{unbounded, RecvTimeoutError}; |
855 | | /// |
856 | | /// let (s, r) = unbounded(); |
857 | | /// |
858 | | /// thread::spawn(move || { |
859 | | /// thread::sleep(Duration::from_secs(1)); |
860 | | /// s.send(5).unwrap(); |
861 | | /// drop(s); |
862 | | /// }); |
863 | | /// |
864 | | /// assert_eq!( |
865 | | /// r.recv_timeout(Duration::from_millis(500)), |
866 | | /// Err(RecvTimeoutError::Timeout), |
867 | | /// ); |
868 | | /// assert_eq!( |
869 | | /// r.recv_timeout(Duration::from_secs(1)), |
870 | | /// Ok(5), |
871 | | /// ); |
872 | | /// assert_eq!( |
873 | | /// r.recv_timeout(Duration::from_secs(1)), |
874 | | /// Err(RecvTimeoutError::Disconnected), |
875 | | /// ); |
876 | | /// ``` |
877 | 0 | pub fn recv_timeout(&self, timeout: Duration) -> Result<T, RecvTimeoutError> { |
878 | 0 | match Instant::now().checked_add(timeout) { |
879 | 0 | Some(deadline) => self.recv_deadline(deadline), |
880 | 0 | None => self.recv().map_err(RecvTimeoutError::from), |
881 | | } |
882 | 0 | } |
883 | | |
884 | | /// Waits for a message to be received from the channel, but only before a given deadline. |
885 | | /// |
886 | | /// If the channel is empty and not disconnected, this call will block until the receive |
887 | | /// operation can proceed or the operation times out. If the channel is empty and becomes |
888 | | /// disconnected, this call will wake up and return an error. |
889 | | /// |
890 | | /// If called on a zero-capacity channel, this method will wait for a send operation to appear |
891 | | /// on the other side of the channel. |
892 | | /// |
893 | | /// # Examples |
894 | | /// |
895 | | /// ``` |
896 | | /// use std::thread; |
897 | | /// use std::time::{Instant, Duration}; |
898 | | /// use crossbeam_channel::{unbounded, RecvTimeoutError}; |
899 | | /// |
900 | | /// let (s, r) = unbounded(); |
901 | | /// |
902 | | /// thread::spawn(move || { |
903 | | /// thread::sleep(Duration::from_secs(1)); |
904 | | /// s.send(5).unwrap(); |
905 | | /// drop(s); |
906 | | /// }); |
907 | | /// |
908 | | /// let now = Instant::now(); |
909 | | /// |
910 | | /// assert_eq!( |
911 | | /// r.recv_deadline(now + Duration::from_millis(500)), |
912 | | /// Err(RecvTimeoutError::Timeout), |
913 | | /// ); |
914 | | /// assert_eq!( |
915 | | /// r.recv_deadline(now + Duration::from_millis(1500)), |
916 | | /// Ok(5), |
917 | | /// ); |
918 | | /// assert_eq!( |
919 | | /// r.recv_deadline(now + Duration::from_secs(5)), |
920 | | /// Err(RecvTimeoutError::Disconnected), |
921 | | /// ); |
922 | | /// ``` |
923 | 0 | pub fn recv_deadline(&self, deadline: Instant) -> Result<T, RecvTimeoutError> { |
924 | 0 | match &self.flavor { |
925 | 0 | ReceiverFlavor::Array(chan) => chan.recv(Some(deadline)), |
926 | 0 | ReceiverFlavor::List(chan) => chan.recv(Some(deadline)), |
927 | 0 | ReceiverFlavor::Zero(chan) => chan.recv(Some(deadline)), |
928 | 0 | ReceiverFlavor::At(chan) => { |
929 | 0 | let msg = chan.recv(Some(deadline)); |
930 | | unsafe { |
931 | 0 | mem::transmute_copy::< |
932 | 0 | Result<Instant, RecvTimeoutError>, |
933 | 0 | Result<T, RecvTimeoutError>, |
934 | 0 | >(&msg) |
935 | | } |
936 | | } |
937 | 0 | ReceiverFlavor::Tick(chan) => { |
938 | 0 | let msg = chan.recv(Some(deadline)); |
939 | | unsafe { |
940 | 0 | mem::transmute_copy::< |
941 | 0 | Result<Instant, RecvTimeoutError>, |
942 | 0 | Result<T, RecvTimeoutError>, |
943 | 0 | >(&msg) |
944 | | } |
945 | | } |
946 | 0 | ReceiverFlavor::Never(chan) => chan.recv(Some(deadline)), |
947 | | } |
948 | 0 | } |
949 | | |
950 | | /// Returns `true` if the channel is empty. |
951 | | /// |
952 | | /// Note: Zero-capacity channels are always empty. |
953 | | /// |
954 | | /// # Examples |
955 | | /// |
956 | | /// ``` |
957 | | /// use crossbeam_channel::unbounded; |
958 | | /// |
959 | | /// let (s, r) = unbounded(); |
960 | | /// |
961 | | /// assert!(r.is_empty()); |
962 | | /// s.send(0).unwrap(); |
963 | | /// assert!(!r.is_empty()); |
964 | | /// ``` |
965 | 0 | pub fn is_empty(&self) -> bool { |
966 | 0 | match &self.flavor { |
967 | 0 | ReceiverFlavor::Array(chan) => chan.is_empty(), |
968 | 0 | ReceiverFlavor::List(chan) => chan.is_empty(), |
969 | 0 | ReceiverFlavor::Zero(chan) => chan.is_empty(), |
970 | 0 | ReceiverFlavor::At(chan) => chan.is_empty(), |
971 | 0 | ReceiverFlavor::Tick(chan) => chan.is_empty(), |
972 | 0 | ReceiverFlavor::Never(chan) => chan.is_empty(), |
973 | | } |
974 | 0 | } |
975 | | |
976 | | /// Returns `true` if the channel is full. |
977 | | /// |
978 | | /// Note: Zero-capacity channels are always full. |
979 | | /// |
980 | | /// # Examples |
981 | | /// |
982 | | /// ``` |
983 | | /// use crossbeam_channel::bounded; |
984 | | /// |
985 | | /// let (s, r) = bounded(1); |
986 | | /// |
987 | | /// assert!(!r.is_full()); |
988 | | /// s.send(0).unwrap(); |
989 | | /// assert!(r.is_full()); |
990 | | /// ``` |
991 | 0 | pub fn is_full(&self) -> bool { |
992 | 0 | match &self.flavor { |
993 | 0 | ReceiverFlavor::Array(chan) => chan.is_full(), |
994 | 0 | ReceiverFlavor::List(chan) => chan.is_full(), |
995 | 0 | ReceiverFlavor::Zero(chan) => chan.is_full(), |
996 | 0 | ReceiverFlavor::At(chan) => chan.is_full(), |
997 | 0 | ReceiverFlavor::Tick(chan) => chan.is_full(), |
998 | 0 | ReceiverFlavor::Never(chan) => chan.is_full(), |
999 | | } |
1000 | 0 | } |
1001 | | |
1002 | | /// Returns the number of messages in the channel. |
1003 | | /// |
1004 | | /// # Examples |
1005 | | /// |
1006 | | /// ``` |
1007 | | /// use crossbeam_channel::unbounded; |
1008 | | /// |
1009 | | /// let (s, r) = unbounded(); |
1010 | | /// assert_eq!(r.len(), 0); |
1011 | | /// |
1012 | | /// s.send(1).unwrap(); |
1013 | | /// s.send(2).unwrap(); |
1014 | | /// assert_eq!(r.len(), 2); |
1015 | | /// ``` |
1016 | 0 | pub fn len(&self) -> usize { |
1017 | 0 | match &self.flavor { |
1018 | 0 | ReceiverFlavor::Array(chan) => chan.len(), |
1019 | 0 | ReceiverFlavor::List(chan) => chan.len(), |
1020 | 0 | ReceiverFlavor::Zero(chan) => chan.len(), |
1021 | 0 | ReceiverFlavor::At(chan) => chan.len(), |
1022 | 0 | ReceiverFlavor::Tick(chan) => chan.len(), |
1023 | 0 | ReceiverFlavor::Never(chan) => chan.len(), |
1024 | | } |
1025 | 0 | } |
1026 | | |
1027 | | /// If the channel is bounded, returns its capacity. |
1028 | | /// |
1029 | | /// # Examples |
1030 | | /// |
1031 | | /// ``` |
1032 | | /// use crossbeam_channel::{bounded, unbounded}; |
1033 | | /// |
1034 | | /// let (_, r) = unbounded::<i32>(); |
1035 | | /// assert_eq!(r.capacity(), None); |
1036 | | /// |
1037 | | /// let (_, r) = bounded::<i32>(5); |
1038 | | /// assert_eq!(r.capacity(), Some(5)); |
1039 | | /// |
1040 | | /// let (_, r) = bounded::<i32>(0); |
1041 | | /// assert_eq!(r.capacity(), Some(0)); |
1042 | | /// ``` |
1043 | 0 | pub fn capacity(&self) -> Option<usize> { |
1044 | 0 | match &self.flavor { |
1045 | 0 | ReceiverFlavor::Array(chan) => chan.capacity(), |
1046 | 0 | ReceiverFlavor::List(chan) => chan.capacity(), |
1047 | 0 | ReceiverFlavor::Zero(chan) => chan.capacity(), |
1048 | 0 | ReceiverFlavor::At(chan) => chan.capacity(), |
1049 | 0 | ReceiverFlavor::Tick(chan) => chan.capacity(), |
1050 | 0 | ReceiverFlavor::Never(chan) => chan.capacity(), |
1051 | | } |
1052 | 0 | } |
1053 | | |
1054 | | /// A blocking iterator over messages in the channel. |
1055 | | /// |
1056 | | /// Each call to [`next`] blocks waiting for the next message and then returns it. However, if |
1057 | | /// the channel becomes empty and disconnected, it returns [`None`] without blocking. |
1058 | | /// |
1059 | | /// [`next`]: Iterator::next |
1060 | | /// |
1061 | | /// # Examples |
1062 | | /// |
1063 | | /// ``` |
1064 | | /// use std::thread; |
1065 | | /// use crossbeam_channel::unbounded; |
1066 | | /// |
1067 | | /// let (s, r) = unbounded(); |
1068 | | /// |
1069 | | /// thread::spawn(move || { |
1070 | | /// s.send(1).unwrap(); |
1071 | | /// s.send(2).unwrap(); |
1072 | | /// s.send(3).unwrap(); |
1073 | | /// drop(s); // Disconnect the channel. |
1074 | | /// }); |
1075 | | /// |
1076 | | /// // Collect all messages from the channel. |
1077 | | /// // Note that the call to `collect` blocks until the sender is dropped. |
1078 | | /// let v: Vec<_> = r.iter().collect(); |
1079 | | /// |
1080 | | /// assert_eq!(v, [1, 2, 3]); |
1081 | | /// ``` |
1082 | 0 | pub fn iter(&self) -> Iter<'_, T> { |
1083 | 0 | Iter { receiver: self } |
1084 | 0 | } |
1085 | | |
1086 | | /// A non-blocking iterator over messages in the channel. |
1087 | | /// |
1088 | | /// Each call to [`next`] returns a message if there is one ready to be received. The iterator |
1089 | | /// never blocks waiting for the next message. |
1090 | | /// |
1091 | | /// [`next`]: Iterator::next |
1092 | | /// |
1093 | | /// # Examples |
1094 | | /// |
1095 | | /// ``` |
1096 | | /// use std::thread; |
1097 | | /// use std::time::Duration; |
1098 | | /// use crossbeam_channel::unbounded; |
1099 | | /// |
1100 | | /// let (s, r) = unbounded::<i32>(); |
1101 | | /// |
1102 | | /// thread::spawn(move || { |
1103 | | /// s.send(1).unwrap(); |
1104 | | /// thread::sleep(Duration::from_secs(1)); |
1105 | | /// s.send(2).unwrap(); |
1106 | | /// thread::sleep(Duration::from_secs(2)); |
1107 | | /// s.send(3).unwrap(); |
1108 | | /// }); |
1109 | | /// |
1110 | | /// thread::sleep(Duration::from_secs(2)); |
1111 | | /// |
1112 | | /// // Collect all messages from the channel without blocking. |
1113 | | /// // The third message hasn't been sent yet so we'll collect only the first two. |
1114 | | /// let v: Vec<_> = r.try_iter().collect(); |
1115 | | /// |
1116 | | /// assert_eq!(v, [1, 2]); |
1117 | | /// ``` |
1118 | 0 | pub fn try_iter(&self) -> TryIter<'_, T> { |
1119 | 0 | TryIter { receiver: self } |
1120 | 0 | } |
1121 | | |
1122 | | /// Returns `true` if receivers belong to the same channel. |
1123 | | /// |
1124 | | /// # Examples |
1125 | | /// |
1126 | | /// ```rust |
1127 | | /// use crossbeam_channel::unbounded; |
1128 | | /// |
1129 | | /// let (_, r) = unbounded::<usize>(); |
1130 | | /// |
1131 | | /// let r2 = r.clone(); |
1132 | | /// assert!(r.same_channel(&r2)); |
1133 | | /// |
1134 | | /// let (_, r3) = unbounded(); |
1135 | | /// assert!(!r.same_channel(&r3)); |
1136 | | /// ``` |
1137 | 0 | pub fn same_channel(&self, other: &Receiver<T>) -> bool { |
1138 | 0 | match (&self.flavor, &other.flavor) { |
1139 | 0 | (ReceiverFlavor::Array(a), ReceiverFlavor::Array(b)) => a == b, |
1140 | 0 | (ReceiverFlavor::List(a), ReceiverFlavor::List(b)) => a == b, |
1141 | 0 | (ReceiverFlavor::Zero(a), ReceiverFlavor::Zero(b)) => a == b, |
1142 | 0 | (ReceiverFlavor::At(a), ReceiverFlavor::At(b)) => Arc::ptr_eq(a, b), |
1143 | 0 | (ReceiverFlavor::Tick(a), ReceiverFlavor::Tick(b)) => Arc::ptr_eq(a, b), |
1144 | 0 | (ReceiverFlavor::Never(_), ReceiverFlavor::Never(_)) => true, |
1145 | 0 | _ => false, |
1146 | | } |
1147 | 0 | } |
1148 | | } |
1149 | | |
1150 | | impl<T> Drop for Receiver<T> { |
1151 | 0 | fn drop(&mut self) { |
1152 | | unsafe { |
1153 | 0 | match &self.flavor { |
1154 | 0 | ReceiverFlavor::Array(chan) => chan.release(|c| c.disconnect()), |
1155 | 0 | ReceiverFlavor::List(chan) => chan.release(|c| c.disconnect_receivers()), |
1156 | 0 | ReceiverFlavor::Zero(chan) => chan.release(|c| c.disconnect()), |
1157 | 0 | ReceiverFlavor::At(_) => {} |
1158 | 0 | ReceiverFlavor::Tick(_) => {} |
1159 | 0 | ReceiverFlavor::Never(_) => {} |
1160 | | } |
1161 | | } |
1162 | 0 | } |
1163 | | } |
1164 | | |
1165 | | impl<T> Clone for Receiver<T> { |
1166 | 0 | fn clone(&self) -> Self { |
1167 | 0 | let flavor = match &self.flavor { |
1168 | 0 | ReceiverFlavor::Array(chan) => ReceiverFlavor::Array(chan.acquire()), |
1169 | 0 | ReceiverFlavor::List(chan) => ReceiverFlavor::List(chan.acquire()), |
1170 | 0 | ReceiverFlavor::Zero(chan) => ReceiverFlavor::Zero(chan.acquire()), |
1171 | 0 | ReceiverFlavor::At(chan) => ReceiverFlavor::At(chan.clone()), |
1172 | 0 | ReceiverFlavor::Tick(chan) => ReceiverFlavor::Tick(chan.clone()), |
1173 | 0 | ReceiverFlavor::Never(_) => ReceiverFlavor::Never(flavors::never::Channel::new()), |
1174 | | }; |
1175 | | |
1176 | 0 | Receiver { flavor } |
1177 | 0 | } |
1178 | | } |
1179 | | |
1180 | | impl<T> fmt::Debug for Receiver<T> { |
1181 | 0 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
1182 | 0 | f.pad("Receiver { .. }") |
1183 | 0 | } |
1184 | | } |
1185 | | |
1186 | | impl<'a, T> IntoIterator for &'a Receiver<T> { |
1187 | | type Item = T; |
1188 | | type IntoIter = Iter<'a, T>; |
1189 | | |
1190 | 0 | fn into_iter(self) -> Self::IntoIter { |
1191 | 0 | self.iter() |
1192 | 0 | } |
1193 | | } |
1194 | | |
1195 | | impl<T> IntoIterator for Receiver<T> { |
1196 | | type Item = T; |
1197 | | type IntoIter = IntoIter<T>; |
1198 | | |
1199 | 0 | fn into_iter(self) -> Self::IntoIter { |
1200 | 0 | IntoIter { receiver: self } |
1201 | 0 | } |
1202 | | } |
1203 | | |
1204 | | /// A blocking iterator over messages in a channel. |
1205 | | /// |
1206 | | /// Each call to [`next`] blocks waiting for the next message and then returns it. However, if the |
1207 | | /// channel becomes empty and disconnected, it returns [`None`] without blocking. |
1208 | | /// |
1209 | | /// [`next`]: Iterator::next |
1210 | | /// |
1211 | | /// # Examples |
1212 | | /// |
1213 | | /// ``` |
1214 | | /// use std::thread; |
1215 | | /// use crossbeam_channel::unbounded; |
1216 | | /// |
1217 | | /// let (s, r) = unbounded(); |
1218 | | /// |
1219 | | /// thread::spawn(move || { |
1220 | | /// s.send(1).unwrap(); |
1221 | | /// s.send(2).unwrap(); |
1222 | | /// s.send(3).unwrap(); |
1223 | | /// drop(s); // Disconnect the channel. |
1224 | | /// }); |
1225 | | /// |
1226 | | /// // Collect all messages from the channel. |
1227 | | /// // Note that the call to `collect` blocks until the sender is dropped. |
1228 | | /// let v: Vec<_> = r.iter().collect(); |
1229 | | /// |
1230 | | /// assert_eq!(v, [1, 2, 3]); |
1231 | | /// ``` |
1232 | | pub struct Iter<'a, T> { |
1233 | | receiver: &'a Receiver<T>, |
1234 | | } |
1235 | | |
1236 | | impl<T> FusedIterator for Iter<'_, T> {} |
1237 | | |
1238 | | impl<T> Iterator for Iter<'_, T> { |
1239 | | type Item = T; |
1240 | | |
1241 | 0 | fn next(&mut self) -> Option<Self::Item> { |
1242 | 0 | self.receiver.recv().ok() |
1243 | 0 | } |
1244 | | } |
1245 | | |
1246 | | impl<T> fmt::Debug for Iter<'_, T> { |
1247 | 0 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
1248 | 0 | f.pad("Iter { .. }") |
1249 | 0 | } |
1250 | | } |
1251 | | |
1252 | | /// A non-blocking iterator over messages in a channel. |
1253 | | /// |
1254 | | /// Each call to [`next`] returns a message if there is one ready to be received. The iterator |
1255 | | /// never blocks waiting for the next message. |
1256 | | /// |
1257 | | /// [`next`]: Iterator::next |
1258 | | /// |
1259 | | /// # Examples |
1260 | | /// |
1261 | | /// ``` |
1262 | | /// use std::thread; |
1263 | | /// use std::time::Duration; |
1264 | | /// use crossbeam_channel::unbounded; |
1265 | | /// |
1266 | | /// let (s, r) = unbounded::<i32>(); |
1267 | | /// |
1268 | | /// thread::spawn(move || { |
1269 | | /// s.send(1).unwrap(); |
1270 | | /// thread::sleep(Duration::from_secs(1)); |
1271 | | /// s.send(2).unwrap(); |
1272 | | /// thread::sleep(Duration::from_secs(2)); |
1273 | | /// s.send(3).unwrap(); |
1274 | | /// }); |
1275 | | /// |
1276 | | /// thread::sleep(Duration::from_secs(2)); |
1277 | | /// |
1278 | | /// // Collect all messages from the channel without blocking. |
1279 | | /// // The third message hasn't been sent yet so we'll collect only the first two. |
1280 | | /// let v: Vec<_> = r.try_iter().collect(); |
1281 | | /// |
1282 | | /// assert_eq!(v, [1, 2]); |
1283 | | /// ``` |
1284 | | pub struct TryIter<'a, T> { |
1285 | | receiver: &'a Receiver<T>, |
1286 | | } |
1287 | | |
1288 | | impl<T> Iterator for TryIter<'_, T> { |
1289 | | type Item = T; |
1290 | | |
1291 | 0 | fn next(&mut self) -> Option<Self::Item> { |
1292 | 0 | self.receiver.try_recv().ok() |
1293 | 0 | } |
1294 | | } |
1295 | | |
1296 | | impl<T> fmt::Debug for TryIter<'_, T> { |
1297 | 0 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
1298 | 0 | f.pad("TryIter { .. }") |
1299 | 0 | } |
1300 | | } |
1301 | | |
1302 | | /// A blocking iterator over messages in a channel. |
1303 | | /// |
1304 | | /// Each call to [`next`] blocks waiting for the next message and then returns it. However, if the |
1305 | | /// channel becomes empty and disconnected, it returns [`None`] without blocking. |
1306 | | /// |
1307 | | /// [`next`]: Iterator::next |
1308 | | /// |
1309 | | /// # Examples |
1310 | | /// |
1311 | | /// ``` |
1312 | | /// use std::thread; |
1313 | | /// use crossbeam_channel::unbounded; |
1314 | | /// |
1315 | | /// let (s, r) = unbounded(); |
1316 | | /// |
1317 | | /// thread::spawn(move || { |
1318 | | /// s.send(1).unwrap(); |
1319 | | /// s.send(2).unwrap(); |
1320 | | /// s.send(3).unwrap(); |
1321 | | /// drop(s); // Disconnect the channel. |
1322 | | /// }); |
1323 | | /// |
1324 | | /// // Collect all messages from the channel. |
1325 | | /// // Note that the call to `collect` blocks until the sender is dropped. |
1326 | | /// let v: Vec<_> = r.into_iter().collect(); |
1327 | | /// |
1328 | | /// assert_eq!(v, [1, 2, 3]); |
1329 | | /// ``` |
1330 | | pub struct IntoIter<T> { |
1331 | | receiver: Receiver<T>, |
1332 | | } |
1333 | | |
1334 | | impl<T> FusedIterator for IntoIter<T> {} |
1335 | | |
1336 | | impl<T> Iterator for IntoIter<T> { |
1337 | | type Item = T; |
1338 | | |
1339 | 0 | fn next(&mut self) -> Option<Self::Item> { |
1340 | 0 | self.receiver.recv().ok() |
1341 | 0 | } |
1342 | | } |
1343 | | |
1344 | | impl<T> fmt::Debug for IntoIter<T> { |
1345 | 0 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
1346 | 0 | f.pad("IntoIter { .. }") |
1347 | 0 | } |
1348 | | } |
1349 | | |
1350 | | impl<T> SelectHandle for Sender<T> { |
1351 | 0 | fn try_select(&self, token: &mut Token) -> bool { |
1352 | 0 | match &self.flavor { |
1353 | 0 | SenderFlavor::Array(chan) => chan.sender().try_select(token), |
1354 | 0 | SenderFlavor::List(chan) => chan.sender().try_select(token), |
1355 | 0 | SenderFlavor::Zero(chan) => chan.sender().try_select(token), |
1356 | | } |
1357 | 0 | } |
1358 | | |
1359 | 0 | fn deadline(&self) -> Option<Instant> { |
1360 | 0 | None |
1361 | 0 | } |
1362 | | |
1363 | 0 | fn register(&self, oper: Operation, cx: &Context) -> bool { |
1364 | 0 | match &self.flavor { |
1365 | 0 | SenderFlavor::Array(chan) => chan.sender().register(oper, cx), |
1366 | 0 | SenderFlavor::List(chan) => chan.sender().register(oper, cx), |
1367 | 0 | SenderFlavor::Zero(chan) => chan.sender().register(oper, cx), |
1368 | | } |
1369 | 0 | } |
1370 | | |
1371 | 0 | fn unregister(&self, oper: Operation) { |
1372 | 0 | match &self.flavor { |
1373 | 0 | SenderFlavor::Array(chan) => chan.sender().unregister(oper), |
1374 | 0 | SenderFlavor::List(chan) => chan.sender().unregister(oper), |
1375 | 0 | SenderFlavor::Zero(chan) => chan.sender().unregister(oper), |
1376 | | } |
1377 | 0 | } |
1378 | | |
1379 | 0 | fn accept(&self, token: &mut Token, cx: &Context) -> bool { |
1380 | 0 | match &self.flavor { |
1381 | 0 | SenderFlavor::Array(chan) => chan.sender().accept(token, cx), |
1382 | 0 | SenderFlavor::List(chan) => chan.sender().accept(token, cx), |
1383 | 0 | SenderFlavor::Zero(chan) => chan.sender().accept(token, cx), |
1384 | | } |
1385 | 0 | } |
1386 | | |
1387 | 0 | fn is_ready(&self) -> bool { |
1388 | 0 | match &self.flavor { |
1389 | 0 | SenderFlavor::Array(chan) => chan.sender().is_ready(), |
1390 | 0 | SenderFlavor::List(chan) => chan.sender().is_ready(), |
1391 | 0 | SenderFlavor::Zero(chan) => chan.sender().is_ready(), |
1392 | | } |
1393 | 0 | } |
1394 | | |
1395 | 0 | fn watch(&self, oper: Operation, cx: &Context) -> bool { |
1396 | 0 | match &self.flavor { |
1397 | 0 | SenderFlavor::Array(chan) => chan.sender().watch(oper, cx), |
1398 | 0 | SenderFlavor::List(chan) => chan.sender().watch(oper, cx), |
1399 | 0 | SenderFlavor::Zero(chan) => chan.sender().watch(oper, cx), |
1400 | | } |
1401 | 0 | } |
1402 | | |
1403 | 0 | fn unwatch(&self, oper: Operation) { |
1404 | 0 | match &self.flavor { |
1405 | 0 | SenderFlavor::Array(chan) => chan.sender().unwatch(oper), |
1406 | 0 | SenderFlavor::List(chan) => chan.sender().unwatch(oper), |
1407 | 0 | SenderFlavor::Zero(chan) => chan.sender().unwatch(oper), |
1408 | | } |
1409 | 0 | } |
1410 | | } |
1411 | | |
1412 | | impl<T> SelectHandle for Receiver<T> { |
1413 | 0 | fn try_select(&self, token: &mut Token) -> bool { |
1414 | 0 | match &self.flavor { |
1415 | 0 | ReceiverFlavor::Array(chan) => chan.receiver().try_select(token), |
1416 | 0 | ReceiverFlavor::List(chan) => chan.receiver().try_select(token), |
1417 | 0 | ReceiverFlavor::Zero(chan) => chan.receiver().try_select(token), |
1418 | 0 | ReceiverFlavor::At(chan) => chan.try_select(token), |
1419 | 0 | ReceiverFlavor::Tick(chan) => chan.try_select(token), |
1420 | 0 | ReceiverFlavor::Never(chan) => chan.try_select(token), |
1421 | | } |
1422 | 0 | } |
1423 | | |
1424 | 0 | fn deadline(&self) -> Option<Instant> { |
1425 | 0 | match &self.flavor { |
1426 | 0 | ReceiverFlavor::Array(_) => None, |
1427 | 0 | ReceiverFlavor::List(_) => None, |
1428 | 0 | ReceiverFlavor::Zero(_) => None, |
1429 | 0 | ReceiverFlavor::At(chan) => chan.deadline(), |
1430 | 0 | ReceiverFlavor::Tick(chan) => chan.deadline(), |
1431 | 0 | ReceiverFlavor::Never(chan) => chan.deadline(), |
1432 | | } |
1433 | 0 | } |
1434 | | |
1435 | 0 | fn register(&self, oper: Operation, cx: &Context) -> bool { |
1436 | 0 | match &self.flavor { |
1437 | 0 | ReceiverFlavor::Array(chan) => chan.receiver().register(oper, cx), |
1438 | 0 | ReceiverFlavor::List(chan) => chan.receiver().register(oper, cx), |
1439 | 0 | ReceiverFlavor::Zero(chan) => chan.receiver().register(oper, cx), |
1440 | 0 | ReceiverFlavor::At(chan) => chan.register(oper, cx), |
1441 | 0 | ReceiverFlavor::Tick(chan) => chan.register(oper, cx), |
1442 | 0 | ReceiverFlavor::Never(chan) => chan.register(oper, cx), |
1443 | | } |
1444 | 0 | } |
1445 | | |
1446 | 0 | fn unregister(&self, oper: Operation) { |
1447 | 0 | match &self.flavor { |
1448 | 0 | ReceiverFlavor::Array(chan) => chan.receiver().unregister(oper), |
1449 | 0 | ReceiverFlavor::List(chan) => chan.receiver().unregister(oper), |
1450 | 0 | ReceiverFlavor::Zero(chan) => chan.receiver().unregister(oper), |
1451 | 0 | ReceiverFlavor::At(chan) => chan.unregister(oper), |
1452 | 0 | ReceiverFlavor::Tick(chan) => chan.unregister(oper), |
1453 | 0 | ReceiverFlavor::Never(chan) => chan.unregister(oper), |
1454 | | } |
1455 | 0 | } |
1456 | | |
1457 | 0 | fn accept(&self, token: &mut Token, cx: &Context) -> bool { |
1458 | 0 | match &self.flavor { |
1459 | 0 | ReceiverFlavor::Array(chan) => chan.receiver().accept(token, cx), |
1460 | 0 | ReceiverFlavor::List(chan) => chan.receiver().accept(token, cx), |
1461 | 0 | ReceiverFlavor::Zero(chan) => chan.receiver().accept(token, cx), |
1462 | 0 | ReceiverFlavor::At(chan) => chan.accept(token, cx), |
1463 | 0 | ReceiverFlavor::Tick(chan) => chan.accept(token, cx), |
1464 | 0 | ReceiverFlavor::Never(chan) => chan.accept(token, cx), |
1465 | | } |
1466 | 0 | } |
1467 | | |
1468 | 0 | fn is_ready(&self) -> bool { |
1469 | 0 | match &self.flavor { |
1470 | 0 | ReceiverFlavor::Array(chan) => chan.receiver().is_ready(), |
1471 | 0 | ReceiverFlavor::List(chan) => chan.receiver().is_ready(), |
1472 | 0 | ReceiverFlavor::Zero(chan) => chan.receiver().is_ready(), |
1473 | 0 | ReceiverFlavor::At(chan) => chan.is_ready(), |
1474 | 0 | ReceiverFlavor::Tick(chan) => chan.is_ready(), |
1475 | 0 | ReceiverFlavor::Never(chan) => chan.is_ready(), |
1476 | | } |
1477 | 0 | } |
1478 | | |
1479 | 0 | fn watch(&self, oper: Operation, cx: &Context) -> bool { |
1480 | 0 | match &self.flavor { |
1481 | 0 | ReceiverFlavor::Array(chan) => chan.receiver().watch(oper, cx), |
1482 | 0 | ReceiverFlavor::List(chan) => chan.receiver().watch(oper, cx), |
1483 | 0 | ReceiverFlavor::Zero(chan) => chan.receiver().watch(oper, cx), |
1484 | 0 | ReceiverFlavor::At(chan) => chan.watch(oper, cx), |
1485 | 0 | ReceiverFlavor::Tick(chan) => chan.watch(oper, cx), |
1486 | 0 | ReceiverFlavor::Never(chan) => chan.watch(oper, cx), |
1487 | | } |
1488 | 0 | } |
1489 | | |
1490 | 0 | fn unwatch(&self, oper: Operation) { |
1491 | 0 | match &self.flavor { |
1492 | 0 | ReceiverFlavor::Array(chan) => chan.receiver().unwatch(oper), |
1493 | 0 | ReceiverFlavor::List(chan) => chan.receiver().unwatch(oper), |
1494 | 0 | ReceiverFlavor::Zero(chan) => chan.receiver().unwatch(oper), |
1495 | 0 | ReceiverFlavor::At(chan) => chan.unwatch(oper), |
1496 | 0 | ReceiverFlavor::Tick(chan) => chan.unwatch(oper), |
1497 | 0 | ReceiverFlavor::Never(chan) => chan.unwatch(oper), |
1498 | | } |
1499 | 0 | } |
1500 | | } |
1501 | | |
1502 | | /// Writes a message into the channel. |
1503 | 0 | pub(crate) unsafe fn write<T>(s: &Sender<T>, token: &mut Token, msg: T) -> Result<(), T> { |
1504 | 0 | match &s.flavor { |
1505 | 0 | SenderFlavor::Array(chan) => chan.write(token, msg), |
1506 | 0 | SenderFlavor::List(chan) => chan.write(token, msg), |
1507 | 0 | SenderFlavor::Zero(chan) => chan.write(token, msg), |
1508 | | } |
1509 | 0 | } |
1510 | | |
1511 | | /// Reads a message from the channel. |
1512 | 0 | pub(crate) unsafe fn read<T>(r: &Receiver<T>, token: &mut Token) -> Result<T, ()> { |
1513 | 0 | match &r.flavor { |
1514 | 0 | ReceiverFlavor::Array(chan) => chan.read(token), |
1515 | 0 | ReceiverFlavor::List(chan) => chan.read(token), |
1516 | 0 | ReceiverFlavor::Zero(chan) => chan.read(token), |
1517 | 0 | ReceiverFlavor::At(chan) => { |
1518 | 0 | mem::transmute_copy::<Result<Instant, ()>, Result<T, ()>>(&chan.read(token)) |
1519 | | } |
1520 | 0 | ReceiverFlavor::Tick(chan) => { |
1521 | 0 | mem::transmute_copy::<Result<Instant, ()>, Result<T, ()>>(&chan.read(token)) |
1522 | | } |
1523 | 0 | ReceiverFlavor::Never(chan) => chan.read(token), |
1524 | | } |
1525 | 0 | } |