Coverage Report

Created: 2026-01-17 07:07

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/rust/registry/src/index.crates.io-1949cf8c6b5b557f/tokio-1.49.0/src/sync/oneshot.rs
Line
Count
Source
1
#![cfg_attr(not(feature = "sync"), allow(dead_code, unreachable_pub))]
2
3
//! A one-shot channel is used for sending a single message between
4
//! asynchronous tasks. The [`channel`] function is used to create a
5
//! [`Sender`] and [`Receiver`] handle pair that form the channel.
6
//!
7
//! The `Sender` handle is used by the producer to send the value.
8
//! The `Receiver` handle is used by the consumer to receive the value.
9
//!
10
//! Each handle can be used on separate tasks.
11
//!
12
//! Since the `send` method is not async, it can be used anywhere. This includes
13
//! sending between two runtimes, and using it from non-async code.
14
//!
15
//! If the [`Receiver`] is closed before receiving a message which has already
16
//! been sent, the message will remain in the channel until the receiver is
17
//! dropped, at which point the message will be dropped immediately.
18
//!
19
//! # Examples
20
//!
21
//! ```
22
//! use tokio::sync::oneshot;
23
//!
24
//! # #[tokio::main(flavor = "current_thread")]
25
//! # async fn main() {
26
//! let (tx, rx) = oneshot::channel();
27
//!
28
//! tokio::spawn(async move {
29
//!     if let Err(_) = tx.send(3) {
30
//!         println!("the receiver dropped");
31
//!     }
32
//! });
33
//!
34
//! match rx.await {
35
//!     Ok(v) => println!("got = {:?}", v),
36
//!     Err(_) => println!("the sender dropped"),
37
//! }
38
//! # }
39
//! ```
40
//!
41
//! If the sender is dropped without sending, the receiver will fail with
42
//! [`error::RecvError`]:
43
//!
44
//! ```
45
//! use tokio::sync::oneshot;
46
//!
47
//! # #[tokio::main(flavor = "current_thread")]
48
//! # async fn main() {
49
//! let (tx, rx) = oneshot::channel::<u32>();
50
//!
51
//! tokio::spawn(async move {
52
//!     drop(tx);
53
//! });
54
//!
55
//! match rx.await {
56
//!     Ok(_) => panic!("This doesn't happen"),
57
//!     Err(_) => println!("the sender dropped"),
58
//! }
59
//! # }
60
//! ```
61
//!
62
//! To use a `oneshot` channel in a `tokio::select!` loop, add `&mut` in front of
63
//! the channel.
64
//!
65
//! ```
66
//! use tokio::sync::oneshot;
67
//! use tokio::time::{interval, sleep, Duration};
68
//!
69
//! # #[tokio::main(flavor = "current_thread")]
70
//! # async fn _doc() {}
71
//! # #[tokio::main(flavor = "current_thread", start_paused = true)]
72
//! # async fn main() {
73
//! let (send, mut recv) = oneshot::channel();
74
//! let mut interval = interval(Duration::from_millis(100));
75
//!
76
//! # let handle =
77
//! tokio::spawn(async move {
78
//!     sleep(Duration::from_secs(1)).await;
79
//!     send.send("shut down").unwrap();
80
//! });
81
//!
82
//! loop {
83
//!     tokio::select! {
84
//!         _ = interval.tick() => println!("Another 100ms"),
85
//!         msg = &mut recv => {
86
//!             println!("Got message: {}", msg.unwrap());
87
//!             break;
88
//!         }
89
//!     }
90
//! }
91
//! # handle.await.unwrap();
92
//! # }
93
//! ```
94
//!
95
//! To use a `Sender` from a destructor, put it in an [`Option`] and call
96
//! [`Option::take`].
97
//!
98
//! ```
99
//! use tokio::sync::oneshot;
100
//!
101
//! struct SendOnDrop {
102
//!     sender: Option<oneshot::Sender<&'static str>>,
103
//! }
104
//! impl Drop for SendOnDrop {
105
//!     fn drop(&mut self) {
106
//!         if let Some(sender) = self.sender.take() {
107
//!             // Using `let _ =` to ignore send errors.
108
//!             let _ = sender.send("I got dropped!");
109
//!         }
110
//!     }
111
//! }
112
//!
113
//! # #[tokio::main(flavor = "current_thread")]
114
//! # async fn _doc() {}
115
//! # #[tokio::main(flavor = "current_thread")]
116
//! # async fn main() {
117
//! let (send, recv) = oneshot::channel();
118
//!
119
//! let send_on_drop = SendOnDrop { sender: Some(send) };
120
//! drop(send_on_drop);
121
//!
122
//! assert_eq!(recv.await, Ok("I got dropped!"));
123
//! # }
124
//! ```
125
126
use crate::loom::cell::UnsafeCell;
127
use crate::loom::sync::atomic::AtomicUsize;
128
use crate::loom::sync::Arc;
129
#[cfg(all(tokio_unstable, feature = "tracing"))]
130
use crate::util::trace;
131
132
use std::fmt;
133
use std::future::Future;
134
use std::mem::MaybeUninit;
135
use std::pin::Pin;
136
use std::sync::atomic::Ordering::{self, AcqRel, Acquire};
137
use std::task::Poll::{Pending, Ready};
138
use std::task::{ready, Context, Poll, Waker};
139
140
/// Sends a value to the associated [`Receiver`].
141
///
142
/// A pair of both a [`Sender`] and a [`Receiver`]  are created by the
143
/// [`channel`](fn@channel) function.
144
///
145
/// # Examples
146
///
147
/// ```
148
/// use tokio::sync::oneshot;
149
///
150
/// # #[tokio::main(flavor = "current_thread")]
151
/// # async fn main() {
152
/// let (tx, rx) = oneshot::channel();
153
///
154
/// tokio::spawn(async move {
155
///     if let Err(_) = tx.send(3) {
156
///         println!("the receiver dropped");
157
///     }
158
/// });
159
///
160
/// match rx.await {
161
///     Ok(v) => println!("got = {:?}", v),
162
///     Err(_) => println!("the sender dropped"),
163
/// }
164
/// # }
165
/// ```
166
///
167
/// If the sender is dropped without sending, the receiver will fail with
168
/// [`error::RecvError`]:
169
///
170
/// ```
171
/// use tokio::sync::oneshot;
172
///
173
/// # #[tokio::main(flavor = "current_thread")]
174
/// # async fn main() {
175
/// let (tx, rx) = oneshot::channel::<u32>();
176
///
177
/// tokio::spawn(async move {
178
///     drop(tx);
179
/// });
180
///
181
/// match rx.await {
182
///     Ok(_) => panic!("This doesn't happen"),
183
///     Err(_) => println!("the sender dropped"),
184
/// }
185
/// # }
186
/// ```
187
///
188
/// To use a `Sender` from a destructor, put it in an [`Option`] and call
189
/// [`Option::take`].
190
///
191
/// ```
192
/// use tokio::sync::oneshot;
193
///
194
/// struct SendOnDrop {
195
///     sender: Option<oneshot::Sender<&'static str>>,
196
/// }
197
/// impl Drop for SendOnDrop {
198
///     fn drop(&mut self) {
199
///         if let Some(sender) = self.sender.take() {
200
///             // Using `let _ =` to ignore send errors.
201
///             let _ = sender.send("I got dropped!");
202
///         }
203
///     }
204
/// }
205
///
206
/// # #[tokio::main(flavor = "current_thread")]
207
/// # async fn _doc() {}
208
/// # #[tokio::main(flavor = "current_thread")]
209
/// # async fn main() {
210
/// let (send, recv) = oneshot::channel();
211
///
212
/// let send_on_drop = SendOnDrop { sender: Some(send) };
213
/// drop(send_on_drop);
214
///
215
/// assert_eq!(recv.await, Ok("I got dropped!"));
216
/// # }
217
/// ```
218
///
219
/// [`Option`]: std::option::Option
220
/// [`Option::take`]: std::option::Option::take
221
#[derive(Debug)]
222
pub struct Sender<T> {
223
    inner: Option<Arc<Inner<T>>>,
224
    #[cfg(all(tokio_unstable, feature = "tracing"))]
225
    resource_span: tracing::Span,
226
}
227
228
/// Receives a value from the associated [`Sender`].
229
///
230
/// A pair of both a [`Sender`] and a [`Receiver`]  are created by the
231
/// [`channel`](fn@channel) function.
232
///
233
/// This channel has no `recv` method because the receiver itself implements the
234
/// [`Future`] trait. To receive a `Result<T, `[`error::RecvError`]`>`, `.await` the `Receiver` object directly.
235
///
236
/// The `poll` method on the `Future` trait is allowed to spuriously return
237
/// `Poll::Pending` even if the message has been sent. If such a spurious
238
/// failure happens, then the caller will be woken when the spurious failure has
239
/// been resolved so that the caller can attempt to receive the message again.
240
/// Note that receiving such a wakeup does not guarantee that the next call will
241
/// succeed — it could fail with another spurious failure. (A spurious failure
242
/// does not mean that the message is lost. It is just delayed.)
243
///
244
/// [`Future`]: trait@std::future::Future
245
///
246
/// # Cancellation safety
247
///
248
/// The `Receiver` is cancel safe. If it is used as the event in a
249
/// [`tokio::select!`](crate::select) statement and some other branch
250
/// completes first, it is guaranteed that no message was received on this
251
/// channel.
252
///
253
/// # Examples
254
///
255
/// ```
256
/// use tokio::sync::oneshot;
257
///
258
/// # #[tokio::main(flavor = "current_thread")]
259
/// # async fn main() {
260
/// let (tx, rx) = oneshot::channel();
261
///
262
/// tokio::spawn(async move {
263
///     if let Err(_) = tx.send(3) {
264
///         println!("the receiver dropped");
265
///     }
266
/// });
267
///
268
/// match rx.await {
269
///     Ok(v) => println!("got = {:?}", v),
270
///     Err(_) => println!("the sender dropped"),
271
/// }
272
/// # }
273
/// ```
274
///
275
/// If the sender is dropped without sending, the receiver will fail with
276
/// [`error::RecvError`]:
277
///
278
/// ```
279
/// use tokio::sync::oneshot;
280
///
281
/// # #[tokio::main(flavor = "current_thread")]
282
/// # async fn main() {
283
/// let (tx, rx) = oneshot::channel::<u32>();
284
///
285
/// tokio::spawn(async move {
286
///     drop(tx);
287
/// });
288
///
289
/// match rx.await {
290
///     Ok(_) => panic!("This doesn't happen"),
291
///     Err(_) => println!("the sender dropped"),
292
/// }
293
/// # }
294
/// ```
295
///
296
/// To use a `Receiver` in a `tokio::select!` loop, add `&mut` in front of the
297
/// channel.
298
///
299
/// ```
300
/// use tokio::sync::oneshot;
301
/// use tokio::time::{interval, sleep, Duration};
302
///
303
/// # #[tokio::main(flavor = "current_thread")]
304
/// # async fn _doc() {}
305
/// # #[tokio::main(flavor = "current_thread", start_paused = true)]
306
/// # async fn main() {
307
/// let (send, mut recv) = oneshot::channel();
308
/// let mut interval = interval(Duration::from_millis(100));
309
///
310
/// # let handle =
311
/// tokio::spawn(async move {
312
///     sleep(Duration::from_secs(1)).await;
313
///     send.send("shut down").unwrap();
314
/// });
315
///
316
/// loop {
317
///     tokio::select! {
318
///         _ = interval.tick() => println!("Another 100ms"),
319
///         msg = &mut recv => {
320
///             println!("Got message: {}", msg.unwrap());
321
///             break;
322
///         }
323
///     }
324
/// }
325
/// # handle.await.unwrap();
326
/// # }
327
/// ```
328
#[derive(Debug)]
329
pub struct Receiver<T> {
330
    inner: Option<Arc<Inner<T>>>,
331
    #[cfg(all(tokio_unstable, feature = "tracing"))]
332
    resource_span: tracing::Span,
333
    #[cfg(all(tokio_unstable, feature = "tracing"))]
334
    async_op_span: tracing::Span,
335
    #[cfg(all(tokio_unstable, feature = "tracing"))]
336
    async_op_poll_span: tracing::Span,
337
}
338
339
pub mod error {
340
    //! `Oneshot` error types.
341
342
    use std::fmt;
343
344
    /// Error returned by the `Future` implementation for `Receiver`.
345
    ///
346
    /// This error is returned by the receiver when the sender is dropped without sending.
347
    #[derive(Debug, Eq, PartialEq, Clone)]
348
    pub struct RecvError(pub(super) ());
349
350
    /// Error returned by the `try_recv` function on `Receiver`.
351
    #[derive(Debug, Eq, PartialEq, Clone)]
352
    pub enum TryRecvError {
353
        /// The send half of the channel has not yet sent a value.
354
        Empty,
355
356
        /// The send half of the channel was dropped without sending a value.
357
        Closed,
358
    }
359
360
    // ===== impl RecvError =====
361
362
    impl fmt::Display for RecvError {
363
0
        fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
364
0
            write!(fmt, "channel closed")
365
0
        }
366
    }
