/rust/registry/src/index.crates.io-1949cf8c6b5b557f/futures-channel-0.3.28/src/mpsc/mod.rs
Line | Count | Source |
1 | | //! A multi-producer, single-consumer queue for sending values across |
2 | | //! asynchronous tasks. |
3 | | //! |
4 | | //! Similarly to `std::sync::mpsc`, channel creation provides [`Receiver`] and |
5 | | //! [`Sender`] handles. [`Receiver`] implements [`Stream`] and allows a task to |
6 | | //! read values out of the channel. If there is no message to read from the |
7 | | //! channel, the current task will be notified when a new value is sent. |
8 | | //! [`Sender`] implements the `Sink` trait and allows a task to send messages into |
9 | | //! the channel. If the channel is at capacity, the send will be rejected and |
10 | | //! the task will be notified when additional capacity is available. In other |
11 | | //! words, the channel provides backpressure. |
12 | | //! |
13 | | //! Unbounded channels are also available using the `unbounded` constructor. |
14 | | //! |
15 | | //! # Disconnection |
16 | | //! |
17 | | //! When all [`Sender`] handles have been dropped, it is no longer |
18 | | //! possible to send values into the channel. This is considered the termination |
19 | | //! event of the stream. As such, [`Receiver::poll_next`] |
20 | | //! will return `Poll::Ready(None)`. |
21 | | //! |
22 | | //! If the [`Receiver`] handle is dropped, then messages can no longer |
23 | | //! be read out of the channel. In this case, all further attempts to send will |
24 | | //! result in an error. |
25 | | //! |
26 | | //! # Clean Shutdown |
27 | | //! |
28 | | //! If the [`Receiver`] is simply dropped, then it is possible for |
29 | | //! there to be messages still in the channel that will not be processed. As |
30 | | //! such, it is usually desirable to perform a "clean" shutdown. To do this, the |
31 | | //! receiver will first call `close`, which will prevent any further messages from |
32 | | //! being sent into the channel. Then, the receiver consumes the channel to |
33 | | //! completion, at which point the receiver can be dropped. |
34 | | //! |
35 | | //! [`Sender`]: struct.Sender.html |
36 | | //! [`Receiver`]: struct.Receiver.html |
37 | | //! [`Stream`]: ../../futures_core/stream/trait.Stream.html |
38 | | //! [`Receiver::poll_next`]: |
39 | | //! ../../futures_core/stream/trait.Stream.html#tymethod.poll_next |
40 | | |
41 | | // At the core, the channel uses an atomic FIFO queue for message passing. This |
42 | | // queue is used as the primary coordination primitive. In order to enforce |
43 | | // capacity limits and handle back pressure, a secondary FIFO queue is used to |
44 | | // send parked task handles. |
45 | | // |
46 | | // The general idea is that the channel is created with a `buffer` size of `n`. |
47 | | // The channel capacity is `n + num-senders`. Each sender gets one "guaranteed" |
48 | | // slot to hold a message. This allows `Sender` to know for a fact that a send |
49 | | // will succeed *before* starting to do the actual work of sending the value. |
50 | | // Since most of this work is lock-free, once the work starts, it is impossible |
51 | | // to safely revert. |
52 | | // |
53 | | // If the sender is unable to process a send operation, then the current |
54 | | // task is parked and the handle is sent on the parked task queue. |
55 | | // |
56 | | // Note that the implementation guarantees that the channel capacity will never |
57 | | // exceed the configured limit, however there is no *strict* guarantee that the |
58 | | // receiver will wake up a parked task *immediately* when a slot becomes |
59 | | // available. However, it will almost always unpark a task when a slot becomes |
60 | | // available and it is *guaranteed* that a sender will be unparked when the |
61 | | // message that caused the sender to become parked is read out of the channel. |
62 | | // |
63 | | // The steps for sending a message are roughly: |
64 | | // |
65 | | // 1) Increment the channel message count |
66 | | // 2) If the channel is at capacity, push the task handle onto the wait queue |
67 | | // 3) Push the message onto the message queue. |
68 | | // |
69 | | // The steps for receiving a message are roughly: |
70 | | // |
71 | | // 1) Pop a message from the message queue |
72 | | // 2) Pop a task handle from the wait queue |
73 | | // 3) Decrement the channel message count. |
74 | | // |
75 | | // It's important for the order of operations on lock-free structures to happen |
76 | | // in reverse order between the sender and receiver. This makes the message |
77 | | // queue the primary coordination structure and establishes the necessary |
78 | | // happens-before semantics required for the acquire / release semantics used |
79 | | // by the queue structure. |
80 | | |
81 | | use futures_core::stream::{FusedStream, Stream}; |
82 | | use futures_core::task::__internal::AtomicWaker; |
83 | | use futures_core::task::{Context, Poll, Waker}; |
84 | | use std::fmt; |
85 | | use std::pin::Pin; |
86 | | use std::sync::atomic::AtomicUsize; |
87 | | use std::sync::atomic::Ordering::SeqCst; |
88 | | use std::sync::{Arc, Mutex}; |
89 | | use std::thread; |
90 | | |
91 | | use crate::mpsc::queue::Queue; |
92 | | |
93 | | mod queue; |
94 | | #[cfg(feature = "sink")] |
95 | | mod sink_impl; |
96 | | |
97 | | struct UnboundedSenderInner<T> { |
98 | | // Channel state shared between the sender and receiver. |
99 | | inner: Arc<UnboundedInner<T>>, |
100 | | } |
101 | | |
102 | | struct BoundedSenderInner<T> { |
103 | | // Channel state shared between the sender and receiver. |
104 | | inner: Arc<BoundedInner<T>>, |
105 | | |
106 | | // Handle to the task that is blocked on this sender. This handle is sent |
107 | | // to the receiver half in order to be notified when the sender becomes |
108 | | // unblocked. |
109 | | sender_task: Arc<Mutex<SenderTask>>, |
110 | | |
111 | | // `true` if the sender might be blocked. This is an optimization to avoid |
112 | | // having to lock the mutex most of the time. |
113 | | maybe_parked: bool, |
114 | | } |
115 | | |
116 | | // We never project Pin<&mut SenderInner> to `Pin<&mut T>` |
117 | | impl<T> Unpin for UnboundedSenderInner<T> {} |
118 | | impl<T> Unpin for BoundedSenderInner<T> {} |
119 | | |
120 | | /// The transmission end of a bounded mpsc channel. |
121 | | /// |
122 | | /// This value is created by the [`channel`](channel) function. |
123 | | pub struct Sender<T>(Option<BoundedSenderInner<T>>); |
124 | | |
125 | | /// The transmission end of an unbounded mpsc channel. |
126 | | /// |
127 | | /// This value is created by the [`unbounded`](unbounded) function. |
128 | | pub struct UnboundedSender<T>(Option<UnboundedSenderInner<T>>); |
129 | | |
130 | | trait AssertKinds: Send + Sync + Clone {} |
131 | | impl AssertKinds for UnboundedSender<u32> {} |
132 | | |
133 | | /// The receiving end of a bounded mpsc channel. |
134 | | /// |
135 | | /// This value is created by the [`channel`](channel) function. |
136 | | pub struct Receiver<T> { |
137 | | inner: Option<Arc<BoundedInner<T>>>, |
138 | | } |
139 | | |
140 | | /// The receiving end of an unbounded mpsc channel. |
141 | | /// |
142 | | /// This value is created by the [`unbounded`](unbounded) function. |
143 | | pub struct UnboundedReceiver<T> { |
144 | | inner: Option<Arc<UnboundedInner<T>>>, |
145 | | } |
146 | | |
147 | | // `Pin<&mut UnboundedReceiver<T>>` is never projected to `Pin<&mut T>` |
148 | | impl<T> Unpin for UnboundedReceiver<T> {} |
149 | | |
150 | | /// The error type for [`Sender`s](Sender) used as `Sink`s. |
151 | | #[derive(Clone, Debug, PartialEq, Eq)] |
152 | | pub struct SendError { |
153 | | kind: SendErrorKind, |
154 | | } |
155 | | |
156 | | /// The error type returned from [`try_send`](Sender::try_send). |
157 | | #[derive(Clone, PartialEq, Eq)] |
158 | | pub struct TrySendError<T> { |
159 | | err: SendError, |
160 | | val: T, |
161 | | } |
162 | | |
163 | | #[derive(Clone, Debug, PartialEq, Eq)] |
164 | | enum SendErrorKind { |
165 | | Full, |
166 | | Disconnected, |
167 | | } |
168 | | |
169 | | /// The error type returned from [`try_next`](Receiver::try_next). |
170 | | pub struct TryRecvError { |
171 | | _priv: (), |
172 | | } |
173 | | |
174 | | impl fmt::Display for SendError { |
175 | 0 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
176 | 0 | if self.is_full() { |
177 | 0 | write!(f, "send failed because channel is full") |
178 | | } else { |
179 | 0 | write!(f, "send failed because receiver is gone") |
180 | | } |
181 | 0 | } |
182 | | } |
183 | | |
184 | | impl std::error::Error for SendError {} |
185 | | |
186 | | impl SendError { |
187 | | /// Returns `true` if this error is a result of the channel being full. |
188 | 0 | pub fn is_full(&self) -> bool { |
189 | 0 | match self.kind { |
190 | 0 | SendErrorKind::Full => true, |
191 | 0 | _ => false, |
192 | | } |
193 | 0 | } |
194 | | |
195 | | /// Returns `true` if this error is a result of the receiver being dropped. |
196 | 0 | pub fn is_disconnected(&self) -> bool { |
197 | 0 | match self.kind { |
198 | 0 | SendErrorKind::Disconnected => true, |
199 | 0 | _ => false, |
200 | | } |
201 | 0 | } |
202 | | } |
203 | | |
204 | | impl<T> fmt::Debug for TrySendError<T> { |
205 | 0 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
206 | 0 | f.debug_struct("TrySendError").field("kind", &self.err.kind).finish() |
207 | 0 | } |
208 | | } |
209 | | |
210 | | impl<T> fmt::Display for TrySendError<T> { |
211 | 0 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
212 | 0 | if self.is_full() { |
213 | 0 | write!(f, "send failed because channel is full") |
214 | | } else { |
215 | 0 | write!(f, "send failed because receiver is gone") |
216 | | } |
217 | 0 | } |
218 | | } |
219 | | |
220 | | impl<T: core::any::Any> std::error::Error for TrySendError<T> {} |
221 | | |
222 | | impl<T> TrySendError<T> { |
223 | | /// Returns `true` if this error is a result of the channel being full. |
224 | 0 | pub fn is_full(&self) -> bool { |
225 | 0 | self.err.is_full() |
226 | 0 | } |
227 | | |
228 | | /// Returns `true` if this error is a result of the receiver being dropped. |
229 | 0 | pub fn is_disconnected(&self) -> bool { |
230 | 0 | self.err.is_disconnected() |
231 | 0 | } |
232 | | |
233 | | /// Returns the message that was attempted to be sent but failed. |
234 | 0 | pub fn into_inner(self) -> T { |
235 | 0 | self.val |
236 | 0 | } |
237 | | |
238 | | /// Drops the message and converts into a `SendError`. |
239 | 0 | pub fn into_send_error(self) -> SendError { |
240 | 0 | self.err |
241 | 0 | } |
242 | | } |
243 | | |
244 | | impl fmt::Debug for TryRecvError { |
245 | 0 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
246 | 0 | f.debug_tuple("TryRecvError").finish() |
247 | 0 | } |
248 | | } |
249 | | |
250 | | impl fmt::Display for TryRecvError { |
251 | 0 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
252 | 0 | write!(f, "receiver channel is empty") |
253 | 0 | } |
254 | | } |
255 | | |
256 | | impl std::error::Error for TryRecvError {} |
257 | | |
258 | | struct UnboundedInner<T> { |
259 | | // Internal channel state. Consists of the number of messages stored in the |
260 | | // channel as well as a flag signalling that the channel is closed. |
261 | | state: AtomicUsize, |
262 | | |
263 | | // Atomic, FIFO queue used to send messages to the receiver |
264 | | message_queue: Queue<T>, |
265 | | |
266 | | // Number of senders in existence |
267 | | num_senders: AtomicUsize, |
268 | | |
269 | | // Handle to the receiver's task. |
270 | | recv_task: AtomicWaker, |
271 | | } |
272 | | |
273 | | struct BoundedInner<T> { |
274 | | // Max buffer size of the channel. |
275 | | buffer: usize, |
276 | | |
277 | | // Internal channel state. Consists of the number of messages stored in the |
278 | | // channel as well as a flag signalling that the channel is closed. |
279 | | state: AtomicUsize, |
280 | | |
281 | | // Atomic, FIFO queue used to send messages to the receiver |
282 | | message_queue: Queue<T>, |
283 | | |
284 | | // Atomic, FIFO queue used to send parked task handles to the receiver. |
285 | | parked_queue: Queue<Arc<Mutex<SenderTask>>>, |
286 | | |
287 | | // Number of senders in existence |
288 | | num_senders: AtomicUsize, |
289 | | |
290 | | // Handle to the receiver's task. |
291 | | recv_task: AtomicWaker, |
292 | | } |
293 | | |
294 | | // Decoded representation of the `state` field of `BoundedInner` / `UnboundedInner`. |
295 | | #[derive(Clone, Copy)] |
296 | | struct State { |
297 | | // `true` when the channel is open |
298 | | is_open: bool, |
299 | | |
300 | | // Number of messages in the channel |
301 | | num_messages: usize, |
302 | | } |
303 | | |
304 | | // The `is_open` flag is stored in the most significant bit of the `state` field |
305 | | const OPEN_MASK: usize = usize::max_value() - (usize::max_value() >> 1); |
306 | | |
307 | | // When a new channel is created, it is created in the open state with no |
308 | | // pending messages. |
309 | | const INIT_STATE: usize = OPEN_MASK; |
310 | | |
311 | | // The maximum number of messages that a channel can track is `usize::max_value() >> 1` |
312 | | const MAX_CAPACITY: usize = !(OPEN_MASK); |
313 | | |
314 | | // The maximum requested buffer size must be less than the maximum capacity of |
315 | | // a channel. This is because each sender gets a guaranteed slot. |
316 | | const MAX_BUFFER: usize = MAX_CAPACITY >> 1; |
317 | | |
318 | | // Sent to the consumer to wake up blocked producers |
319 | | struct SenderTask { |
320 | | task: Option<Waker>, |
321 | | is_parked: bool, |
322 | | } |
323 | | |
324 | | impl SenderTask { |
325 | 0 | fn new() -> Self { |
326 | 0 | Self { task: None, is_parked: false } |
327 | 0 | } |
328 | | |
329 | 0 | fn notify(&mut self) { |
330 | 0 | self.is_parked = false; |
331 | | |
332 | 0 | if let Some(task) = self.task.take() { |
333 | 0 | task.wake(); |
334 | 0 | } |
335 | 0 | } |
336 | | } |
337 | | |
338 | | /// Creates a bounded mpsc channel for communicating between asynchronous tasks. |
339 | | /// |
340 | | /// Being bounded, this channel provides backpressure to ensure that the sender |
341 | | /// outpaces the receiver by only a limited amount. The channel's capacity is |
342 | | /// equal to `buffer + num-senders`. In other words, each sender gets a |
343 | | /// guaranteed slot in the channel capacity, and on top of that there are |
344 | | /// `buffer` "first come, first serve" slots available to all senders. |
345 | | /// |
346 | | /// The [`Receiver`](Receiver) returned implements the |
347 | | /// [`Stream`](futures_core::stream::Stream) trait, while [`Sender`](Sender) implements |
348 | | /// `Sink`. |
349 | 0 | pub fn channel<T>(buffer: usize) -> (Sender<T>, Receiver<T>) { |
350 | | // Check that the requested buffer size does not exceed the maximum buffer |
351 | | // size permitted by the system. |
352 | 0 | assert!(buffer < MAX_BUFFER, "requested buffer size too large"); |
353 | | |
354 | 0 | let inner = Arc::new(BoundedInner { |
355 | 0 | buffer, |
356 | 0 | state: AtomicUsize::new(INIT_STATE), |
357 | 0 | message_queue: Queue::new(), |
358 | 0 | parked_queue: Queue::new(), |
359 | 0 | num_senders: AtomicUsize::new(1), |
360 | 0 | recv_task: AtomicWaker::new(), |
361 | 0 | }); |
362 | | |
363 | 0 | let tx = BoundedSenderInner { |
364 | 0 | inner: inner.clone(), |
365 | 0 | sender_task: Arc::new(Mutex::new(SenderTask::new())), |
366 | 0 | maybe_parked: false, |
367 | 0 | }; |
368 | | |
369 | 0 | let rx = Receiver { inner: Some(inner) }; |
370 | | |
371 | 0 | (Sender(Some(tx)), rx) |
372 | 0 | } |
373 | | |
374 | | /// Creates an unbounded mpsc channel for communicating between asynchronous |
375 | | /// tasks. |
376 | | /// |
377 | | /// A `send` on this channel will always succeed as long as the receive half has |
378 | | /// not been closed. If the receiver falls behind, messages will be arbitrarily |
379 | | /// buffered. |
380 | | /// |
381 | | /// **Note** that the amount of available system memory is an implicit bound to |
382 | | /// the channel. Using an `unbounded` channel can cause the |
383 | | /// process to run out of memory. In this case, the process will be aborted. |
384 | 0 | pub fn unbounded<T>() -> (UnboundedSender<T>, UnboundedReceiver<T>) { |
385 | 0 | let inner = Arc::new(UnboundedInner { |
386 | 0 | state: AtomicUsize::new(INIT_STATE), |
387 | 0 | message_queue: Queue::new(), |
388 | 0 | num_senders: AtomicUsize::new(1), |
389 | 0 | recv_task: AtomicWaker::new(), |
390 | 0 | }); |
391 | | |
392 | 0 | let tx = UnboundedSenderInner { inner: inner.clone() }; |
393 | | |
394 | 0 | let rx = UnboundedReceiver { inner: Some(inner) }; |
395 | | |
396 | 0 | (UnboundedSender(Some(tx)), rx) |
397 | 0 | } Unexecuted instantiation: futures_channel::mpsc::unbounded::<dbus::message::Message> Unexecuted instantiation: futures_channel::mpsc::unbounded::<_> |
398 | | |
399 | | /* |
400 | | * |
401 | | * ===== impl Sender ===== |
402 | | * |
403 | | */ |
404 | | |
405 | | impl<T> UnboundedSenderInner<T> { |
406 | 0 | fn poll_ready_nb(&self) -> Poll<Result<(), SendError>> { |
407 | 0 | let state = decode_state(self.inner.state.load(SeqCst)); |
408 | 0 | if state.is_open { |
409 | 0 | Poll::Ready(Ok(())) |
410 | | } else { |
411 | 0 | Poll::Ready(Err(SendError { kind: SendErrorKind::Disconnected })) |
412 | | } |
413 | 0 | } |
414 | | |
415 | | // Push message to the queue and signal to the receiver |
416 | 0 | fn queue_push_and_signal(&self, msg: T) { |
417 | | // Push the message onto the message queue |
418 | 0 | self.inner.message_queue.push(msg); |
419 | | |
420 | | // Signal to the receiver that a message has been enqueued. If the |
421 | | // receiver is parked, this will unpark the task. |
422 | 0 | self.inner.recv_task.wake(); |
423 | 0 | } Unexecuted instantiation: <futures_channel::mpsc::UnboundedSenderInner<dbus::message::Message>>::queue_push_and_signal Unexecuted instantiation: <futures_channel::mpsc::UnboundedSenderInner<_>>::queue_push_and_signal |
424 | | |
425 | | // Increment the number of queued messages. Returns the resulting number, or `None` if the channel has been closed. |
426 | 0 | fn inc_num_messages(&self) -> Option<usize> { |
427 | 0 | let mut curr = self.inner.state.load(SeqCst); |
428 | | |
429 | | loop { |
430 | 0 | let mut state = decode_state(curr); |
431 | | |
432 | | // The receiver end closed the channel. |
433 | 0 | if !state.is_open { |
434 | 0 | return None; |
435 | 0 | } |
436 | | |
437 | | // This probably is never hit? Odds are the process will run out of |
438 | | // memory first. It may be worth to return something else in this |
439 | | // case? |
440 | 0 | assert!( |
441 | 0 | state.num_messages < MAX_CAPACITY, |
442 | | "buffer space \ |
443 | | exhausted; sending this messages would overflow the state" |
444 | | ); |
445 | | |
446 | 0 | state.num_messages += 1; |
447 | | |
448 | 0 | let next = encode_state(&state); |
449 | 0 | match self.inner.state.compare_exchange(curr, next, SeqCst, SeqCst) { |
450 | 0 | Ok(_) => return Some(state.num_messages), |
451 | 0 | Err(actual) => curr = actual, |
452 | | } |
453 | | } |
454 | 0 | } Unexecuted instantiation: <futures_channel::mpsc::UnboundedSenderInner<dbus::message::Message>>::inc_num_messages Unexecuted instantiation: <futures_channel::mpsc::UnboundedSenderInner<_>>::inc_num_messages |
455 | | |
456 | | /// Returns whether the senders send to the same receiver. |
457 | 0 | fn same_receiver(&self, other: &Self) -> bool { |
458 | 0 | Arc::ptr_eq(&self.inner, &other.inner) |
459 | 0 | } |
460 | | |
461 | | /// Returns whether the sender sends to this receiver. |
462 | 0 | fn is_connected_to(&self, inner: &Arc<UnboundedInner<T>>) -> bool { |
463 | 0 | Arc::ptr_eq(&self.inner, inner) |
464 | 0 | } |
465 | | |
466 | | /// Returns a pointer to the shared channel state held in this sender's `Arc`. |
467 | | /// |
468 | | /// The returned pointer is never dereferenced and should only be used for hashing! |
469 | 0 | fn ptr(&self) -> *const UnboundedInner<T> { |
470 | 0 | &*self.inner |
471 | 0 | } |
472 | | |
473 | | /// Returns whether this channel is closed without needing a context. |
474 | 0 | fn is_closed(&self) -> bool { |
475 | 0 | !decode_state(self.inner.state.load(SeqCst)).is_open |
476 | 0 | } |
477 | | |
478 | | /// Closes this channel from the sender side, preventing any new messages. |
479 | 0 | fn close_channel(&self) { |
480 | | // There's no need to park this sender, it's being dropped, |
481 | | // and we don't want to check for capacity, so skip |
482 | | // that stuff from `do_send`. |
483 | | |
484 | 0 | self.inner.set_closed(); |
485 | 0 | self.inner.recv_task.wake(); |
486 | 0 | } Unexecuted instantiation: <futures_channel::mpsc::UnboundedSenderInner<dbus::message::Message>>::close_channel Unexecuted instantiation: <futures_channel::mpsc::UnboundedSenderInner<_>>::close_channel |
487 | | } |
488 | | |
489 | | impl<T> BoundedSenderInner<T> { |
490 | | /// Attempts to send a message on this `Sender`, returning the message |
491 | | /// if there was an error. |
492 | 0 | fn try_send(&mut self, msg: T) -> Result<(), TrySendError<T>> { |
493 | | // If the sender is currently blocked, reject the message |
494 | 0 | if !self.poll_unparked(None).is_ready() { |
495 | 0 | return Err(TrySendError { err: SendError { kind: SendErrorKind::Full }, val: msg }); |
496 | 0 | } |
497 | | |
498 | | // The channel has capacity to accept the message, so send it |
499 | 0 | self.do_send_b(msg) |
500 | 0 | } |
501 | | |
502 | | // Do the send without failing. |
503 | | // Can be called only by bounded sender. |
504 | 0 | fn do_send_b(&mut self, msg: T) -> Result<(), TrySendError<T>> { |
505 | | // Anyone calling do_send *should* make sure there is room first, |
506 | | // but assert here for tests as a sanity check. |
507 | 0 | debug_assert!(self.poll_unparked(None).is_ready()); |
508 | | |
509 | | // First, increment the number of messages contained by the channel. |
510 | | // This operation will also atomically determine if the sender task |
511 | | // should be parked. |
512 | | // |
513 | | // `None` is returned in the case that the channel has been closed by the |
514 | | // receiver. This happens when `Receiver::close` is called or the |
515 | | // receiver is dropped. |
516 | 0 | let park_self = match self.inc_num_messages() { |
517 | 0 | Some(num_messages) => { |
518 | | // Block if the current number of pending messages has exceeded |
519 | | // the configured buffer size |
520 | 0 | num_messages > self.inner.buffer |
521 | | } |
522 | | None => { |
523 | 0 | return Err(TrySendError { |
524 | 0 | err: SendError { kind: SendErrorKind::Disconnected }, |
525 | 0 | val: msg, |
526 | 0 | }) |
527 | | } |
528 | | }; |
529 | | |
530 | | // If the channel has reached capacity, then the sender task needs to |
531 | | // be parked. This will send the task handle on the parked task queue. |
532 | | // |
533 | | // However, when `do_send` is called while dropping the `Sender`, |
534 | | // `task::current()` can't be called safely. In this case, in order to |
535 | | // maintain internal consistency, a blank message is pushed onto the |
536 | | // parked task queue. |
537 | 0 | if park_self { |
538 | 0 | self.park(); |
539 | 0 | } |
540 | | |
541 | 0 | self.queue_push_and_signal(msg); |
542 | | |
543 | 0 | Ok(()) |
544 | 0 | } |
545 | | |
546 | | // Push message to the queue and signal to the receiver |
547 | 0 | fn queue_push_and_signal(&self, msg: T) { |
548 | | // Push the message onto the message queue |
549 | 0 | self.inner.message_queue.push(msg); |
550 | | |
551 | | // Signal to the receiver that a message has been enqueued. If the |
552 | | // receiver is parked, this will unpark the task. |
553 | 0 | self.inner.recv_task.wake(); |
554 | 0 | } |
555 | | |
556 | | // Increment the number of queued messages. Returns the resulting number, or `None` if the channel has been closed. |
557 | 0 | fn inc_num_messages(&self) -> Option<usize> { |
558 | 0 | let mut curr = self.inner.state.load(SeqCst); |
559 | | |
560 | | loop { |
561 | 0 | let mut state = decode_state(curr); |
562 | | |
563 | | // The receiver end closed the channel. |
564 | 0 | if !state.is_open { |
565 | 0 | return None; |
566 | 0 | } |
567 | | |
568 | | // This probably is never hit? Odds are the process will run out of |
569 | | // memory first. It may be worth to return something else in this |
570 | | // case? |
571 | 0 | assert!( |
572 | 0 | state.num_messages < MAX_CAPACITY, |
573 | | "buffer space \ |
574 | | exhausted; sending this messages would overflow the state" |
575 | | ); |
576 | | |
577 | 0 | state.num_messages += 1; |
578 | | |
579 | 0 | let next = encode_state(&state); |
580 | 0 | match self.inner.state.compare_exchange(curr, next, SeqCst, SeqCst) { |
581 | 0 | Ok(_) => return Some(state.num_messages), |
582 | 0 | Err(actual) => curr = actual, |
583 | | } |
584 | | } |
585 | 0 | } |
586 | | |
587 | 0 | fn park(&mut self) { |
588 | 0 | { |
589 | 0 | let mut sender = self.sender_task.lock().unwrap(); |
590 | 0 | sender.task = None; |
591 | 0 | sender.is_parked = true; |
592 | 0 | } |
593 | | |
594 | | // Send handle over queue |
595 | 0 | let t = self.sender_task.clone(); |
596 | 0 | self.inner.parked_queue.push(t); |
597 | | |
598 | | // Check to make sure we weren't closed after we sent our task on the |
599 | | // queue |
600 | 0 | let state = decode_state(self.inner.state.load(SeqCst)); |
601 | 0 | self.maybe_parked = state.is_open; |
602 | 0 | } |
603 | | |
604 | | /// Polls the channel to determine if there is guaranteed capacity to send |
605 | | /// at least one item without waiting. |
606 | | /// |
607 | | /// # Return value |
608 | | /// |
609 | | /// This method returns: |
610 | | /// |
611 | | /// - `Poll::Ready(Ok(_))` if there is sufficient capacity; |
612 | | /// - `Poll::Pending` if the channel may not have |
613 | | /// capacity, in which case the current task is queued to be notified once |
614 | | /// capacity is available; |
615 | | /// - `Poll::Ready(Err(SendError))` if the receiver has been dropped. |
616 | 0 | fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), SendError>> { |
617 | 0 | let state = decode_state(self.inner.state.load(SeqCst)); |
618 | 0 | if !state.is_open { |
619 | 0 | return Poll::Ready(Err(SendError { kind: SendErrorKind::Disconnected })); |
620 | 0 | } |
621 | | |
622 | 0 | self.poll_unparked(Some(cx)).map(Ok) |
623 | 0 | } |
624 | | |
625 | | /// Returns whether the senders send to the same receiver. |
626 | 0 | fn same_receiver(&self, other: &Self) -> bool { |
627 | 0 | Arc::ptr_eq(&self.inner, &other.inner) |
628 | 0 | } |
629 | | |
630 | | /// Returns whether the sender sends to this receiver. |
631 | 0 | fn is_connected_to(&self, receiver: &Arc<BoundedInner<T>>) -> bool { |
632 | 0 | Arc::ptr_eq(&self.inner, receiver) |
633 | 0 | } |
634 | | |
635 | | /// Returns a pointer to the shared channel state held in this sender's `Arc`. |
636 | | /// |
637 | | /// The returned pointer is never dereferenced and should only be used for hashing! |
638 | 0 | fn ptr(&self) -> *const BoundedInner<T> { |
639 | 0 | &*self.inner |
640 | 0 | } |
641 | | |
642 | | /// Returns whether this channel is closed without needing a context. |
643 | 0 | fn is_closed(&self) -> bool { |
644 | 0 | !decode_state(self.inner.state.load(SeqCst)).is_open |
645 | 0 | } |
646 | | |
647 | | /// Closes this channel from the sender side, preventing any new messages. |
648 | 0 | fn close_channel(&self) { |
649 | | // There's no need to park this sender, it's being dropped, |
650 | | // and we don't want to check for capacity, so skip |
651 | | // that stuff from `do_send`. |
652 | | |
653 | 0 | self.inner.set_closed(); |
654 | 0 | self.inner.recv_task.wake(); |
655 | 0 | } |
656 | | |
657 | 0 | fn poll_unparked(&mut self, cx: Option<&mut Context<'_>>) -> Poll<()> { |
658 | | // First check the `maybe_parked` variable. This avoids acquiring the |
659 | | // lock in most cases |
660 | 0 | if self.maybe_parked { |
661 | | // Get a lock on the task handle |
662 | 0 | let mut task = self.sender_task.lock().unwrap(); |
663 | | |
664 | 0 | if !task.is_parked { |
665 | 0 | self.maybe_parked = false; |
666 | 0 | return Poll::Ready(()); |
667 | 0 | } |
668 | | |
669 | | // At this point, an unpark request is pending, so there will be an |
670 | | // unpark sometime in the future. We just need to make sure that |
671 | | // the correct task will be notified. |
672 | | // |
673 | | // Update the task in case the `Sender` has been moved to another |
674 | | // task |
675 | 0 | task.task = cx.map(|cx| cx.waker().clone()); |
676 | | |
677 | 0 | Poll::Pending |
678 | | } else { |
679 | 0 | Poll::Ready(()) |
680 | | } |
681 | 0 | } |
682 | | } |
683 | | |
684 | | impl<T> Sender<T> { |
685 | | /// Attempts to send a message on this `Sender`, returning the message |
686 | | /// if there was an error. |
687 | 0 | pub fn try_send(&mut self, msg: T) -> Result<(), TrySendError<T>> { |
688 | 0 | if let Some(inner) = &mut self.0 { |
689 | 0 | inner.try_send(msg) |
690 | | } else { |
691 | 0 | Err(TrySendError { err: SendError { kind: SendErrorKind::Disconnected }, val: msg }) |
692 | | } |
693 | 0 | } |
694 | | |
695 | | /// Send a message on the channel. |
696 | | /// |
697 | | /// This function should only be called after |
698 | | /// [`poll_ready`](Sender::poll_ready) has reported that the channel is |
699 | | /// ready to receive a message. |
700 | 0 | pub fn start_send(&mut self, msg: T) -> Result<(), SendError> { |
701 | 0 | self.try_send(msg).map_err(|e| e.err) |
702 | 0 | } |
703 | | |
704 | | /// Polls the channel to determine if there is guaranteed capacity to send |
705 | | /// at least one item without waiting. |
706 | | /// |
707 | | /// # Return value |
708 | | /// |
709 | | /// This method returns: |
710 | | /// |
711 | | /// - `Poll::Ready(Ok(_))` if there is sufficient capacity; |
712 | | /// - `Poll::Pending` if the channel may not have |
713 | | /// capacity, in which case the current task is queued to be notified once |
714 | | /// capacity is available; |
715 | | /// - `Poll::Ready(Err(SendError))` if the receiver has been dropped. |
716 | 0 | pub fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), SendError>> { |
717 | 0 | let inner = self.0.as_mut().ok_or(SendError { kind: SendErrorKind::Disconnected })?; |
718 | 0 | inner.poll_ready(cx) |
719 | 0 | } |
720 | | |
721 | | /// Returns whether this channel is closed without needing a context. |
722 | 0 | pub fn is_closed(&self) -> bool { |
723 | 0 | self.0.as_ref().map(BoundedSenderInner::is_closed).unwrap_or(true) |
724 | 0 | } |
725 | | |
726 | | /// Closes this channel from the sender side, preventing any new messages. |
727 | 0 | pub fn close_channel(&mut self) { |
728 | 0 | if let Some(inner) = &mut self.0 { |
729 | 0 | inner.close_channel(); |
730 | 0 | } |
731 | 0 | } |
732 | | |
733 | | /// Disconnects this sender from the channel, closing it if there are no more senders left. |
734 | 0 | pub fn disconnect(&mut self) { |
735 | 0 | self.0 = None; |
736 | 0 | } |
737 | | |
738 | | /// Returns whether the senders send to the same receiver. |
739 | 0 | pub fn same_receiver(&self, other: &Self) -> bool { |
740 | 0 | match (&self.0, &other.0) { |
741 | 0 | (Some(inner), Some(other)) => inner.same_receiver(other), |
742 | 0 | _ => false, |
743 | | } |
744 | 0 | } |
745 | | |
746 | | /// Returns whether the sender sends to this receiver. |
747 | 0 | pub fn is_connected_to(&self, receiver: &Receiver<T>) -> bool { |
748 | 0 | match (&self.0, &receiver.inner) { |
749 | 0 | (Some(inner), Some(receiver)) => inner.is_connected_to(receiver), |
750 | 0 | _ => false, |
751 | | } |
752 | 0 | } |
753 | | |
754 | | /// Hashes the receiver into the provided hasher |
755 | 0 | pub fn hash_receiver<H>(&self, hasher: &mut H) |
756 | 0 | where |
757 | 0 | H: std::hash::Hasher, |
758 | | { |
759 | | use std::hash::Hash; |
760 | | |
761 | 0 | let ptr = self.0.as_ref().map(|inner| inner.ptr()); |
762 | 0 | ptr.hash(hasher); |
763 | 0 | } |
764 | | } |
765 | | |
766 | | impl<T> UnboundedSender<T> { |
767 | | /// Check if the channel is ready to receive a message. |
768 | 0 | pub fn poll_ready(&self, _: &mut Context<'_>) -> Poll<Result<(), SendError>> { |
769 | 0 | let inner = self.0.as_ref().ok_or(SendError { kind: SendErrorKind::Disconnected })?; |
770 | 0 | inner.poll_ready_nb() |
771 | 0 | } |
772 | | |
773 | | /// Returns whether this channel is closed without needing a context. |
774 | 0 | pub fn is_closed(&self) -> bool { |
775 | 0 | self.0.as_ref().map(UnboundedSenderInner::is_closed).unwrap_or(true) |
776 | 0 | } |
777 | | |
778 | | /// Closes this channel from the sender side, preventing any new messages. |
779 | 0 | pub fn close_channel(&self) { |
780 | 0 | if let Some(inner) = &self.0 { |
781 | 0 | inner.close_channel(); |
782 | 0 | } |
783 | 0 | } |
784 | | |
785 | | /// Disconnects this sender from the channel, closing it if there are no more senders left. |
786 | 0 | pub fn disconnect(&mut self) { |
787 | 0 | self.0 = None; |
788 | 0 | } |
789 | | |
790 | | // Do the send without parking current task. |
791 | 0 | fn do_send_nb(&self, msg: T) -> Result<(), TrySendError<T>> { |
792 | 0 | if let Some(inner) = &self.0 { |
793 | 0 | if inner.inc_num_messages().is_some() { |
794 | 0 | inner.queue_push_and_signal(msg); |
795 | 0 | return Ok(()); |
796 | 0 | } |
797 | 0 | } |
798 | | |
799 | 0 | Err(TrySendError { err: SendError { kind: SendErrorKind::Disconnected }, val: msg }) |
800 | 0 | } Unexecuted instantiation: <futures_channel::mpsc::UnboundedSender<dbus::message::Message>>::do_send_nb Unexecuted instantiation: <futures_channel::mpsc::UnboundedSender<_>>::do_send_nb |
801 | | |
802 | | /// Send a message on the channel. |
803 | | /// |
804 | | /// This method should only be called after `poll_ready` has been used to |
805 | | /// verify that the channel is ready to receive a message. |
806 | 0 | pub fn start_send(&mut self, msg: T) -> Result<(), SendError> { |
807 | 0 | self.do_send_nb(msg).map_err(|e| e.err) |
808 | 0 | } |
809 | | |
810 | | /// Sends a message along this channel. |
811 | | /// |
812 | | /// This is an unbounded sender, so this function differs from `Sink::send` |
813 | | /// by ensuring the return type reflects that the channel is always ready to |
814 | | /// receive messages. |
815 | 0 | pub fn unbounded_send(&self, msg: T) -> Result<(), TrySendError<T>> { |
816 | 0 | self.do_send_nb(msg) |
817 | 0 | } Unexecuted instantiation: <futures_channel::mpsc::UnboundedSender<dbus::message::Message>>::unbounded_send Unexecuted instantiation: <futures_channel::mpsc::UnboundedSender<_>>::unbounded_send |
818 | | |
819 | | /// Returns whether the senders send to the same receiver. |
820 | 0 | pub fn same_receiver(&self, other: &Self) -> bool { |
821 | 0 | match (&self.0, &other.0) { |
822 | 0 | (Some(inner), Some(other)) => inner.same_receiver(other), |
823 | 0 | _ => false, |
824 | | } |
825 | 0 | } |
826 | | |
827 | | /// Returns whether this sender sends to the given receiver. |
828 | 0 | pub fn is_connected_to(&self, receiver: &UnboundedReceiver<T>) -> bool { |
829 | 0 | match (&self.0, &receiver.inner) { |
830 | 0 | (Some(inner), Some(receiver)) => inner.is_connected_to(receiver), |
831 | 0 | _ => false, |
832 | | } |
833 | 0 | } |
834 | | |
835 | | /// Hashes the receiver into the provided hasher |
836 | 0 | pub fn hash_receiver<H>(&self, hasher: &mut H) |
837 | 0 | where |
838 | 0 | H: std::hash::Hasher, |
839 | | { |
840 | | use std::hash::Hash; |
841 | | |
842 | 0 | let ptr = self.0.as_ref().map(|inner| inner.ptr()); |
843 | 0 | ptr.hash(hasher); |
844 | 0 | } |
845 | | } |
846 | | |
847 | | impl<T> Clone for Sender<T> { |
848 | 0 | fn clone(&self) -> Self { |
849 | 0 | Self(self.0.clone()) |
850 | 0 | } |
851 | | } |
852 | | |
853 | | impl<T> Clone for UnboundedSender<T> { |
854 | 0 | fn clone(&self) -> Self { |
855 | 0 | Self(self.0.clone()) |
856 | 0 | } |
857 | | } |
858 | | |
859 | | impl<T> Clone for UnboundedSenderInner<T> { |
860 | 0 | fn clone(&self) -> Self { |
861 | | // This atomic op isn't guarding any memory and only the ordering on this |
862 | | // single atomic variable matters, so a relaxed ordering would suffice; |
863 | | // `SeqCst` is used below regardless. |
864 | 0 | let mut curr = self.inner.num_senders.load(SeqCst); |
865 | | |
866 | | loop { |
867 | | // If the maximum number of senders has been reached, then fail |
868 | 0 | if curr == MAX_BUFFER { |
869 | 0 | panic!("cannot clone `Sender` -- too many outstanding senders"); |
870 | 0 | } |
871 | | |
872 | 0 | debug_assert!(curr < MAX_BUFFER); |
873 | | |
874 | 0 | let next = curr + 1; |
875 | 0 | match self.inner.num_senders.compare_exchange(curr, next, SeqCst, SeqCst) { |
876 | | Ok(_) => { |
877 | | // The ABA problem doesn't matter here. We only care that the |
878 | | // number of senders never exceeds the maximum. |
879 | 0 | return Self { inner: self.inner.clone() }; |
880 | | } |
881 | 0 | Err(actual) => curr = actual, |
882 | | } |
883 | | } |
884 | 0 | } |
885 | | } |
886 | | |
887 | | impl<T> Clone for BoundedSenderInner<T> { |
888 | 0 | fn clone(&self) -> Self { |
889 | | // This atomic op isn't guarding any memory and only the ordering on this |
890 | | // single atomic variable matters, so a relaxed ordering would suffice; |
891 | | // `SeqCst` is used below regardless. |
892 | 0 | let mut curr = self.inner.num_senders.load(SeqCst); |
893 | | |
894 | | loop { |
895 | | // If the maximum number of senders has been reached, then fail |
896 | 0 | if curr == self.inner.max_senders() { |
897 | 0 | panic!("cannot clone `Sender` -- too many outstanding senders"); |
898 | 0 | } |
899 | | |
900 | 0 | debug_assert!(curr < self.inner.max_senders()); |
901 | | |
902 | 0 | let next = curr + 1; |
903 | 0 | match self.inner.num_senders.compare_exchange(curr, next, SeqCst, SeqCst) { |
904 | | Ok(_) => { |
905 | | // The ABA problem doesn't matter here. We only care that the |
906 | | // number of senders never exceeds the maximum. |
907 | 0 | return Self { |
908 | 0 | inner: self.inner.clone(), |
909 | 0 | sender_task: Arc::new(Mutex::new(SenderTask::new())), |
910 | 0 | maybe_parked: false, |
911 | 0 | }; |
912 | | } |
913 | 0 | Err(actual) => curr = actual, |
914 | | } |
915 | | } |
916 | 0 | } |
917 | | } |
918 | | |
919 | | impl<T> Drop for UnboundedSenderInner<T> { |
920 | 0 | fn drop(&mut self) { |
921 | | // Ordering between variables doesn't matter here |
922 | 0 | let prev = self.inner.num_senders.fetch_sub(1, SeqCst); |
923 | | |
924 | 0 | if prev == 1 { |
925 | 0 | self.close_channel(); |
926 | 0 | } |
927 | 0 | } Unexecuted instantiation: <futures_channel::mpsc::UnboundedSenderInner<dbus::message::Message> as core::ops::drop::Drop>::drop Unexecuted instantiation: <futures_channel::mpsc::UnboundedSenderInner<_> as core::ops::drop::Drop>::drop |
928 | | } |
929 | | |
930 | | impl<T> Drop for BoundedSenderInner<T> { |
931 | 0 | fn drop(&mut self) { |
932 | | // Ordering between variables doesn't matter here |
933 | 0 | let prev = self.inner.num_senders.fetch_sub(1, SeqCst); |
934 | | |
935 | 0 | if prev == 1 { |
936 | 0 | self.close_channel(); |
937 | 0 | } |
938 | 0 | } |
939 | | } |
940 | | |
941 | | impl<T> fmt::Debug for Sender<T> { |
942 | 0 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
943 | 0 | f.debug_struct("Sender").field("closed", &self.is_closed()).finish() |
944 | 0 | } |
945 | | } |
946 | | |
947 | | impl<T> fmt::Debug for UnboundedSender<T> { |
948 | 0 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
949 | 0 | f.debug_struct("UnboundedSender").field("closed", &self.is_closed()).finish() |
950 | 0 | } |
951 | | } |
952 | | |
953 | | /* |
954 | | * |
955 | | * ===== impl Receiver ===== |
956 | | * |
957 | | */ |
958 | | |
959 | | impl<T> Receiver<T> { |
960 | | /// Closes the receiving half of a channel, without dropping it. |
961 | | /// |
962 | | /// This prevents any further messages from being sent on the channel while |
963 | | /// still enabling the receiver to drain messages that are buffered. |
964 | 0 | pub fn close(&mut self) { |
965 | 0 | if let Some(inner) = &mut self.inner { |
966 | 0 | inner.set_closed(); |
967 | | |
968 | | // Wake up any threads waiting as they'll see that we've closed the |
969 | | // channel and will continue on their merry way. |
970 | 0 | while let Some(task) = unsafe { inner.parked_queue.pop_spin() } { |
971 | 0 | task.lock().unwrap().notify(); |
972 | 0 | } |
973 | 0 | } |
974 | 0 | } |
975 | | |
976 | | /// Tries to receive the next message without notifying a context if empty. |
977 | | /// |
978 | | /// It is not recommended to call this function from inside of a future, |
979 | | /// only when you've otherwise arranged to be notified when the channel is |
980 | | /// no longer empty. |
981 | | /// |
982 | | /// This function returns: |
983 | | /// * `Ok(Some(t))` when message is fetched |
984 | | /// * `Ok(None)` when channel is closed and no messages left in the queue |
985 | | /// * `Err(e)` when there are no messages available, but channel is not yet closed |
986 | 0 | pub fn try_next(&mut self) -> Result<Option<T>, TryRecvError> { |
987 | 0 | match self.next_message() { |
988 | 0 | Poll::Ready(msg) => Ok(msg), |
989 | 0 | Poll::Pending => Err(TryRecvError { _priv: () }), |
990 | | } |
991 | 0 | } |
992 | | |
993 | 0 | fn next_message(&mut self) -> Poll<Option<T>> { |
994 | 0 | let inner = match self.inner.as_mut() { |
995 | 0 | None => return Poll::Ready(None), |
996 | 0 | Some(inner) => inner, |
997 | | }; |
998 | | // Pop off a message |
999 | 0 | match unsafe { inner.message_queue.pop_spin() } { |
1000 | 0 | Some(msg) => { |
1001 | | // If there are any parked task handles in the parked queue, |
1002 | | // pop one and unpark it. |
1003 | 0 | self.unpark_one(); |
1004 | | |
1005 | | // Decrement number of messages |
1006 | 0 | self.dec_num_messages(); |
1007 | | |
1008 | 0 | Poll::Ready(Some(msg)) |
1009 | | } |
1010 | | None => { |
1011 | 0 | let state = decode_state(inner.state.load(SeqCst)); |
1012 | 0 | if state.is_closed() { |
1013 | | // If closed flag is set AND there are no pending messages |
1014 | | // it means end of stream |
1015 | 0 | self.inner = None; |
1016 | 0 | Poll::Ready(None) |
1017 | | } else { |
1018 | | // If the queue is open, we need to return Pending |
1019 | | // to be woken up when new messages arrive. |
1020 | | // If the queue is closed but num_messages is non-zero, |
1021 | | // it means that a sender has updated the state |
1022 | | // but hasn't pushed its message onto the queue yet, |
1023 | | // so we need to park until the sender unparks the task |
1024 | | // after queueing the message. |
1025 | 0 | Poll::Pending |
1026 | | } |
1027 | | } |
1028 | | } |
1029 | 0 | } |
1030 | | |
1031 | | // Unpark a single task handle if there is one pending in the parked queue |
1032 | 0 | fn unpark_one(&mut self) { |
1033 | 0 | if let Some(inner) = &mut self.inner { |
1034 | 0 | if let Some(task) = unsafe { inner.parked_queue.pop_spin() } { |
1035 | 0 | task.lock().unwrap().notify(); |
1036 | 0 | } |
1037 | 0 | } |
1038 | 0 | } |
1039 | | |
1040 | 0 | fn dec_num_messages(&self) { |
1041 | 0 | if let Some(inner) = &self.inner { |
1042 | 0 | // OPEN_MASK is highest bit, so it's unaffected by subtraction |
1043 | 0 | // unless there's underflow, and we know there's no underflow |
1044 | 0 | // because number of messages at this point is always > 0. |
1045 | 0 | inner.state.fetch_sub(1, SeqCst); |
1046 | 0 | } |
1047 | 0 | } |
1048 | | } |
1049 | | |
1050 | | // The receiver does not ever take a Pin to the inner T |
1051 | | impl<T> Unpin for Receiver<T> {} |
1052 | | |
1053 | | impl<T> FusedStream for Receiver<T> { |
1054 | 0 | fn is_terminated(&self) -> bool { |
1055 | 0 | self.inner.is_none() |
1056 | 0 | } |
1057 | | } |
1058 | | |
1059 | | impl<T> Stream for Receiver<T> { |
1060 | | type Item = T; |
1061 | | |
1062 | 0 | fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> { |
1063 | | // Try to read a message off of the message queue. |
1064 | 0 | match self.next_message() { |
1065 | 0 | Poll::Ready(msg) => { |
1066 | 0 | if msg.is_none() { |
1067 | 0 | self.inner = None; |
1068 | 0 | } |
1069 | 0 | Poll::Ready(msg) |
1070 | | } |
1071 | | Poll::Pending => { |
1072 | | // There are no messages to read, in this case, park. |
1073 | 0 | self.inner.as_ref().unwrap().recv_task.register(cx.waker()); |
1074 | | // Check the queue again after parking to prevent a race condition: |
1075 | | // a message could be added to the queue after the previous `next_message` |
1076 | | // call but before the `register` call. |
1077 | 0 | self.next_message() |
1078 | | } |
1079 | | } |
1080 | 0 | } |
1081 | | |
1082 | 0 | fn size_hint(&self) -> (usize, Option<usize>) { |
1083 | 0 | if let Some(inner) = &self.inner { |
1084 | 0 | decode_state(inner.state.load(SeqCst)).size_hint() |
1085 | | } else { |
1086 | 0 | (0, Some(0)) |
1087 | | } |
1088 | 0 | } |
1089 | | } |
1090 | | |
1091 | | impl<T> Drop for Receiver<T> { |
1092 | 0 | fn drop(&mut self) { |
1093 | | // Drain the channel of all pending messages |
1094 | 0 | self.close(); |
1095 | 0 | if self.inner.is_some() { |
1096 | | loop { |
1097 | 0 | match self.next_message() { |
1098 | 0 | Poll::Ready(Some(_)) => {} |
1099 | 0 | Poll::Ready(None) => break, |
1100 | | Poll::Pending => { |
1101 | 0 | let state = decode_state(self.inner.as_ref().unwrap().state.load(SeqCst)); |
1102 | | |
1103 | | // If the channel is closed, then there is no need to park. |
1104 | 0 | if state.is_closed() { |
1105 | 0 | break; |
1106 | 0 | } |
1107 | | |
1108 | | // TODO: Spinning isn't ideal, it might be worth |
1109 | | // investigating using a condvar or some other strategy |
1110 | | // here. That said, if this case is hit, then another thread |
1111 | | // is about to push the value into the queue and this isn't |
1112 | | // the only spinlock in the impl right now. |
1113 | 0 | thread::yield_now(); |
1114 | | } |
1115 | | } |
1116 | | } |
1117 | 0 | } |
1118 | 0 | } |
1119 | | } |
1120 | | |
1121 | | impl<T> fmt::Debug for Receiver<T> { |
1122 | 0 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
1123 | 0 | let closed = if let Some(ref inner) = self.inner { |
1124 | 0 | decode_state(inner.state.load(SeqCst)).is_closed() |
1125 | | } else { |
1126 | 0 | false |
1127 | | }; |
1128 | | |
1129 | 0 | f.debug_struct("Receiver").field("closed", &closed).finish() |
1130 | 0 | } |
1131 | | } |
1132 | | |
1133 | | impl<T> UnboundedReceiver<T> { |
1134 | | /// Closes the receiving half of a channel, without dropping it. |
1135 | | /// |
1136 | | /// This prevents any further messages from being sent on the channel while |
1137 | | /// still enabling the receiver to drain messages that are buffered. |
1138 | 0 | pub fn close(&mut self) { |
1139 | 0 | if let Some(inner) = &mut self.inner { |
1140 | 0 | inner.set_closed(); |
1141 | 0 | } |
1142 | 0 | } |
1143 | | |
1144 | | /// Tries to receive the next message without notifying a context if empty. |
1145 | | /// |
1146 | | /// It is not recommended to call this function from inside of a future, |
1147 | | /// only when you've otherwise arranged to be notified when the channel is |
1148 | | /// no longer empty. |
1149 | | /// |
1150 | | /// This function returns: |
1151 | | /// * `Ok(Some(t))` when message is fetched |
1152 | | /// * `Ok(None)` when channel is closed and no messages left in the queue |
1153 | | /// * `Err(e)` when there are no messages available, but channel is not yet closed |
1154 | 0 | pub fn try_next(&mut self) -> Result<Option<T>, TryRecvError> { |
1155 | 0 | match self.next_message() { |
1156 | 0 | Poll::Ready(msg) => Ok(msg), |
1157 | 0 | Poll::Pending => Err(TryRecvError { _priv: () }), |
1158 | | } |
1159 | 0 | } |
1160 | | |
1161 | 0 | fn next_message(&mut self) -> Poll<Option<T>> { |
1162 | 0 | let inner = match self.inner.as_mut() { |
1163 | 0 | None => return Poll::Ready(None), |
1164 | 0 | Some(inner) => inner, |
1165 | | }; |
1166 | | // Pop off a message |
1167 | 0 | match unsafe { inner.message_queue.pop_spin() } { |
1168 | 0 | Some(msg) => { |
1169 | | // Decrement number of messages |
1170 | 0 | self.dec_num_messages(); |
1171 | | |
1172 | 0 | Poll::Ready(Some(msg)) |
1173 | | } |
1174 | | None => { |
1175 | 0 | let state = decode_state(inner.state.load(SeqCst)); |
1176 | 0 | if state.is_closed() { |
1177 | | // If closed flag is set AND there are no pending messages |
1178 | | // it means end of stream |
1179 | 0 | self.inner = None; |
1180 | 0 | Poll::Ready(None) |
1181 | | } else { |
1182 | | // If the queue is open, we need to return Pending |
1183 | | // to be woken up when new messages arrive. |
1184 | | // If the queue is closed but num_messages is non-zero, |
1185 | | // it means that a sender has updated the state |
1186 | | // but hasn't pushed its message onto the queue yet, |
1187 | | // so we need to park until the sender unparks the task |
1188 | | // after queueing the message. |
1189 | 0 | Poll::Pending |
1190 | | } |
1191 | | } |
1192 | | } |
1193 | 0 | } |
1194 | | |
1195 | 0 | fn dec_num_messages(&self) { |
1196 | 0 | if let Some(inner) = &self.inner { |
1197 | 0 | // OPEN_MASK is highest bit, so it's unaffected by subtraction |
1198 | 0 | // unless there's underflow, and we know there's no underflow |
1199 | 0 | // because number of messages at this point is always > 0. |
1200 | 0 | inner.state.fetch_sub(1, SeqCst); |
1201 | 0 | } |
1202 | 0 | } |
1203 | | } |
1204 | | |
1205 | | impl<T> FusedStream for UnboundedReceiver<T> { |
1206 | 0 | fn is_terminated(&self) -> bool { |
1207 | 0 | self.inner.is_none() |
1208 | 0 | } |
1209 | | } |
1210 | | |
1211 | | impl<T> Stream for UnboundedReceiver<T> { |
1212 | | type Item = T; |
1213 | | |
1214 | 0 | fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> { |
1215 | | // Try to read a message off of the message queue. |
1216 | 0 | match self.next_message() { |
1217 | 0 | Poll::Ready(msg) => { |
1218 | 0 | if msg.is_none() { |
1219 | 0 | self.inner = None; |
1220 | 0 | } |
1221 | 0 | Poll::Ready(msg) |
1222 | | } |
1223 | | Poll::Pending => { |
1224 | | // There are no messages to read, in this case, park. |
1225 | 0 | self.inner.as_ref().unwrap().recv_task.register(cx.waker()); |
1226 | | // Check the queue again after parking to prevent a race condition: |
1227 | | // a message could be added to the queue after the previous `next_message` |
1228 | | // call but before the `register` call. |
1229 | 0 | self.next_message() |
1230 | | } |
1231 | | } |
1232 | 0 | } |
1233 | | |
1234 | 0 | fn size_hint(&self) -> (usize, Option<usize>) { |
1235 | 0 | if let Some(inner) = &self.inner { |
1236 | 0 | decode_state(inner.state.load(SeqCst)).size_hint() |
1237 | | } else { |
1238 | 0 | (0, Some(0)) |
1239 | | } |
1240 | 0 | } |
1241 | | } |
1242 | | |
1243 | | impl<T> Drop for UnboundedReceiver<T> { |
1244 | 0 | fn drop(&mut self) { |
1245 | | // Drain the channel of all pending messages |
1246 | 0 | self.close(); |
1247 | 0 | if self.inner.is_some() { |
1248 | | loop { |
1249 | 0 | match self.next_message() { |
1250 | 0 | Poll::Ready(Some(_)) => {} |
1251 | 0 | Poll::Ready(None) => break, |
1252 | | Poll::Pending => { |
1253 | 0 | let state = decode_state(self.inner.as_ref().unwrap().state.load(SeqCst)); |
1254 | | |
1255 | | // If the channel is closed, then there is no need to park. |
1256 | 0 | if state.is_closed() { |
1257 | 0 | break; |
1258 | 0 | } |
1259 | | |
1260 | | // TODO: Spinning isn't ideal, it might be worth |
1261 | | // investigating using a condvar or some other strategy |
1262 | | // here. That said, if this case is hit, then another thread |
1263 | | // is about to push the value into the queue and this isn't |
1264 | | // the only spinlock in the impl right now. |
1265 | 0 | thread::yield_now(); |
1266 | | } |
1267 | | } |
1268 | | } |
1269 | 0 | } |
1270 | 0 | } |
1271 | | } |
1272 | | |
1273 | | impl<T> fmt::Debug for UnboundedReceiver<T> { |
1274 | 0 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
1275 | 0 | let closed = if let Some(ref inner) = self.inner { |
1276 | 0 | decode_state(inner.state.load(SeqCst)).is_closed() |
1277 | | } else { |
1278 | 0 | false |
1279 | | }; |
1280 | | |
1281 | 0 | f.debug_struct("Receiver").field("closed", &closed).finish() |
1282 | 0 | } |
1283 | | } |
1284 | | |
1285 | | /* |
1286 | | * |
1287 | | * ===== impl Inner ===== |
1288 | | * |
1289 | | */ |
1290 | | |
1291 | | impl<T> UnboundedInner<T> { |
1292 | | // Clear `open` flag in the state, keep `num_messages` intact. |
1293 | 0 | fn set_closed(&self) { |
1294 | 0 | let curr = self.state.load(SeqCst); |
1295 | 0 | if !decode_state(curr).is_open { |
1296 | 0 | return; |
1297 | 0 | } |
1298 | | |
1299 | 0 | self.state.fetch_and(!OPEN_MASK, SeqCst); |
1300 | 0 | } Unexecuted instantiation: <futures_channel::mpsc::UnboundedInner<dbus::message::Message>>::set_closed Unexecuted instantiation: <futures_channel::mpsc::UnboundedInner<_>>::set_closed |
1301 | | } |
1302 | | |
1303 | | impl<T> BoundedInner<T> { |
1304 | | // The return value is such that the total number of messages that can be |
1305 | | // enqueued into the channel will never exceed MAX_CAPACITY |
1306 | 0 | fn max_senders(&self) -> usize { |
1307 | 0 | MAX_CAPACITY - self.buffer |
1308 | 0 | } |
1309 | | |
1310 | | // Clear `open` flag in the state, keep `num_messages` intact. |
1311 | 0 | fn set_closed(&self) { |
1312 | 0 | let curr = self.state.load(SeqCst); |
1313 | 0 | if !decode_state(curr).is_open { |
1314 | 0 | return; |
1315 | 0 | } |
1316 | | |
1317 | 0 | self.state.fetch_and(!OPEN_MASK, SeqCst); |
1318 | 0 | } |
1319 | | } |
1320 | | |
1321 | | unsafe impl<T: Send> Send for UnboundedInner<T> {} |
1322 | | unsafe impl<T: Send> Sync for UnboundedInner<T> {} |
1323 | | |
1324 | | unsafe impl<T: Send> Send for BoundedInner<T> {} |
1325 | | unsafe impl<T: Send> Sync for BoundedInner<T> {} |
1326 | | |
1327 | | impl State { |
1328 | 0 | fn is_closed(&self) -> bool { |
1329 | 0 | !self.is_open && self.num_messages == 0 |
1330 | 0 | } |
1331 | | |
1332 | 0 | fn size_hint(&self) -> (usize, Option<usize>) { |
1333 | 0 | if self.is_open { |
1334 | 0 | (self.num_messages, None) |
1335 | | } else { |
1336 | 0 | (self.num_messages, Some(self.num_messages)) |
1337 | | } |
1338 | 0 | } |
1339 | | } |
1340 | | |
1341 | | /* |
1342 | | * |
1343 | | * ===== Helpers ===== |
1344 | | * |
1345 | | */ |
1346 | | |
1347 | 0 | fn decode_state(num: usize) -> State { |
1348 | 0 | State { is_open: num & OPEN_MASK == OPEN_MASK, num_messages: num & MAX_CAPACITY } |
1349 | 0 | } |
1350 | | |
1351 | 0 | fn encode_state(state: &State) -> usize { |
1352 | 0 | let mut num = state.num_messages; |
1353 | | |
1354 | 0 | if state.is_open { |
1355 | 0 | num |= OPEN_MASK; |
1356 | 0 | } |
1357 | | |
1358 | 0 | num |
1359 | 0 | } |