367
368
    impl std::error::Error for RecvError {}
369
370
    // ===== impl TryRecvError =====
371
372
    impl fmt::Display for TryRecvError {
373
0
        fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
374
0
            match self {
375
0
                TryRecvError::Empty => write!(fmt, "channel empty"),
376
0
                TryRecvError::Closed => write!(fmt, "channel closed"),
377
            }
378
0
        }
379
    }
380
381
    impl std::error::Error for TryRecvError {}
382
}
383
384
use self::error::*;
385
386
struct Inner<T> {
387
    /// Manages the state of the inner cell.
388
    state: AtomicUsize,
389
390
    /// The value. This is set by `Sender` and read by `Receiver`. The state of
391
    /// the cell is tracked by `state`.
392
    value: UnsafeCell<Option<T>>,
393
394
    /// The task to notify when the receiver drops without consuming the value.
395
    ///
396
    /// ## Safety
397
    ///
398
    /// The `TX_TASK_SET` bit in the `state` field is set if this field is
399
    /// initialized. If that bit is unset, this field may be uninitialized.
400
    tx_task: Task,
401
402
    /// The task to notify when the value is sent.
403
    ///
404
    /// ## Safety
405
    ///
406
    /// The `RX_TASK_SET` bit in the `state` field is set if this field is
407
    /// initialized. If that bit is unset, this field may be uninitialized.
408
    rx_task: Task,
409
}
410
411
struct Task(UnsafeCell<MaybeUninit<Waker>>);
412
413
impl Task {
414
    /// # Safety
415
    ///
416
    /// The caller must do the necessary synchronization to ensure that
417
    /// the [`Self::0`] contains the valid [`Waker`] during the call.
418
3.29k
    unsafe fn will_wake(&self, cx: &mut Context<'_>) -> bool {
419
3.29k
        unsafe { self.with_task(|w| w.will_wake(cx.waker())) }
420
3.29k
    }
421
422
    /// # Safety
423
    ///
424
    /// The caller must do the necessary synchronization to ensure that
425
    /// the [`Self::0`] contains the valid [`Waker`] during the call.
426
20.7k
    unsafe fn with_task<F, R>(&self, f: F) -> R
427
20.7k
    where
428
20.7k
        F: FnOnce(&Waker) -> R,
429
    {
430
20.7k
        self.0.with(|ptr| {
431
20.7k
            let waker: *const Waker = unsafe { (*ptr).as_ptr() };
432
20.7k
            f(unsafe { &*waker })
433
20.7k
        })
<tokio::sync::oneshot::Task>::with_task::<<tokio::sync::oneshot::Task>::will_wake::{closure#0}, bool>::{closure#0}
Line
Count
Source
430
3.29k
        self.0.with(|ptr| {
431
3.29k
            let waker: *const Waker = unsafe { (*ptr).as_ptr() };
432
3.29k
            f(unsafe { &*waker })
433
3.29k
        })
<tokio::sync::oneshot::Task>::with_task::<<core::task::wake::Waker>::wake_by_ref, ()>::{closure#0}
Line
Count
Source
430
17.4k
        self.0.with(|ptr| {
431
17.4k
            let waker: *const Waker = unsafe { (*ptr).as_ptr() };
432
17.4k
            f(unsafe { &*waker })
433
17.4k
        })
434
20.7k
    }
<tokio::sync::oneshot::Task>::with_task::<<tokio::sync::oneshot::Task>::will_wake::{closure#0}, bool>
Line
Count
Source
426
3.29k
    unsafe fn with_task<F, R>(&self, f: F) -> R
427
3.29k
    where
428
3.29k
        F: FnOnce(&Waker) -> R,
429
    {
430
3.29k
        self.0.with(|ptr| {
431
            let waker: *const Waker = unsafe { (*ptr).as_ptr() };
432
            f(unsafe { &*waker })
433
        })
434
3.29k
    }
<tokio::sync::oneshot::Task>::with_task::<<core::task::wake::Waker>::wake_by_ref, ()>
Line
Count
Source
426
17.4k
    unsafe fn with_task<F, R>(&self, f: F) -> R
427
17.4k
    where
428
17.4k
        F: FnOnce(&Waker) -> R,
429
    {
430
17.4k
        self.0.with(|ptr| {
431
            let waker: *const Waker = unsafe { (*ptr).as_ptr() };
432
            f(unsafe { &*waker })
433
        })
434
17.4k
    }
435
436
    /// # Safety
437
    ///
438
    /// The caller must do the necessary synchronization to ensure that
439
    /// the [`Self::0`] contains the valid [`Waker`] during the call.
440
17.4k
    unsafe fn drop_task(&self) {
441
17.4k
        self.0.with_mut(|ptr| {
442
17.4k
            let ptr: *mut Waker = unsafe { (*ptr).as_mut_ptr() };
443
17.4k
            unsafe {
444
17.4k
                ptr.drop_in_place();
445
17.4k
            }
446
17.4k
        });
447
17.4k
    }
448
449
    /// # Safety
450
    ///
451
    /// The caller must do the necessary synchronization to ensure that
452
    /// the [`Self::0`] contains the valid [`Waker`] during the call.
453
17.4k
    unsafe fn set_task(&self, cx: &mut Context<'_>) {
454
17.4k
        self.0.with_mut(|ptr| {
455
17.4k
            let ptr: *mut Waker = unsafe { (*ptr).as_mut_ptr() };
456
17.4k
            unsafe {
457
17.4k
                ptr.write(cx.waker().clone());
458
17.4k
            }
459
17.4k
        });
460
17.4k
    }
461
}
462
463
#[derive(Clone, Copy)]
464
struct State(usize);
465
466
/// Creates a new one-shot channel for sending single values across asynchronous
467
/// tasks.
468
///
469
/// The function returns separate "send" and "receive" handles. The `Sender`
470
/// handle is used by the producer to send the value. The `Receiver` handle is
471
/// used by the consumer to receive the value.
472
///
473
/// Each handle can be used on separate tasks.
474
///
475
/// # Examples
476
///
477
/// ```
478
/// use tokio::sync::oneshot;
479
///
480
/// # #[tokio::main(flavor = "current_thread")]
481
/// # async fn main() {
482
/// let (tx, rx) = oneshot::channel();
483
///
484
/// tokio::spawn(async move {
485
///     if let Err(_) = tx.send(3) {
486
///         println!("the receiver dropped");
487
///     }
488
/// });
489
///
490
/// match rx.await {
491
///     Ok(v) => println!("got = {:?}", v),
492
///     Err(_) => println!("the sender dropped"),
493
/// }
494
/// # }
495
/// ```
496
#[track_caller]
497
17.4k
pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
498
    #[cfg(all(tokio_unstable, feature = "tracing"))]
499
    let resource_span = {
500
        let location = std::panic::Location::caller();
501
502
        let resource_span = tracing::trace_span!(
503
            parent: None,
504
            "runtime.resource",
505
            concrete_type = "Sender|Receiver",
506
            kind = "Sync",
507
            loc.file = location.file(),
508
            loc.line = location.line(),
509
            loc.col = location.column(),
510
        );
511
512
        resource_span.in_scope(|| {
513
            tracing::trace!(
514
            target: "runtime::resource::state_update",
515
            tx_dropped = false,
516
            tx_dropped.op = "override",
517
            )
518
        });
519
520
        resource_span.in_scope(|| {
521
            tracing::trace!(
522
            target: "runtime::resource::state_update",
523
            rx_dropped = false,
524
            rx_dropped.op = "override",
525
            )
526
        });
527
528
        resource_span.in_scope(|| {
529
            tracing::trace!(
530
            target: "runtime::resource::state_update",
531
            value_sent = false,
532
            value_sent.op = "override",
533
            )
534
        });
535
536
        resource_span.in_scope(|| {
537
            tracing::trace!(
538
            target: "runtime::resource::state_update",
539
            value_received = false,
540
            value_received.op = "override",
541
            )
542
        });
543
544
        resource_span
545
    };
546
547
17.4k
    let inner = Arc::new(Inner {
548
17.4k
        state: AtomicUsize::new(State::new().as_usize()),
549
17.4k
        value: UnsafeCell::new(None),
550
17.4k
        tx_task: Task(UnsafeCell::new(MaybeUninit::uninit())),
551
17.4k
        rx_task: Task(UnsafeCell::new(MaybeUninit::uninit())),
552
17.4k
    });
553
554
17.4k
    let tx = Sender {
555
17.4k
        inner: Some(inner.clone()),
556
17.4k
        #[cfg(all(tokio_unstable, feature = "tracing"))]
557
17.4k
        resource_span: resource_span.clone(),
558
17.4k
    };
559
560
    #[cfg(all(tokio_unstable, feature = "tracing"))]
561
    let async_op_span = resource_span
562
        .in_scope(|| tracing::trace_span!("runtime.resource.async_op", source = "Receiver::await"));
563
564
    #[cfg(all(tokio_unstable, feature = "tracing"))]
565
    let async_op_poll_span =
566
        async_op_span.in_scope(|| tracing::trace_span!("runtime.resource.async_op.poll"));
567
568
17.4k
    let rx = Receiver {
569
17.4k
        inner: Some(inner),
570
17.4k
        #[cfg(all(tokio_unstable, feature = "tracing"))]
571
17.4k
        resource_span,
572
17.4k
        #[cfg(all(tokio_unstable, feature = "tracing"))]
573
17.4k
        async_op_span,
574
17.4k
        #[cfg(all(tokio_unstable, feature = "tracing"))]
575
17.4k
        async_op_poll_span,
576
17.4k
    };
577
578
17.4k
    (tx, rx)
579
17.4k
}
580
581
impl<T> Sender<T> {
582
    /// Attempts to send a value on this channel, returning it back if it could
583
    /// not be sent.
584
    ///
585
    /// This method consumes `self` as only one value may ever be sent on a `oneshot`
586
    /// channel. It is not marked async because sending a message to an `oneshot`
587
    /// channel never requires any form of waiting.  Because of this, the `send`
588
    /// method can be used in both synchronous and asynchronous code without
589
    /// problems.
590
    ///
591
    /// A successful send occurs when it is determined that the other end of the
592
    /// channel has not hung up already. An unsuccessful send would be one where
593
    /// the corresponding receiver has already been deallocated. Note that a
594
    /// return value of `Err` means that the data will never be received, but
595
    /// a return value of `Ok` does *not* mean that the data will be received.
596
    /// It is possible for the corresponding receiver to hang up immediately
597
    /// after this function returns `Ok`.
598
    ///
599
    /// # Examples
600
    ///
601
    /// Send a value to another task
602
    ///
603
    /// ```
604
    /// use tokio::sync::oneshot;
605
    ///
606
    /// # #[tokio::main(flavor = "current_thread")]
607
    /// # async fn main() {
608
    /// let (tx, rx) = oneshot::channel();
609
    ///
610
    /// tokio::spawn(async move {
611
    ///     if let Err(_) = tx.send(3) {
612
    ///         println!("the receiver dropped");
613
    ///     }
614
    /// });
615
    ///
616
    /// match rx.await {
617
    ///     Ok(v) => println!("got = {:?}", v),
618
    ///     Err(_) => println!("the sender dropped"),
619
    /// }
620
    /// # }
621
    /// ```
622
0
    pub fn send(mut self, t: T) -> Result<(), T> {
623
0
        let inner = self.inner.take().unwrap();
624
625
0
        inner.value.with_mut(|ptr| unsafe {
626
            // SAFETY: The receiver will not access the `UnsafeCell` unless the
627
            // channel has been marked as "complete" (the `VALUE_SENT` state bit
628
            // is set).
629
            // That bit is only set by the sender later on in this method, and
630
            // calling this method consumes `self`. Therefore, if it was possible to
631
            // call this method, we know that the `VALUE_SENT` bit is unset, and
632
            // the receiver is not currently accessing the `UnsafeCell`.
633
0
            *ptr = Some(t);
634
0
        });
635
636
0
        if !inner.complete() {
637
            unsafe {
638
                // SAFETY: The receiver will not access the `UnsafeCell` unless
639
                // the channel has been marked as "complete". Calling
640
                // `complete()` will return true if this bit is set, and false
641
                // if it is not set. Thus, if `complete()` returned false, it is
642
                // safe for us to access the value, because we know that the
643
                // receiver will not.
644
0
                return Err(inner.consume_value().unwrap());
645
            }
646
0
        }
647
648
        #[cfg(all(tokio_unstable, feature = "tracing"))]
649
        self.resource_span.in_scope(|| {
650
            tracing::trace!(
651
            target: "runtime::resource::state_update",
652
            value_sent = true,
653
            value_sent.op = "override",
654
            )
655
        });
656
657
0
        Ok(())
658
0
    }
659
660
    /// Waits for the associated [`Receiver`] handle to close.
661
    ///
662
    /// A [`Receiver`] is closed by either calling [`close`] explicitly or the
663
    /// [`Receiver`] value is dropped.
664
    ///
665
    /// This function is useful when paired with `select!` to abort a
666
    /// computation when the receiver is no longer interested in the result.
667
    ///
668
    /// # Return
669
    ///
670
    /// Returns a `Future` which must be awaited on.
671
    ///
672
    /// [`Receiver`]: Receiver
673
    /// [`close`]: Receiver::close
674
    ///
675
    /// # Examples
676
    ///
677
    /// Basic usage
678
    ///
679
    /// ```
680
    /// use tokio::sync::oneshot;
681
    ///
682
    /// # #[tokio::main(flavor = "current_thread")]
683
    /// # async fn main() {
684
    /// let (mut tx, rx) = oneshot::channel::<()>();
685
    ///
686
    /// tokio::spawn(async move {
687
    ///     drop(rx);
688
    /// });
689
    ///
690
    /// tx.closed().await;
691
    /// println!("the receiver dropped");
692
    /// # }
693
    /// ```
694
    ///
695
    /// Paired with select
696
    ///
697
    /// ```
698
    /// use tokio::sync::oneshot;
699
    /// use tokio::time::{self, Duration};
700
    ///
701
    /// async fn compute() -> String {
702
    ///     // Complex computation returning a `String`
703
    /// # "hello".to_string()
704
    /// }
705
    ///
706
    /// # #[tokio::main(flavor = "current_thread")]
707
    /// # async fn main() {
708
    /// let (mut tx, rx) = oneshot::channel();
709
    ///
710
    /// tokio::spawn(async move {
711
    ///     tokio::select! {
712
    ///         _ = tx.closed() => {
713
    ///             // The receiver dropped, no need to do any further work
714
    ///         }
715
    ///         value = compute() => {
716
    ///             // The send can fail if the channel was closed at the exact same
717
    ///             // time as when compute() finished, so just ignore the failure.
718
    ///             let _ = tx.send(value);
719
    ///         }
720
    ///     }
721
    /// });
722
    ///
723
    /// // Wait for up to 10 seconds
724
    /// let _ = time::timeout(Duration::from_secs(10), rx).await;
725
    /// # }
726
    /// ```
727
0
    pub async fn closed(&mut self) {
728
        use std::future::poll_fn;
729
730
        #[cfg(all(tokio_unstable, feature = "tracing"))]
731
        let resource_span = self.resource_span.clone();
732
        #[cfg(all(tokio_unstable, feature = "tracing"))]
733
        let closed = trace::async_op(
734
            || poll_fn(|cx| self.poll_closed(cx)),
735
            resource_span,
736
            "Sender::closed",
737
            "poll_closed",
738
            false,
739
        );
740
        #[cfg(not(all(tokio_unstable, feature = "tracing")))]
741
0
        let closed = poll_fn(|cx| self.poll_closed(cx));
742
743
0
        closed.await;
744
0
    }
745
746
    /// Returns `true` if the associated [`Receiver`] handle has been dropped.
747
    ///
748
    /// A [`Receiver`] is closed by either calling [`close`] explicitly or the
749
    /// [`Receiver`] value is dropped.
750
    ///
751
    /// If `true` is returned, a call to `send` will always result in an error.
752
    ///
753
    /// [`Receiver`]: Receiver
754
    /// [`close`]: Receiver::close
755
    ///
756
    /// # Examples
757
    ///
758
    /// ```
759
    /// use tokio::sync::oneshot;
760
    ///
761
    /// # #[tokio::main(flavor = "current_thread")]
762
    /// # async fn main() {
763
    /// let (tx, rx) = oneshot::channel();
764
    ///
765
    /// assert!(!tx.is_closed());
766
    ///
767
    /// drop(rx);
768
    ///
769
    /// assert!(tx.is_closed());
770
    /// assert!(tx.send("never received").is_err());
771
    /// # }
772
    /// ```
773
0
    pub fn is_closed(&self) -> bool {
774
0
        let inner = self.inner.as_ref().unwrap();
775
776
0
        let state = State::load(&inner.state, Acquire);
777
0
        state.is_closed()
778
0
    }
779
780
    /// Checks whether the `oneshot` channel has been closed, and if not, schedules the
781
    /// `Waker` in the provided `Context` to receive a notification when the channel is
782
    /// closed.
783
    ///
784
    /// A [`Receiver`] is closed by either calling [`close`] explicitly, or when the
785
    /// [`Receiver`] value is dropped.
786
    ///
787
    /// Note that on multiple calls to poll, only the `Waker` from the `Context` passed
788
    /// to the most recent call will be scheduled to receive a wakeup.
789
    ///
790
    /// [`Receiver`]: struct@crate::sync::oneshot::Receiver
791
    /// [`close`]: fn@crate::sync::oneshot::Receiver::close
792
    ///
793
    /// # Return value
794
    ///
795
    /// This function returns:
796
    ///
797
    ///  * `Poll::Pending` if the channel is still open.
798
    ///  * `Poll::Ready(())` if the channel is closed.
799
    ///
800
    /// # Examples
801
    ///
802
    /// ```
803
    /// use tokio::sync::oneshot;
804
    ///
805
    /// use std::future::poll_fn;
806
    ///
807
    /// # #[tokio::main(flavor = "current_thread")]
808
    /// # async fn main() {
809
    /// let (mut tx, mut rx) = oneshot::channel::<()>();
810
    ///
811
    /// tokio::spawn(async move {
812
    ///     rx.close();
813
    /// });
814
    ///
815
    /// poll_fn(|cx| tx.poll_closed(cx)).await;
816
    ///
817
    /// println!("the receiver dropped");
818
    /// # }
819
    /// ```
820
0
    pub fn poll_closed(&mut self, cx: &mut Context<'_>) -> Poll<()> {
821
0
        ready!(crate::trace::trace_leaf(cx));
822
823
        // Keep track of task budget
824
0
        let coop = ready!(crate::task::coop::poll_proceed(cx));
825
826
0
        let inner = self.inner.as_ref().unwrap();
827
828
0
        let mut state = State::load(&inner.state, Acquire);
829
830
0
        if state.is_closed() {
831
0
            coop.made_progress();
832
0
            return Ready(());
833
0
        }
834
835
0
        if state.is_tx_task_set() {
836
0
            let will_notify = unsafe { inner.tx_task.will_wake(cx) };
837
838
0
            if !will_notify {
839
0
                state = State::unset_tx_task(&inner.state);
840
841
0
                if state.is_closed() {
842
                    // Set the flag again so that the waker is released in drop
843
0
                    State::set_tx_task(&inner.state);
844
0
                    coop.made_progress();
845
0
                    return Ready(());
846
0
                } else {
847
0
                    unsafe { inner.tx_task.drop_task() };
848
0
                }
849
0
            }
850
0
        }
851
852
0
        if !state.is_tx_task_set() {
853
            // Attempt to set the task
854
0
            unsafe {
855
0
                inner.tx_task.set_task(cx);
856
0
            }
857
858
            // Update the state
859
0
            state = State::set_tx_task(&inner.state);
860
861
0
            if state.is_closed() {
862
0
                coop.made_progress();
863
0
                return Ready(());
864
0
            }
865
0
        }
866
867
0
        Pending
868
0
    }
869
}
870
871
impl<T> Drop for Sender<T> {
872
17.4k
    fn drop(&mut self) {
873
17.4k
        if let Some(inner) = self.inner.as_ref() {
874
17.4k
            inner.complete();
875
17.4k
            #[cfg(all(tokio_unstable, feature = "tracing"))]
876
17.4k
            self.resource_span.in_scope(|| {
877
17.4k
                tracing::trace!(
878
17.4k
                target: "runtime::resource::state_update",
879
17.4k
                tx_dropped = true,
880
17.4k
                tx_dropped.op = "override",
881
17.4k
                )
882
17.4k
            });
883
17.4k
        }
884
17.4k
    }
885
}
886
887
impl<T> Receiver<T> {
888
    /// Prevents the associated [`Sender`] handle from sending a value.
889
    ///
890
    /// Any `send` operation which happens after calling `close` is guaranteed
891
    /// to fail. After calling `close`, [`try_recv`] should be called to
892
    /// receive a value if one was sent **before** the call to `close`
893
    /// completed.
894
    ///
895
    /// This function is useful to perform a graceful shutdown and ensure that a
896
    /// value will not be sent into the channel and never received.
897
    ///
898
    /// `close` is no-op if a message is already received or the channel
899
    /// is already closed.
900
    ///
901
    /// [`Sender`]: Sender
902
    /// [`try_recv`]: Receiver::try_recv
903
    ///
904
    /// # Examples
905
    ///
906
    /// Prevent a value from being sent
907
    ///
908
    /// ```
909
    /// use tokio::sync::oneshot;
910
    /// use tokio::sync::oneshot::error::TryRecvError;
911
    ///
912
    /// # #[tokio::main(flavor = "current_thread")]
913
    /// # async fn main() {
914
    /// let (tx, mut rx) = oneshot::channel();
915
    ///
916
    /// assert!(!tx.is_closed());
917
    ///
918
    /// rx.close();
919
    ///
920
    /// assert!(tx.is_closed());
921
    /// assert!(tx.send("never received").is_err());
922
    ///
923
    /// match rx.try_recv() {
924
    ///     Err(TryRecvError::Closed) => {}
925
    ///     _ => unreachable!(),
926
    /// }
927
    /// # }
928
    /// ```
929
    ///
930
    /// Receive a value sent **before** calling `close`
931
    ///
932
    /// ```
933
    /// use tokio::sync::oneshot;
934
    ///
935
    /// # #[tokio::main(flavor = "current_thread")]
936
    /// # async fn main() {
937
    /// let (tx, mut rx) = oneshot::channel();
938
    ///
939
    /// assert!(tx.send("will receive").is_ok());
940
    ///
941
    /// rx.close();
942
    ///
943
    /// let msg = rx.try_recv().unwrap();
944
    /// assert_eq!(msg, "will receive");
945
    /// # }
946
    /// ```
947
0
    pub fn close(&mut self) {
948
0
        if let Some(inner) = self.inner.as_ref() {
949
0
            inner.close();
950
0
            #[cfg(all(tokio_unstable, feature = "tracing"))]
951
0
            self.resource_span.in_scope(|| {
952
0
                tracing::trace!(
953
0
                target: "runtime::resource::state_update",
954
0
                rx_dropped = true,
955
0
                rx_dropped.op = "override",
956
0
                )
957
0
            });
958
0
        }
959
0
    }
960
961
    /// Checks if this receiver is terminated.
962
    ///
963
    /// This function returns true if this receiver has already yielded a [`Poll::Ready`] result.
964
    /// If so, this receiver should no longer be polled.
965
    ///
966
    /// # Examples
967
    ///
968
    /// Sending a value and polling it.
969
    ///
970
    /// ```
971
    /// use tokio::sync::oneshot;
972
    ///
973
    /// use std::task::Poll;
974
    ///
975
    /// # #[tokio::main(flavor = "current_thread")]
976
    /// # async fn main() {
977
    /// let (tx, mut rx) = oneshot::channel();
978
    ///
979
    /// // A receiver is not terminated when it is initialized.
980
    /// assert!(!rx.is_terminated());
981
    ///
982
    /// // A receiver is not terminated it is polled and is still pending.
983
    /// let poll = futures::poll!(&mut rx);
984
    /// assert_eq!(poll, Poll::Pending);
985
    /// assert!(!rx.is_terminated());
986
    ///
987
    /// // A receiver is not terminated if a value has been sent, but not yet read.
988
    /// tx.send(0).unwrap();
989
    /// assert!(!rx.is_terminated());
990
    ///
991
    /// // A receiver *is* terminated after it has been polled and yielded a value.
992
    /// assert_eq!((&mut rx).await, Ok(0));
993
    /// assert!(rx.is_terminated());
994
    /// # }
995
    /// ```
996
    ///
997
    /// Dropping the sender.
998
    ///
999
    /// ```
1000
    /// use tokio::sync::oneshot;
1001
    ///
1002
    /// # #[tokio::main(flavor = "current_thread")]
1003
    /// # async fn main() {
1004
    /// let (tx, mut rx) = oneshot::channel::<()>();
1005
    ///
1006
    /// // A receiver is not immediately terminated when the sender is dropped.
1007
    /// drop(tx);
1008
    /// assert!(!rx.is_terminated());
1009
    ///
1010
    /// // A receiver *is* terminated after it has been polled and yielded an error.
1011
    /// let _ = (&mut rx).await.unwrap_err();
1012
    /// assert!(rx.is_terminated());
1013
    /// # }
1014
    /// ```
1015
0
    pub fn is_terminated(&self) -> bool {
1016
0
        self.inner.is_none()
1017
0
    }
1018
1019
    /// Checks if a channel is empty.
1020
    ///
1021
    /// This method returns `true` if the channel has no messages.
1022
    ///
1023
    /// It is not necessarily safe to poll an empty receiver, which may have
1024
    /// already yielded a value. Use [`is_terminated()`][Self::is_terminated]
1025
    /// to check whether or not a receiver can be safely polled, instead.
1026
    ///
1027
    /// # Examples
1028
    ///
1029
    /// Sending a value.
1030
    ///
1031
    /// ```
1032
    /// use tokio::sync::oneshot;
1033
    ///
1034
    /// # #[tokio::main(flavor = "current_thread")]
1035
    /// # async fn main() {
1036
    /// let (tx, mut rx) = oneshot::channel();
1037
    /// assert!(rx.is_empty());
1038
    ///
1039
    /// tx.send(0).unwrap();
1040
    /// assert!(!rx.is_empty());
1041
    ///
1042
    /// let _ = (&mut rx).await;
1043
    /// assert!(rx.is_empty());
1044
    /// # }
1045
    /// ```
1046
    ///
1047
    /// Dropping the sender.
1048
    ///
1049
    /// ```
1050
    /// use tokio::sync::oneshot;
1051
    ///
1052
    /// # #[tokio::main(flavor = "current_thread")]
1053
    /// # async fn main() {
1054
    /// let (tx, mut rx) = oneshot::channel::<()>();
1055
    ///
1056
    /// // A channel is empty if the sender is dropped.
1057
    /// drop(tx);
1058
    /// assert!(rx.is_empty());
1059
    ///
1060
    /// // A closed channel still yields an error, however.
1061
    /// (&mut rx).await.expect_err("should yield an error");
1062
    /// assert!(rx.is_empty());
1063
    /// # }
1064
    /// ```
1065
    ///
1066
    /// Terminated channels are empty.
1067
    ///
1068
    /// ```should_panic,ignore-wasm
1069
    /// use tokio::sync::oneshot;
1070
    ///
1071
    /// #[tokio::main]
1072
    /// async fn main() {
1073
    ///     let (tx, mut rx) = oneshot::channel();
1074
    ///     tx.send(0).unwrap();
1075
    ///     let _ = (&mut rx).await;
1076
    ///
1077
    ///     // NB: an empty channel is not necessarily safe to poll!
1078
    ///     assert!(rx.is_empty());
1079
    ///     let _ = (&mut rx).await;
1080
    /// }
1081
    /// ```
1082
0
    pub fn is_empty(&self) -> bool {
1083
0
        let Some(inner) = self.inner.as_ref() else {
1084
            // The channel has already terminated.
1085
0
            return true;
1086
        };
1087
1088
0
        let state = State::load(&inner.state, Acquire);
1089
0
        if state.is_complete() {
1090
            // SAFETY: If `state.is_complete()` returns true, then the
1091
            // `VALUE_SENT` bit has been set and the sender side of the
1092
            // channel will no longer attempt to access the inner
1093
            // `UnsafeCell`. Therefore, it is now safe for us to access the
1094
            // cell.
1095
            //
1096
            // The channel is empty if it does not have a value.
1097
0
            unsafe { !inner.has_value() }
1098
        } else {
1099
            // The receiver closed the channel or no value has been sent yet.
1100
0
            true
1101
        }
1102
0
    }
1103
1104
    /// Attempts to receive a value.
1105
    ///
1106
    /// If a pending value exists in the channel, it is returned. If no value
1107
    /// has been sent, the current task **will not** be registered for
1108
    /// future notification.
1109
    ///
1110
    /// This function is useful to call from outside the context of an
1111
    /// asynchronous task.
1112
    ///
1113
    /// Note that unlike the `poll` method, the `try_recv` method cannot fail
1114
    /// spuriously. Any send or close event that happens before this call to
1115
    /// `try_recv` will be correctly returned to the caller.
1116
    ///
1117
    /// # Return
1118
    ///
1119
    /// - `Ok(T)` if a value is pending in the channel.
1120
    /// - `Err(TryRecvError::Empty)` if no value has been sent yet.
1121
    /// - `Err(TryRecvError::Closed)` if the sender has dropped without sending
1122
    ///   a value, or if the message has already been received.
1123
    ///
1124
    /// # Examples
1125
    ///
1126
    /// `try_recv` before a value is sent, then after.
1127
    ///
1128
    /// ```
1129
    /// use tokio::sync::oneshot;
1130
    /// use tokio::sync::oneshot::error::TryRecvError;
1131
    ///
1132
    /// # #[tokio::main(flavor = "current_thread")]
1133
    /// # async fn main() {
1134
    /// let (tx, mut rx) = oneshot::channel();
1135
    ///
1136
    /// match rx.try_recv() {
1137
    ///     // The channel is currently empty
1138
    ///     Err(TryRecvError::Empty) => {}
1139
    ///     _ => unreachable!(),
1140
    /// }
1141
    ///
1142
    /// // Send a value
1143
    /// tx.send("hello").unwrap();
1144
    ///
1145
    /// match rx.try_recv() {
1146
    ///      Ok(value) => assert_eq!(value, "hello"),
1147
    ///      _ => unreachable!(),
1148
    /// }
1149
    /// # }
1150
    /// ```
1151
    ///
1152
    /// `try_recv` when the sender dropped before sending a value
1153
    ///
1154
    /// ```
1155
    /// use tokio::sync::oneshot;
1156
    /// use tokio::sync::oneshot::error::TryRecvError;
1157
    ///
1158
    /// # #[tokio::main(flavor = "current_thread")]
1159
    /// # async fn main() {
1160
    /// let (tx, mut rx) = oneshot::channel::<()>();
1161
    ///
1162
    /// drop(tx);
1163
    ///
1164
    /// match rx.try_recv() {
1165
    ///     // The channel will never receive a value.
1166
    ///     Err(TryRecvError::Closed) => {}
1167
    ///     _ => unreachable!(),
1168
    /// }
1169
    /// # }
1170
    /// ```
1171
0
    pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
1172
0
        let result = if let Some(inner) = self.inner.as_ref() {
1173
0
            let state = State::load(&inner.state, Acquire);
1174
1175
0
            if state.is_complete() {
1176
                // SAFETY: If `state.is_complete()` returns true, then the
1177
                // `VALUE_SENT` bit has been set and the sender side of the
1178
                // channel will no longer attempt to access the inner
1179
                // `UnsafeCell`. Therefore, it is now safe for us to access the
1180
                // cell.
1181
0
                match unsafe { inner.consume_value() } {
1182
0
                    Some(value) => {
1183
                        #[cfg(all(tokio_unstable, feature = "tracing"))]
1184
                        self.resource_span.in_scope(|| {
1185
                            tracing::trace!(
1186
                            target: "runtime::resource::state_update",
1187
                            value_received = true,
1188
                            value_received.op = "override",
1189
                            )
1190
                        });
1191
0
                        Ok(value)
1192
                    }
1193
0
                    None => Err(TryRecvError::Closed),
1194
                }
1195
0
            } else if state.is_closed() {
1196
0
                Err(TryRecvError::Closed)
1197
            } else {
1198
                // Not ready, this does not clear `inner`
1199
0
                return Err(TryRecvError::Empty);
1200
            }
1201
        } else {
1202
0
            Err(TryRecvError::Closed)
1203
        };
1204
1205
0
        self.inner = None;
1206
0
        result
1207
0
    }
1208
1209
    /// Blocking receive to call outside of asynchronous contexts.
1210
    ///
1211
    /// # Panics
1212
    ///
1213
    /// This function panics if called within an asynchronous execution
1214
    /// context.
1215
    ///
1216
    /// # Examples
1217
    ///
1218
    /// ```
1219
    /// # #[cfg(not(target_family = "wasm"))]
1220
    /// # {
1221
    /// use std::thread;
1222
    /// use tokio::sync::oneshot;
1223
    ///
1224
    /// #[tokio::main]
1225
    /// async fn main() {
1226
    ///     let (tx, rx) = oneshot::channel::<u8>();
1227
    ///
1228
    ///     let sync_code = thread::spawn(move || {
1229
    ///         assert_eq!(Ok(10), rx.blocking_recv());
1230
    ///     });
1231
    ///
1232
    ///     let _ = tx.send(10);
1233
    ///     sync_code.join().unwrap();
1234
    /// }
1235
    /// # }
1236
    /// ```
1237
    #[track_caller]
1238
    #[cfg(feature = "sync")]
1239
    #[cfg_attr(docsrs, doc(alias = "recv_blocking"))]
1240
0
    pub fn blocking_recv(self) -> Result<T, RecvError> {
1241
0
        crate::future::block_on(self)
1242
0
    }
1243
}
1244
1245
impl<T> Drop for Receiver<T> {
1246
17.4k
    fn drop(&mut self) {
1247
17.4k
        if let Some(inner) = self.inner.as_ref() {
1248
0
            let state = inner.close();
1249
1250
0
            if state.is_complete() {
1251
0
                // SAFETY: we have ensured that the `VALUE_SENT` bit has been set,
1252
0
                // so only the receiver can access the value.
1253
0
                drop(unsafe { inner.consume_value() });
1254
0
            }
1255
1256
            #[cfg(all(tokio_unstable, feature = "tracing"))]
1257
            self.resource_span.in_scope(|| {
1258
                tracing::trace!(
1259
                target: "runtime::resource::state_update",
1260
                rx_dropped = true,
1261
                rx_dropped.op = "override",
1262
                )
1263
            });
1264
17.4k
        }
1265
17.4k
    }
1266
}
1267
1268
impl<T> Future for Receiver<T> {
1269
    type Output = Result<T, RecvError>;
1270
1271
38.1k
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1272
        // If `inner` is `None`, then `poll()` has already completed.
1273
        #[cfg(all(tokio_unstable, feature = "tracing"))]
1274
        let _res_span = self.resource_span.clone().entered();
1275
        #[cfg(all(tokio_unstable, feature = "tracing"))]
1276
        let _ao_span = self.async_op_span.clone().entered();
1277
        #[cfg(all(tokio_unstable, feature = "tracing"))]
1278
        let _ao_poll_span = self.async_op_poll_span.clone().entered();
1279
1280
38.1k
        let ret = if let Some(inner) = self.as_ref().get_ref().inner.as_ref() {
1281
            #[cfg(all(tokio_unstable, feature = "tracing"))]
1282
            let res = ready!(trace_poll_op!("poll_recv", inner.poll_recv(cx))).map_err(Into::into);
1283
1284
            #[cfg(any(not(tokio_unstable), not(feature = "tracing")))]
1285
38.1k
            let res = ready!(inner.poll_recv(cx)).map_err(Into::into);
1286
1287
17.4k
            res
1288
        } else {
1289
0
            panic!("called after complete");
1290
        };
1291
1292
17.4k
        self.inner = None;
1293
17.4k
        Ready(ret)
1294
38.1k
    }
1295
}
1296
1297
impl<T> Inner<T> {
1298
17.4k
    fn complete(&self) -> bool {
1299
17.4k
        let prev = State::set_complete(&self.state);
1300
1301
17.4k
        if prev.is_closed() {
1302
0
            return false;
1303
17.4k
        }
1304
1305
17.4k
        if prev.is_rx_task_set() {
1306
            // TODO: Consume waker?
1307
17.4k
            unsafe {
1308
17.4k
                self.rx_task.with_task(Waker::wake_by_ref);
1309
17.4k
            }
1310
4
        }
1311
1312
17.4k
        true
1313
17.4k
    }
1314
1315
38.1k
    fn poll_recv(&self, cx: &mut Context<'_>) -> Poll<Result<T, RecvError>> {
1316
38.1k
        ready!(crate::trace::trace_leaf(cx));
1317
        // Keep track of task budget
1318
38.1k
        let coop = ready!(crate::task::coop::poll_proceed(cx));
1319
1320
        // Load the state
1321
38.1k
        let mut state = State::load(&self.state, Acquire);
1322
1323
38.1k
        if state.is_complete() {
1324
17.4k
            coop.made_progress();
1325
17.4k
            match unsafe { self.consume_value() } {
1326
0
                Some(value) => Ready(Ok(value)),
1327
17.4k
                None => Ready(Err(RecvError(()))),
1328
            }
1329
20.7k
        } else if state.is_closed() {
1330
0
            coop.made_progress();
1331
0
            Ready(Err(RecvError(())))
1332
        } else {
1333
20.7k
            if state.is_rx_task_set() {
1334
3.29k
                let will_notify = unsafe { self.rx_task.will_wake(cx) };
1335
1336
                // Check if the task is still the same
1337
3.29k
                if !will_notify {
1338
                    // Unset the task
1339
0
                    state = State::unset_rx_task(&self.state);
1340
0
                    if state.is_complete() {
1341
                        // Set the flag again so that the waker is released in drop
1342
0
                        State::set_rx_task(&self.state);
1343
1344
0
                        coop.made_progress();
1345
                        // SAFETY: If `state.is_complete()` returns true, then the
1346
                        // `VALUE_SENT` bit has been set and the sender side of the
1347
                        // channel will no longer attempt to access the inner
1348
                        // `UnsafeCell`. Therefore, it is now safe for us to access the
1349
                        // cell.
1350
0
                        return match unsafe { self.consume_value() } {
1351
0
                            Some(value) => Ready(Ok(value)),
1352
0
                            None => Ready(Err(RecvError(()))),
1353
                        };
1354
0
                    } else {
1355
0
                        unsafe { self.rx_task.drop_task() };
1356
0
                    }
1357
3.29k
                }
1358
17.4k
            }
1359
1360
20.7k
            if !state.is_rx_task_set() {
1361
                // Attempt to set the task
1362
17.4k
                unsafe {
1363
17.4k
                    self.rx_task.set_task(cx);
1364
17.4k
                }
1365
1366
                // Update the state
1367
17.4k
                state = State::set_rx_task(&self.state);
1368
1369
17.4k
                if state.is_complete() {
1370
0
                    coop.made_progress();
1371
0
                    match unsafe { self.consume_value() } {
1372
0
                        Some(value) => Ready(Ok(value)),
1373
0
                        None => Ready(Err(RecvError(()))),
1374
                    }
1375
                } else {
1376
17.4k
                    Pending
1377
                }
1378
            } else {
1379
3.29k
                Pending
1380
            }
1381
        }
1382
38.1k
    }
1383
1384
    /// Called by `Receiver` to indicate that the value will never be received.
1385
0
    fn close(&self) -> State {
1386
0
        let prev = State::set_closed(&self.state);
1387
1388
0
        if prev.is_tx_task_set() && !prev.is_complete() {
1389
0
            unsafe {
1390
0
                self.tx_task.with_task(Waker::wake_by_ref);
1391
0
            }
1392
0
        }
1393
1394
0
        prev
1395
0
    }
1396
1397
    /// Consumes the value. This function does not check `state`.
1398
    ///
1399
    /// # Safety
1400
    ///
1401
    /// Calling this method concurrently on multiple threads will result in a
1402
    /// data race. The `VALUE_SENT` state bit is used to ensure that only the
1403
    /// sender *or* the receiver will call this method at a given point in time.
1404
    /// If `VALUE_SENT` is not set, then only the sender may call this method;
1405
    /// if it is set, then only the receiver may call this method.
1406
17.4k
    unsafe fn consume_value(&self) -> Option<T> {
1407
17.4k
        self.value.with_mut(|ptr| unsafe { (*ptr).take() })
1408
17.4k
    }
1409
1410
    /// Returns true if there is a value. This function does not check `state`.
1411
    ///
1412
    /// # Safety
1413
    ///
1414
    /// Calling this method concurrently on multiple threads will result in a
1415
    /// data race. The `VALUE_SENT` state bit is used to ensure that only the
1416
    /// sender *or* the receiver will call this method at a given point in time.
1417
    /// If `VALUE_SENT` is not set, then only the sender may call this method;
1418
    /// if it is set, then only the receiver may call this method.
1419
0
    unsafe fn has_value(&self) -> bool {
1420
0
        self.value.with(|ptr| unsafe { (*ptr).is_some() })
1421
0
    }
1422
}
1423
1424
unsafe impl<T: Send> Send for Inner<T> {}
1425
unsafe impl<T: Send> Sync for Inner<T> {}
1426
1427
17.4k
fn mut_load(this: &mut AtomicUsize) -> usize {
1428
17.4k
    this.with_mut(|v| *v)
1429
17.4k
}
1430
1431
impl<T> Drop for Inner<T> {
1432
17.4k
    fn drop(&mut self) {
1433
17.4k
        let state = State(mut_load(&mut self.state));
1434
1435
17.4k
        if state.is_rx_task_set() {
1436
17.4k
            unsafe {
1437
17.4k
                self.rx_task.drop_task();
1438
17.4k
            }
1439
4
        }
1440
1441
17.4k
        if state.is_tx_task_set() {
1442
0
            unsafe {
1443
0
                self.tx_task.drop_task();
1444
0
            }
1445
17.4k
        }
1446
1447
        // SAFETY: we have `&mut self`, and therefore we have
1448
        // exclusive access to the value.
1449
        unsafe {
1450
            // Note: the assertion holds because if the value has been sent by sender,
1451
            // we must ensure that the value must have been consumed by the receiver before
1452
            // dropping the `Inner`.
1453
17.4k
            debug_assert!(self.consume_value().is_none());
1454
        }
1455
17.4k
    }
1456
}
1457
1458
impl<T: fmt::Debug> fmt::Debug for Inner<T> {
1459
0
    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
1460
        use std::sync::atomic::Ordering::Relaxed;
1461
1462
0
        fmt.debug_struct("Inner")
1463
0
            .field("state", &State::load(&self.state, Relaxed))
1464
0
            .finish()
1465
0
    }
1466
}
1467
1468
/// Indicates that a waker for the receiving task has been set.
1469
///
1470
/// # Safety
1471
///
1472
/// If this bit is not set, the `rx_task` field may be uninitialized.
1473
const RX_TASK_SET: usize = 0b00001;
1474
/// Indicates that a value has been stored in the channel's inner `UnsafeCell`.
1475
///
1476
/// # Safety
1477
///
1478
/// This bit controls which side of the channel is permitted to access the
1479
/// `UnsafeCell`. If it is set, the `UnsafeCell` may ONLY be accessed by the
1480
/// receiver. If this bit is NOT set, the `UnsafeCell` may ONLY be accessed by
1481
/// the sender.
1482
const VALUE_SENT: usize = 0b00010;
1483
const CLOSED: usize = 0b00100;
1484
1485
/// Indicates that a waker for the sending task has been set.
1486
///
1487
/// # Safety
1488
///
1489
/// If this bit is not set, the `tx_task` field may be uninitialized.
1490
const TX_TASK_SET: usize = 0b01000;
1491
1492
impl State {
1493
17.4k
    fn new() -> State {
1494
17.4k
        State(0)
1495
17.4k
    }
1496
1497
55.5k
    fn is_complete(self) -> bool {
1498
55.5k
        self.0 & VALUE_SENT == VALUE_SENT
1499
55.5k
    }
1500
1501
17.4k
    fn set_complete(cell: &AtomicUsize) -> State {
1502
        // This method is a compare-and-swap loop rather than a fetch-or like
1503
        // other `set_$WHATEVER` methods on `State`. This is because we must
1504
        // check if the state has been closed before setting the `VALUE_SENT`
1505
        // bit.
1506
        //
1507
        // We don't want to set both the `VALUE_SENT` bit if the `CLOSED`
1508
        // bit is already set, because `VALUE_SENT` will tell the receiver that
1509
        // it's okay to access the inner `UnsafeCell`. Immediately after calling
1510
        // `set_complete`, if the channel was closed, the sender will _also_
1511
        // access the `UnsafeCell` to take the value back out, so if a
1512
        // `poll_recv` or `try_recv` call is occurring concurrently, both
1513
        // threads may try to access the `UnsafeCell` if we were to set the
1514
        // `VALUE_SENT` bit on a closed channel.
1515
17.4k
        let mut state = cell.load(Ordering::Relaxed);
1516
        loop {
1517
17.4k
            if State(state).is_closed() {
1518
0
                break;
1519
17.4k
            }
1520
            // TODO: This could be `Release`, followed by an `Acquire` fence *if*
1521
            // the `RX_TASK_SET` flag is set. However, `loom` does not support
1522
            // fences yet.
1523
17.4k
            match cell.compare_exchange_weak(
1524
17.4k
                state,
1525
17.4k
                state | VALUE_SENT,
1526
17.4k
                Ordering::AcqRel,
1527
17.4k
                Ordering::Acquire,
1528
17.4k
            ) {
1529
17.4k
                Ok(_) => break,
1530
0
                Err(actual) => state = actual,
1531
            }
1532
        }
1533
17.4k
        State(state)
1534
17.4k
    }
1535
1536
76.3k
    fn is_rx_task_set(self) -> bool {
1537
76.3k
        self.0 & RX_TASK_SET == RX_TASK_SET
1538
76.3k
    }
1539
1540
17.4k
    fn set_rx_task(cell: &AtomicUsize) -> State {
1541
17.4k
        let val = cell.fetch_or(RX_TASK_SET, AcqRel);
1542
17.4k
        State(val | RX_TASK_SET)
1543
17.4k
    }
1544
1545
0
    fn unset_rx_task(cell: &AtomicUsize) -> State {
1546
0
        let val = cell.fetch_and(!RX_TASK_SET, AcqRel);
1547
0
        State(val & !RX_TASK_SET)
1548
0
    }
1549
1550
55.5k
    fn is_closed(self) -> bool {
1551
55.5k
        self.0 & CLOSED == CLOSED
1552
55.5k
    }
1553
1554
0
    fn set_closed(cell: &AtomicUsize) -> State {
1555
        // Acquire because we want all later writes (attempting to poll) to be
1556
        // ordered after this.
1557
0
        let val = cell.fetch_or(CLOSED, Acquire);
1558
0
        State(val)
1559
0
    }
1560
1561
0
    fn set_tx_task(cell: &AtomicUsize) -> State {
1562
0
        let val = cell.fetch_or(TX_TASK_SET, AcqRel);
1563
0
        State(val | TX_TASK_SET)
1564
0
    }
1565
1566
0
    fn unset_tx_task(cell: &AtomicUsize) -> State {
1567
0
        let val = cell.fetch_and(!TX_TASK_SET, AcqRel);
1568
0
        State(val & !TX_TASK_SET)
1569
0
    }
1570
1571
17.4k
    fn is_tx_task_set(self) -> bool {
1572
17.4k
        self.0 & TX_TASK_SET == TX_TASK_SET
1573
17.4k
    }
1574
1575
17.4k
    fn as_usize(self) -> usize {
1576
17.4k
        self.0
1577
17.4k
    }
1578
1579
38.1k
    fn load(cell: &AtomicUsize, order: Ordering) -> State {
1580
38.1k
        let val = cell.load(order);
1581
38.1k
        State(val)
1582
38.1k
    }
1583
}
1584
1585
impl fmt::Debug for State {
1586
0
    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
1587
0
        fmt.debug_struct("State")
1588
0
            .field("is_complete", &self.is_complete())
1589
0
            .field("is_closed", &self.is_closed())
1590
0
            .field("is_rx_task_set", &self.is_rx_task_set())
1591
0
            .field("is_tx_task_set", &self.is_tx_task_set())
1592
0
            .finish()
1593
0
    }
1594
}