Coverage Report

Created: 2025-11-24 06:23

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/rust/registry/src/index.crates.io-1949cf8c6b5b557f/tokio-1.48.0/src/sync/oneshot.rs
Line
Count
Source
1
#![cfg_attr(not(feature = "sync"), allow(dead_code, unreachable_pub))]
2
3
//! A one-shot channel is used for sending a single message between
4
//! asynchronous tasks. The [`channel`] function is used to create a
5
//! [`Sender`] and [`Receiver`] handle pair that form the channel.
6
//!
7
//! The `Sender` handle is used by the producer to send the value.
8
//! The `Receiver` handle is used by the consumer to receive the value.
9
//!
10
//! Each handle can be used on separate tasks.
11
//!
12
//! Since the `send` method is not async, it can be used anywhere. This includes
13
//! sending between two runtimes, and using it from non-async code.
14
//!
15
//! If the [`Receiver`] is closed before receiving a message which has already
16
//! been sent, the message will remain in the channel until the receiver is
17
//! dropped, at which point the message will be dropped immediately.
18
//!
19
//! # Examples
20
//!
21
//! ```
22
//! use tokio::sync::oneshot;
23
//!
24
//! # #[tokio::main(flavor = "current_thread")]
25
//! # async fn main() {
26
//! let (tx, rx) = oneshot::channel();
27
//!
28
//! tokio::spawn(async move {
29
//!     if let Err(_) = tx.send(3) {
30
//!         println!("the receiver dropped");
31
//!     }
32
//! });
33
//!
34
//! match rx.await {
35
//!     Ok(v) => println!("got = {:?}", v),
36
//!     Err(_) => println!("the sender dropped"),
37
//! }
38
//! # }
39
//! ```
40
//!
41
//! If the sender is dropped without sending, the receiver will fail with
42
//! [`error::RecvError`]:
43
//!
44
//! ```
45
//! use tokio::sync::oneshot;
46
//!
47
//! # #[tokio::main(flavor = "current_thread")]
48
//! # async fn main() {
49
//! let (tx, rx) = oneshot::channel::<u32>();
50
//!
51
//! tokio::spawn(async move {
52
//!     drop(tx);
53
//! });
54
//!
55
//! match rx.await {
56
//!     Ok(_) => panic!("This doesn't happen"),
57
//!     Err(_) => println!("the sender dropped"),
58
//! }
59
//! # }
60
//! ```
61
//!
62
//! To use a `oneshot` channel in a `tokio::select!` loop, add `&mut` in front of
63
//! the channel.
64
//!
65
//! ```
66
//! use tokio::sync::oneshot;
67
//! use tokio::time::{interval, sleep, Duration};
68
//!
69
//! # #[tokio::main(flavor = "current_thread")]
70
//! # async fn _doc() {}
71
//! # #[tokio::main(flavor = "current_thread", start_paused = true)]
72
//! # async fn main() {
73
//! let (send, mut recv) = oneshot::channel();
74
//! let mut interval = interval(Duration::from_millis(100));
75
//!
76
//! # let handle =
77
//! tokio::spawn(async move {
78
//!     sleep(Duration::from_secs(1)).await;
79
//!     send.send("shut down").unwrap();
80
//! });
81
//!
82
//! loop {
83
//!     tokio::select! {
84
//!         _ = interval.tick() => println!("Another 100ms"),
85
//!         msg = &mut recv => {
86
//!             println!("Got message: {}", msg.unwrap());
87
//!             break;
88
//!         }
89
//!     }
90
//! }
91
//! # handle.await.unwrap();
92
//! # }
93
//! ```
94
//!
95
//! To use a `Sender` from a destructor, put it in an [`Option`] and call
96
//! [`Option::take`].
97
//!
98
//! ```
99
//! use tokio::sync::oneshot;
100
//!
101
//! struct SendOnDrop {
102
//!     sender: Option<oneshot::Sender<&'static str>>,
103
//! }
104
//! impl Drop for SendOnDrop {
105
//!     fn drop(&mut self) {
106
//!         if let Some(sender) = self.sender.take() {
107
//!             // Using `let _ =` to ignore send errors.
108
//!             let _ = sender.send("I got dropped!");
109
//!         }
110
//!     }
111
//! }
112
//!
113
//! # #[tokio::main(flavor = "current_thread")]
114
//! # async fn _doc() {}
115
//! # #[tokio::main(flavor = "current_thread")]
116
//! # async fn main() {
117
//! let (send, recv) = oneshot::channel();
118
//!
119
//! let send_on_drop = SendOnDrop { sender: Some(send) };
120
//! drop(send_on_drop);
121
//!
122
//! assert_eq!(recv.await, Ok("I got dropped!"));
123
//! # }
124
//! ```
125
126
use crate::loom::cell::UnsafeCell;
127
use crate::loom::sync::atomic::AtomicUsize;
128
use crate::loom::sync::Arc;
129
#[cfg(all(tokio_unstable, feature = "tracing"))]
130
use crate::util::trace;
131
132
use std::fmt;
133
use std::future::Future;
134
use std::mem::MaybeUninit;
135
use std::pin::Pin;
136
use std::sync::atomic::Ordering::{self, AcqRel, Acquire};
137
use std::task::Poll::{Pending, Ready};
138
use std::task::{ready, Context, Poll, Waker};
139
140
/// Sends a value to the associated [`Receiver`].
141
///
142
/// A pair of both a [`Sender`] and a [`Receiver`]  are created by the
143
/// [`channel`](fn@channel) function.
144
///
145
/// # Examples
146
///
147
/// ```
148
/// use tokio::sync::oneshot;
149
///
150
/// # #[tokio::main(flavor = "current_thread")]
151
/// # async fn main() {
152
/// let (tx, rx) = oneshot::channel();
153
///
154
/// tokio::spawn(async move {
155
///     if let Err(_) = tx.send(3) {
156
///         println!("the receiver dropped");
157
///     }
158
/// });
159
///
160
/// match rx.await {
161
///     Ok(v) => println!("got = {:?}", v),
162
///     Err(_) => println!("the sender dropped"),
163
/// }
164
/// # }
165
/// ```
166
///
167
/// If the sender is dropped without sending, the receiver will fail with
168
/// [`error::RecvError`]:
169
///
170
/// ```
171
/// use tokio::sync::oneshot;
172
///
173
/// # #[tokio::main(flavor = "current_thread")]
174
/// # async fn main() {
175
/// let (tx, rx) = oneshot::channel::<u32>();
176
///
177
/// tokio::spawn(async move {
178
///     drop(tx);
179
/// });
180
///
181
/// match rx.await {
182
///     Ok(_) => panic!("This doesn't happen"),
183
///     Err(_) => println!("the sender dropped"),
184
/// }
185
/// # }
186
/// ```
187
///
188
/// To use a `Sender` from a destructor, put it in an [`Option`] and call
189
/// [`Option::take`].
190
///
191
/// ```
192
/// use tokio::sync::oneshot;
193
///
194
/// struct SendOnDrop {
195
///     sender: Option<oneshot::Sender<&'static str>>,
196
/// }
197
/// impl Drop for SendOnDrop {
198
///     fn drop(&mut self) {
199
///         if let Some(sender) = self.sender.take() {
200
///             // Using `let _ =` to ignore send errors.
201
///             let _ = sender.send("I got dropped!");
202
///         }
203
///     }
204
/// }
205
///
206
/// # #[tokio::main(flavor = "current_thread")]
207
/// # async fn _doc() {}
208
/// # #[tokio::main(flavor = "current_thread")]
209
/// # async fn main() {
210
/// let (send, recv) = oneshot::channel();
211
///
212
/// let send_on_drop = SendOnDrop { sender: Some(send) };
213
/// drop(send_on_drop);
214
///
215
/// assert_eq!(recv.await, Ok("I got dropped!"));
216
/// # }
217
/// ```
218
///
219
/// [`Option`]: std::option::Option
220
/// [`Option::take`]: std::option::Option::take
221
#[derive(Debug)]
222
pub struct Sender<T> {
223
    inner: Option<Arc<Inner<T>>>,
224
    #[cfg(all(tokio_unstable, feature = "tracing"))]
225
    resource_span: tracing::Span,
226
}
227
228
/// Receives a value from the associated [`Sender`].
229
///
230
/// A pair of both a [`Sender`] and a [`Receiver`]  are created by the
231
/// [`channel`](fn@channel) function.
232
///
233
/// This channel has no `recv` method because the receiver itself implements the
234
/// [`Future`] trait. To receive a `Result<T, `[`error::RecvError`]`>`, `.await` the `Receiver` object directly.
235
///
236
/// The `poll` method on the `Future` trait is allowed to spuriously return
237
/// `Poll::Pending` even if the message has been sent. If such a spurious
238
/// failure happens, then the caller will be woken when the spurious failure has
239
/// been resolved so that the caller can attempt to receive the message again.
240
/// Note that receiving such a wakeup does not guarantee that the next call will
241
/// succeed — it could fail with another spurious failure. (A spurious failure
242
/// does not mean that the message is lost. It is just delayed.)
243
///
244
/// [`Future`]: trait@std::future::Future
245
///
246
/// # Examples
247
///
248
/// ```
249
/// use tokio::sync::oneshot;
250
///
251
/// # #[tokio::main(flavor = "current_thread")]
252
/// # async fn main() {
253
/// let (tx, rx) = oneshot::channel();
254
///
255
/// tokio::spawn(async move {
256
///     if let Err(_) = tx.send(3) {
257
///         println!("the receiver dropped");
258
///     }
259
/// });
260
///
261
/// match rx.await {
262
///     Ok(v) => println!("got = {:?}", v),
263
///     Err(_) => println!("the sender dropped"),
264
/// }
265
/// # }
266
/// ```
267
///
268
/// If the sender is dropped without sending, the receiver will fail with
269
/// [`error::RecvError`]:
270
///
271
/// ```
272
/// use tokio::sync::oneshot;
273
///
274
/// # #[tokio::main(flavor = "current_thread")]
275
/// # async fn main() {
276
/// let (tx, rx) = oneshot::channel::<u32>();
277
///
278
/// tokio::spawn(async move {
279
///     drop(tx);
280
/// });
281
///
282
/// match rx.await {
283
///     Ok(_) => panic!("This doesn't happen"),
284
///     Err(_) => println!("the sender dropped"),
285
/// }
286
/// # }
287
/// ```
288
///
289
/// To use a `Receiver` in a `tokio::select!` loop, add `&mut` in front of the
290
/// channel.
291
///
292
/// ```
293
/// use tokio::sync::oneshot;
294
/// use tokio::time::{interval, sleep, Duration};
295
///
296
/// # #[tokio::main(flavor = "current_thread")]
297
/// # async fn _doc() {}
298
/// # #[tokio::main(flavor = "current_thread", start_paused = true)]
299
/// # async fn main() {
300
/// let (send, mut recv) = oneshot::channel();
301
/// let mut interval = interval(Duration::from_millis(100));
302
///
303
/// # let handle =
304
/// tokio::spawn(async move {
305
///     sleep(Duration::from_secs(1)).await;
306
///     send.send("shut down").unwrap();
307
/// });
308
///
309
/// loop {
310
///     tokio::select! {
311
///         _ = interval.tick() => println!("Another 100ms"),
312
///         msg = &mut recv => {
313
///             println!("Got message: {}", msg.unwrap());
314
///             break;
315
///         }
316
///     }
317
/// }
318
/// # handle.await.unwrap();
319
/// # }
320
/// ```
321
#[derive(Debug)]
322
pub struct Receiver<T> {
323
    inner: Option<Arc<Inner<T>>>,
324
    #[cfg(all(tokio_unstable, feature = "tracing"))]
325
    resource_span: tracing::Span,
326
    #[cfg(all(tokio_unstable, feature = "tracing"))]
327
    async_op_span: tracing::Span,
328
    #[cfg(all(tokio_unstable, feature = "tracing"))]
329
    async_op_poll_span: tracing::Span,
330
}
331
332
pub mod error {
333
    //! `Oneshot` error types.
334
335
    use std::fmt;
336
337
    /// Error returned by the `Future` implementation for `Receiver`.
338
    ///
339
    /// This error is returned by the receiver when the sender is dropped without sending.
340
    #[derive(Debug, Eq, PartialEq, Clone)]
341
    pub struct RecvError(pub(super) ());
342
343
    /// Error returned by the `try_recv` function on `Receiver`.
344
    #[derive(Debug, Eq, PartialEq, Clone)]
345
    pub enum TryRecvError {
346
        /// The send half of the channel has not yet sent a value.
347
        Empty,
348
349
        /// The send half of the channel was dropped without sending a value.
350
        Closed,
351
    }
352
353
    // ===== impl RecvError =====
354
355
    impl fmt::Display for RecvError {
356
0
        fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
357
0
            write!(fmt, "channel closed")
358
0
        }
359
    }
360
361
    impl std::error::Error for RecvError {}
362
363
    // ===== impl TryRecvError =====
364
365
    impl fmt::Display for TryRecvError {
366
0
        fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
367
0
            match self {
368
0
                TryRecvError::Empty => write!(fmt, "channel empty"),
369
0
                TryRecvError::Closed => write!(fmt, "channel closed"),
370
            }
371
0
        }
372
    }
373
374
    impl std::error::Error for TryRecvError {}
375
}
376
377
use self::error::*;
378
379
struct Inner<T> {
380
    /// Manages the state of the inner cell.
381
    state: AtomicUsize,
382
383
    /// The value. This is set by `Sender` and read by `Receiver`. The state of
384
    /// the cell is tracked by `state`.
385
    value: UnsafeCell<Option<T>>,
386
387
    /// The task to notify when the receiver drops without consuming the value.
388
    ///
389
    /// ## Safety
390
    ///
391
    /// The `TX_TASK_SET` bit in the `state` field is set if this field is
392
    /// initialized. If that bit is unset, this field may be uninitialized.
393
    tx_task: Task,
394
395
    /// The task to notify when the value is sent.
396
    ///
397
    /// ## Safety
398
    ///
399
    /// The `RX_TASK_SET` bit in the `state` field is set if this field is
400
    /// initialized. If that bit is unset, this field may be uninitialized.
401
    rx_task: Task,
402
}
403
404
struct Task(UnsafeCell<MaybeUninit<Waker>>);
405
406
impl Task {
407
3.23k
    unsafe fn will_wake(&self, cx: &mut Context<'_>) -> bool {
408
3.23k
        self.with_task(|w| w.will_wake(cx.waker()))
409
3.23k
    }
410
411
20.2k
    unsafe fn with_task<F, R>(&self, f: F) -> R
412
20.2k
    where
413
20.2k
        F: FnOnce(&Waker) -> R,
414
    {
415
20.2k
        self.0.with(|ptr| {
416
20.2k
            let waker: *const Waker = (*ptr).as_ptr();
417
20.2k
            f(&*waker)
418
20.2k
        })
<tokio::sync::oneshot::Task>::with_task::<<tokio::sync::oneshot::Task>::will_wake::{closure#0}, bool>::{closure#0}
Line
Count
Source
415
3.23k
        self.0.with(|ptr| {
416
3.23k
            let waker: *const Waker = (*ptr).as_ptr();
417
3.23k
            f(&*waker)
418
3.23k
        })
<tokio::sync::oneshot::Task>::with_task::<<core::task::wake::Waker>::wake_by_ref, ()>::{closure#0}
Line
Count
Source
415
17.0k
        self.0.with(|ptr| {
416
17.0k
            let waker: *const Waker = (*ptr).as_ptr();
417
17.0k
            f(&*waker)
418
17.0k
        })
419
20.2k
    }
<tokio::sync::oneshot::Task>::with_task::<<tokio::sync::oneshot::Task>::will_wake::{closure#0}, bool>
Line
Count
Source
411
3.23k
    unsafe fn with_task<F, R>(&self, f: F) -> R
412
3.23k
    where
413
3.23k
        F: FnOnce(&Waker) -> R,
414
    {
415
3.23k
        self.0.with(|ptr| {
416
            let waker: *const Waker = (*ptr).as_ptr();
417
            f(&*waker)
418
        })
419
3.23k
    }
<tokio::sync::oneshot::Task>::with_task::<<core::task::wake::Waker>::wake_by_ref, ()>
Line
Count
Source
411
17.0k
    unsafe fn with_task<F, R>(&self, f: F) -> R
412
17.0k
    where
413
17.0k
        F: FnOnce(&Waker) -> R,
414
    {
415
17.0k
        self.0.with(|ptr| {
416
            let waker: *const Waker = (*ptr).as_ptr();
417
            f(&*waker)
418
        })
419
17.0k
    }
420
421
17.0k
    unsafe fn drop_task(&self) {
422
17.0k
        self.0.with_mut(|ptr| {
423
17.0k
            let ptr: *mut Waker = (*ptr).as_mut_ptr();
424
17.0k
            ptr.drop_in_place();
425
17.0k
        });
426
17.0k
    }
427
428
17.0k
    unsafe fn set_task(&self, cx: &mut Context<'_>) {
429
17.0k
        self.0.with_mut(|ptr| {
430
17.0k
            let ptr: *mut Waker = (*ptr).as_mut_ptr();
431
17.0k
            ptr.write(cx.waker().clone());
432
17.0k
        });
433
17.0k
    }
434
}
435
436
#[derive(Clone, Copy)]
437
struct State(usize);
438
439
/// Creates a new one-shot channel for sending single values across asynchronous
440
/// tasks.
441
///
442
/// The function returns separate "send" and "receive" handles. The `Sender`
443
/// handle is used by the producer to send the value. The `Receiver` handle is
444
/// used by the consumer to receive the value.
445
///
446
/// Each handle can be used on separate tasks.
447
///
448
/// # Examples
449
///
450
/// ```
451
/// use tokio::sync::oneshot;
452
///
453
/// # #[tokio::main(flavor = "current_thread")]
454
/// # async fn main() {
455
/// let (tx, rx) = oneshot::channel();
456
///
457
/// tokio::spawn(async move {
458
///     if let Err(_) = tx.send(3) {
459
///         println!("the receiver dropped");
460
///     }
461
/// });
462
///
463
/// match rx.await {
464
///     Ok(v) => println!("got = {:?}", v),
465
///     Err(_) => println!("the sender dropped"),
466
/// }
467
/// # }
468
/// ```
469
#[track_caller]
470
17.0k
pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
471
    #[cfg(all(tokio_unstable, feature = "tracing"))]
472
    let resource_span = {
473
        let location = std::panic::Location::caller();
474
475
        let resource_span = tracing::trace_span!(
476
            parent: None,
477
            "runtime.resource",
478
            concrete_type = "Sender|Receiver",
479
            kind = "Sync",
480
            loc.file = location.file(),
481
            loc.line = location.line(),
482
            loc.col = location.column(),
483
        );
484
485
        resource_span.in_scope(|| {
486
            tracing::trace!(
487
            target: "runtime::resource::state_update",
488
            tx_dropped = false,
489
            tx_dropped.op = "override",
490
            )
491
        });
492
493
        resource_span.in_scope(|| {
494
            tracing::trace!(
495
            target: "runtime::resource::state_update",
496
            rx_dropped = false,
497
            rx_dropped.op = "override",
498
            )
499
        });
500
501
        resource_span.in_scope(|| {
502
            tracing::trace!(
503
            target: "runtime::resource::state_update",
504
            value_sent = false,
505
            value_sent.op = "override",
506
            )
507
        });
508
509
        resource_span.in_scope(|| {
510
            tracing::trace!(
511
            target: "runtime::resource::state_update",
512
            value_received = false,
513
            value_received.op = "override",
514
            )
515
        });
516
517
        resource_span
518
    };
519
520
17.0k
    let inner = Arc::new(Inner {
521
17.0k
        state: AtomicUsize::new(State::new().as_usize()),
522
17.0k
        value: UnsafeCell::new(None),
523
17.0k
        tx_task: Task(UnsafeCell::new(MaybeUninit::uninit())),
524
17.0k
        rx_task: Task(UnsafeCell::new(MaybeUninit::uninit())),
525
17.0k
    });
526
527
17.0k
    let tx = Sender {
528
17.0k
        inner: Some(inner.clone()),
529
17.0k
        #[cfg(all(tokio_unstable, feature = "tracing"))]
530
17.0k
        resource_span: resource_span.clone(),
531
17.0k
    };
532
533
    #[cfg(all(tokio_unstable, feature = "tracing"))]
534
    let async_op_span = resource_span
535
        .in_scope(|| tracing::trace_span!("runtime.resource.async_op", source = "Receiver::await"));
536
537
    #[cfg(all(tokio_unstable, feature = "tracing"))]
538
    let async_op_poll_span =
539
        async_op_span.in_scope(|| tracing::trace_span!("runtime.resource.async_op.poll"));
540
541
17.0k
    let rx = Receiver {
542
17.0k
        inner: Some(inner),
543
17.0k
        #[cfg(all(tokio_unstable, feature = "tracing"))]
544
17.0k
        resource_span,
545
17.0k
        #[cfg(all(tokio_unstable, feature = "tracing"))]
546
17.0k
        async_op_span,
547
17.0k
        #[cfg(all(tokio_unstable, feature = "tracing"))]
548
17.0k
        async_op_poll_span,
549
17.0k
    };
550
551
17.0k
    (tx, rx)
552
17.0k
}
553
554
impl<T> Sender<T> {
555
    /// Attempts to send a value on this channel, returning it back if it could
556
    /// not be sent.
557
    ///
558
    /// This method consumes `self` as only one value may ever be sent on a `oneshot`
559
    /// channel. It is not marked async because sending a message to an `oneshot`
560
    /// channel never requires any form of waiting.  Because of this, the `send`
561
    /// method can be used in both synchronous and asynchronous code without
562
    /// problems.
563
    ///
564
    /// A successful send occurs when it is determined that the other end of the
565
    /// channel has not hung up already. An unsuccessful send would be one where
566
    /// the corresponding receiver has already been deallocated. Note that a
567
    /// return value of `Err` means that the data will never be received, but
568
    /// a return value of `Ok` does *not* mean that the data will be received.
569
    /// It is possible for the corresponding receiver to hang up immediately
570
    /// after this function returns `Ok`.
571
    ///
572
    /// # Examples
573
    ///
574
    /// Send a value to another task
575
    ///
576
    /// ```
577
    /// use tokio::sync::oneshot;
578
    ///
579
    /// # #[tokio::main(flavor = "current_thread")]
580
    /// # async fn main() {
581
    /// let (tx, rx) = oneshot::channel();
582
    ///
583
    /// tokio::spawn(async move {
584
    ///     if let Err(_) = tx.send(3) {
585
    ///         println!("the receiver dropped");
586
    ///     }
587
    /// });
588
    ///
589
    /// match rx.await {
590
    ///     Ok(v) => println!("got = {:?}", v),
591
    ///     Err(_) => println!("the sender dropped"),
592
    /// }
593
    /// # }
594
    /// ```
595
0
    pub fn send(mut self, t: T) -> Result<(), T> {
596
0
        let inner = self.inner.take().unwrap();
597
598
0
        inner.value.with_mut(|ptr| unsafe {
599
            // SAFETY: The receiver will not access the `UnsafeCell` unless the
600
            // channel has been marked as "complete" (the `VALUE_SENT` state bit
601
            // is set).
602
            // That bit is only set by the sender later on in this method, and
603
            // calling this method consumes `self`. Therefore, if it was possible to
604
            // call this method, we know that the `VALUE_SENT` bit is unset, and
605
            // the receiver is not currently accessing the `UnsafeCell`.
606
0
            *ptr = Some(t);
607
0
        });
608
609
0
        if !inner.complete() {
610
            unsafe {
611
                // SAFETY: The receiver will not access the `UnsafeCell` unless
612
                // the channel has been marked as "complete". Calling
613
                // `complete()` will return true if this bit is set, and false
614
                // if it is not set. Thus, if `complete()` returned false, it is
615
                // safe for us to access the value, because we know that the
616
                // receiver will not.
617
0
                return Err(inner.consume_value().unwrap());
618
            }
619
0
        }
620
621
        #[cfg(all(tokio_unstable, feature = "tracing"))]
622
        self.resource_span.in_scope(|| {
623
            tracing::trace!(
624
            target: "runtime::resource::state_update",
625
            value_sent = true,
626
            value_sent.op = "override",
627
            )
628
        });
629
630
0
        Ok(())
631
0
    }
632
633
    /// Waits for the associated [`Receiver`] handle to close.
634
    ///
635
    /// A [`Receiver`] is closed by either calling [`close`] explicitly or the
636
    /// [`Receiver`] value is dropped.
637
    ///
638
    /// This function is useful when paired with `select!` to abort a
639
    /// computation when the receiver is no longer interested in the result.
640
    ///
641
    /// # Return
642
    ///
643
    /// Returns a `Future` which must be awaited on.
644
    ///
645
    /// [`Receiver`]: Receiver
646
    /// [`close`]: Receiver::close
647
    ///
648
    /// # Examples
649
    ///
650
    /// Basic usage
651
    ///
652
    /// ```
653
    /// use tokio::sync::oneshot;
654
    ///
655
    /// # #[tokio::main(flavor = "current_thread")]
656
    /// # async fn main() {
657
    /// let (mut tx, rx) = oneshot::channel::<()>();
658
    ///
659
    /// tokio::spawn(async move {
660
    ///     drop(rx);
661
    /// });
662
    ///
663
    /// tx.closed().await;
664
    /// println!("the receiver dropped");
665
    /// # }
666
    /// ```
667
    ///
668
    /// Paired with select
669
    ///
670
    /// ```
671
    /// use tokio::sync::oneshot;
672
    /// use tokio::time::{self, Duration};
673
    ///
674
    /// async fn compute() -> String {
675
    ///     // Complex computation returning a `String`
676
    /// # "hello".to_string()
677
    /// }
678
    ///
679
    /// # #[tokio::main(flavor = "current_thread")]
680
    /// # async fn main() {
681
    /// let (mut tx, rx) = oneshot::channel();
682
    ///
683
    /// tokio::spawn(async move {
684
    ///     tokio::select! {
685
    ///         _ = tx.closed() => {
686
    ///             // The receiver dropped, no need to do any further work
687
    ///         }
688
    ///         value = compute() => {
689
    ///             // The send can fail if the channel was closed at the exact same
690
    ///             // time as when compute() finished, so just ignore the failure.
691
    ///             let _ = tx.send(value);
692
    ///         }
693
    ///     }
694
    /// });
695
    ///
696
    /// // Wait for up to 10 seconds
697
    /// let _ = time::timeout(Duration::from_secs(10), rx).await;
698
    /// # }
699
    /// ```
700
0
    pub async fn closed(&mut self) {
701
        use std::future::poll_fn;
702
703
        #[cfg(all(tokio_unstable, feature = "tracing"))]
704
        let resource_span = self.resource_span.clone();
705
        #[cfg(all(tokio_unstable, feature = "tracing"))]
706
        let closed = trace::async_op(
707
            || poll_fn(|cx| self.poll_closed(cx)),
708
            resource_span,
709
            "Sender::closed",
710
            "poll_closed",
711
            false,
712
        );
713
        #[cfg(not(all(tokio_unstable, feature = "tracing")))]
714
0
        let closed = poll_fn(|cx| self.poll_closed(cx));
715
716
0
        closed.await;
717
0
    }
718
719
    /// Returns `true` if the associated [`Receiver`] handle has been dropped.
720
    ///
721
    /// A [`Receiver`] is closed by either calling [`close`] explicitly or the
722
    /// [`Receiver`] value is dropped.
723
    ///
724
    /// If `true` is returned, a call to `send` will always result in an error.
725
    ///
726
    /// [`Receiver`]: Receiver
727
    /// [`close`]: Receiver::close
728
    ///
729
    /// # Examples
730
    ///
731
    /// ```
732
    /// use tokio::sync::oneshot;
733
    ///
734
    /// # #[tokio::main(flavor = "current_thread")]
735
    /// # async fn main() {
736
    /// let (tx, rx) = oneshot::channel();
737
    ///
738
    /// assert!(!tx.is_closed());
739
    ///
740
    /// drop(rx);
741
    ///
742
    /// assert!(tx.is_closed());
743
    /// assert!(tx.send("never received").is_err());
744
    /// # }
745
    /// ```
746
0
    pub fn is_closed(&self) -> bool {
747
0
        let inner = self.inner.as_ref().unwrap();
748
749
0
        let state = State::load(&inner.state, Acquire);
750
0
        state.is_closed()
751
0
    }
752
753
    /// Checks whether the `oneshot` channel has been closed, and if not, schedules the
754
    /// `Waker` in the provided `Context` to receive a notification when the channel is
755
    /// closed.
756
    ///
757
    /// A [`Receiver`] is closed by either calling [`close`] explicitly, or when the
758
    /// [`Receiver`] value is dropped.
759
    ///
760
    /// Note that on multiple calls to poll, only the `Waker` from the `Context` passed
761
    /// to the most recent call will be scheduled to receive a wakeup.
762
    ///
763
    /// [`Receiver`]: struct@crate::sync::oneshot::Receiver
764
    /// [`close`]: fn@crate::sync::oneshot::Receiver::close
765
    ///
766
    /// # Return value
767
    ///
768
    /// This function returns:
769
    ///
770
    ///  * `Poll::Pending` if the channel is still open.
771
    ///  * `Poll::Ready(())` if the channel is closed.
772
    ///
773
    /// # Examples
774
    ///
775
    /// ```
776
    /// use tokio::sync::oneshot;
777
    ///
778
    /// use std::future::poll_fn;
779
    ///
780
    /// # #[tokio::main(flavor = "current_thread")]
781
    /// # async fn main() {
782
    /// let (mut tx, mut rx) = oneshot::channel::<()>();
783
    ///
784
    /// tokio::spawn(async move {
785
    ///     rx.close();
786
    /// });
787
    ///
788
    /// poll_fn(|cx| tx.poll_closed(cx)).await;
789
    ///
790
    /// println!("the receiver dropped");
791
    /// # }
792
    /// ```
793
0
    pub fn poll_closed(&mut self, cx: &mut Context<'_>) -> Poll<()> {
794
0
        ready!(crate::trace::trace_leaf(cx));
795
796
        // Keep track of task budget
797
0
        let coop = ready!(crate::task::coop::poll_proceed(cx));
798
799
0
        let inner = self.inner.as_ref().unwrap();
800
801
0
        let mut state = State::load(&inner.state, Acquire);
802
803
0
        if state.is_closed() {
804
0
            coop.made_progress();
805
0
            return Ready(());
806
0
        }
807
808
0
        if state.is_tx_task_set() {
809
0
            let will_notify = unsafe { inner.tx_task.will_wake(cx) };
810
811
0
            if !will_notify {
812
0
                state = State::unset_tx_task(&inner.state);
813
814
0
                if state.is_closed() {
815
                    // Set the flag again so that the waker is released in drop
816
0
                    State::set_tx_task(&inner.state);
817
0
                    coop.made_progress();
818
0
                    return Ready(());
819
0
                } else {
820
0
                    unsafe { inner.tx_task.drop_task() };
821
0
                }
822
0
            }
823
0
        }
824
825
0
        if !state.is_tx_task_set() {
826
            // Attempt to set the task
827
0
            unsafe {
828
0
                inner.tx_task.set_task(cx);
829
0
            }
830
831
            // Update the state
832
0
            state = State::set_tx_task(&inner.state);
833
834
0
            if state.is_closed() {
835
0
                coop.made_progress();
836
0
                return Ready(());
837
0
            }
838
0
        }
839
840
0
        Pending
841
0
    }
842
}
843
844
impl<T> Drop for Sender<T> {
845
17.0k
    fn drop(&mut self) {
846
17.0k
        if let Some(inner) = self.inner.as_ref() {
847
17.0k
            inner.complete();
848
17.0k
            #[cfg(all(tokio_unstable, feature = "tracing"))]
849
17.0k
            self.resource_span.in_scope(|| {
850
17.0k
                tracing::trace!(
851
17.0k
                target: "runtime::resource::state_update",
852
17.0k
                tx_dropped = true,
853
17.0k
                tx_dropped.op = "override",
854
17.0k
                )
855
17.0k
            });
856
17.0k
        }
857
17.0k
    }
858
}
859
860
impl<T> Receiver<T> {
861
    /// Prevents the associated [`Sender`] handle from sending a value.
862
    ///
863
    /// Any `send` operation which happens after calling `close` is guaranteed
864
    /// to fail. After calling `close`, [`try_recv`] should be called to
865
    /// receive a value if one was sent **before** the call to `close`
866
    /// completed.
867
    ///
868
    /// This function is useful to perform a graceful shutdown and ensure that a
869
    /// value will not be sent into the channel and never received.
870
    ///
871
    /// `close` is no-op if a message is already received or the channel
872
    /// is already closed.
873
    ///
874
    /// [`Sender`]: Sender
875
    /// [`try_recv`]: Receiver::try_recv
876
    ///
877
    /// # Examples
878
    ///
879
    /// Prevent a value from being sent
880
    ///
881
    /// ```
882
    /// use tokio::sync::oneshot;
883
    /// use tokio::sync::oneshot::error::TryRecvError;
884
    ///
885
    /// # #[tokio::main(flavor = "current_thread")]
886
    /// # async fn main() {
887
    /// let (tx, mut rx) = oneshot::channel();
888
    ///
889
    /// assert!(!tx.is_closed());
890
    ///
891
    /// rx.close();
892
    ///
893
    /// assert!(tx.is_closed());
894
    /// assert!(tx.send("never received").is_err());
895
    ///
896
    /// match rx.try_recv() {
897
    ///     Err(TryRecvError::Closed) => {}
898
    ///     _ => unreachable!(),
899
    /// }
900
    /// # }
901
    /// ```
902
    ///
903
    /// Receive a value sent **before** calling `close`
904
    ///
905
    /// ```
906
    /// use tokio::sync::oneshot;
907
    ///
908
    /// # #[tokio::main(flavor = "current_thread")]
909
    /// # async fn main() {
910
    /// let (tx, mut rx) = oneshot::channel();
911
    ///
912
    /// assert!(tx.send("will receive").is_ok());
913
    ///
914
    /// rx.close();
915
    ///
916
    /// let msg = rx.try_recv().unwrap();
917
    /// assert_eq!(msg, "will receive");
918
    /// # }
919
    /// ```
920
0
    pub fn close(&mut self) {
921
0
        if let Some(inner) = self.inner.as_ref() {
922
0
            inner.close();
923
0
            #[cfg(all(tokio_unstable, feature = "tracing"))]
924
0
            self.resource_span.in_scope(|| {
925
0
                tracing::trace!(
926
0
                target: "runtime::resource::state_update",
927
0
                rx_dropped = true,
928
0
                rx_dropped.op = "override",
929
0
                )
930
0
            });
931
0
        }
932
0
    }
933
934
    /// Checks if this receiver is terminated.
935
    ///
936
    /// This function returns true if this receiver has already yielded a [`Poll::Ready`] result.
937
    /// If so, this receiver should no longer be polled.
938
    ///
939
    /// # Examples
940
    ///
941
    /// Sending a value and polling it.
942
    ///
943
    /// ```
944
    /// use tokio::sync::oneshot;
945
    ///
946
    /// use std::task::Poll;
947
    ///
948
    /// # #[tokio::main(flavor = "current_thread")]
949
    /// # async fn main() {
950
    /// let (tx, mut rx) = oneshot::channel();
951
    ///
952
    /// // A receiver is not terminated when it is initialized.
953
    /// assert!(!rx.is_terminated());
954
    ///
955
    /// // A receiver is not terminated it is polled and is still pending.
956
    /// let poll = futures::poll!(&mut rx);
957
    /// assert_eq!(poll, Poll::Pending);
958
    /// assert!(!rx.is_terminated());
959
    ///
960
    /// // A receiver is not terminated if a value has been sent, but not yet read.
961
    /// tx.send(0).unwrap();
962
    /// assert!(!rx.is_terminated());
963
    ///
964
    /// // A receiver *is* terminated after it has been polled and yielded a value.
965
    /// assert_eq!((&mut rx).await, Ok(0));
966
    /// assert!(rx.is_terminated());
967
    /// # }
968
    /// ```
969
    ///
970
    /// Dropping the sender.
971
    ///
972
    /// ```
973
    /// use tokio::sync::oneshot;
974
    ///
975
    /// # #[tokio::main(flavor = "current_thread")]
976
    /// # async fn main() {
977
    /// let (tx, mut rx) = oneshot::channel::<()>();
978
    ///
979
    /// // A receiver is not immediately terminated when the sender is dropped.
980
    /// drop(tx);
981
    /// assert!(!rx.is_terminated());
982
    ///
983
    /// // A receiver *is* terminated after it has been polled and yielded an error.
984
    /// let _ = (&mut rx).await.unwrap_err();
985
    /// assert!(rx.is_terminated());
986
    /// # }
987
    /// ```
988
0
    pub fn is_terminated(&self) -> bool {
989
0
        self.inner.is_none()
990
0
    }
991
992
    /// Checks if a channel is empty.
993
    ///
994
    /// This method returns `true` if the channel has no messages.
995
    ///
996
    /// It is not necessarily safe to poll an empty receiver, which may have
997
    /// already yielded a value. Use [`is_terminated()`][Self::is_terminated]
998
    /// to check whether or not a receiver can be safely polled, instead.
999
    ///
1000
    /// # Examples
1001
    ///
1002
    /// Sending a value.
1003
    ///
1004
    /// ```
1005
    /// use tokio::sync::oneshot;
1006
    ///
1007
    /// # #[tokio::main(flavor = "current_thread")]
1008
    /// # async fn main() {
1009
    /// let (tx, mut rx) = oneshot::channel();
1010
    /// assert!(rx.is_empty());
1011
    ///
1012
    /// tx.send(0).unwrap();
1013
    /// assert!(!rx.is_empty());
1014
    ///
1015
    /// let _ = (&mut rx).await;
1016
    /// assert!(rx.is_empty());
1017
    /// # }
1018
    /// ```
1019
    ///
1020
    /// Dropping the sender.
1021
    ///
1022
    /// ```
1023
    /// use tokio::sync::oneshot;
1024
    ///
1025
    /// # #[tokio::main(flavor = "current_thread")]
1026
    /// # async fn main() {
1027
    /// let (tx, mut rx) = oneshot::channel::<()>();
1028
    ///
1029
    /// // A channel is empty if the sender is dropped.
1030
    /// drop(tx);
1031
    /// assert!(rx.is_empty());
1032
    ///
1033
    /// // A closed channel still yields an error, however.
1034
    /// (&mut rx).await.expect_err("should yield an error");
1035
    /// assert!(rx.is_empty());
1036
    /// # }
1037
    /// ```
1038
    ///
1039
    /// Terminated channels are empty.
1040
    ///
1041
    /// ```should_panic,ignore-wasm
1042
    /// use tokio::sync::oneshot;
1043
    ///
1044
    /// #[tokio::main]
1045
    /// async fn main() {
1046
    ///     let (tx, mut rx) = oneshot::channel();
1047
    ///     tx.send(0).unwrap();
1048
    ///     let _ = (&mut rx).await;
1049
    ///
1050
    ///     // NB: an empty channel is not necessarily safe to poll!
1051
    ///     assert!(rx.is_empty());
1052
    ///     let _ = (&mut rx).await;
1053
    /// }
1054
    /// ```
1055
0
    pub fn is_empty(&self) -> bool {
1056
0
        let Some(inner) = self.inner.as_ref() else {
1057
            // The channel has already terminated.
1058
0
            return true;
1059
        };
1060
1061
0
        let state = State::load(&inner.state, Acquire);
1062
0
        if state.is_complete() {
1063
            // SAFETY: If `state.is_complete()` returns true, then the
1064
            // `VALUE_SENT` bit has been set and the sender side of the
1065
            // channel will no longer attempt to access the inner
1066
            // `UnsafeCell`. Therefore, it is now safe for us to access the
1067
            // cell.
1068
            //
1069
            // The channel is empty if it does not have a value.
1070
0
            unsafe { !inner.has_value() }
1071
        } else {
1072
            // The receiver closed the channel or no value has been sent yet.
1073
0
            true
1074
        }
1075
0
    }
1076
1077
    /// Attempts to receive a value.
1078
    ///
1079
    /// If a pending value exists in the channel, it is returned. If no value
1080
    /// has been sent, the current task **will not** be registered for
1081
    /// future notification.
1082
    ///
1083
    /// This function is useful to call from outside the context of an
1084
    /// asynchronous task.
1085
    ///
1086
    /// Note that unlike the `poll` method, the `try_recv` method cannot fail
1087
    /// spuriously. Any send or close event that happens before this call to
1088
    /// `try_recv` will be correctly returned to the caller.
1089
    ///
1090
    /// # Return
1091
    ///
1092
    /// - `Ok(T)` if a value is pending in the channel.
1093
    /// - `Err(TryRecvError::Empty)` if no value has been sent yet.
1094
    /// - `Err(TryRecvError::Closed)` if the sender has dropped without sending
1095
    ///   a value, or if the message has already been received.
1096
    ///
1097
    /// # Examples
1098
    ///
1099
    /// `try_recv` before a value is sent, then after.
1100
    ///
1101
    /// ```
1102
    /// use tokio::sync::oneshot;
1103
    /// use tokio::sync::oneshot::error::TryRecvError;
1104
    ///
1105
    /// # #[tokio::main(flavor = "current_thread")]
1106
    /// # async fn main() {
1107
    /// let (tx, mut rx) = oneshot::channel();
1108
    ///
1109
    /// match rx.try_recv() {
1110
    ///     // The channel is currently empty
1111
    ///     Err(TryRecvError::Empty) => {}
1112
    ///     _ => unreachable!(),
1113
    /// }
1114
    ///
1115
    /// // Send a value
1116
    /// tx.send("hello").unwrap();
1117
    ///
1118
    /// match rx.try_recv() {
1119
    ///      Ok(value) => assert_eq!(value, "hello"),
1120
    ///      _ => unreachable!(),
1121
    /// }
1122
    /// # }
1123
    /// ```
1124
    ///
1125
    /// `try_recv` when the sender dropped before sending a value
1126
    ///
1127
    /// ```
1128
    /// use tokio::sync::oneshot;
1129
    /// use tokio::sync::oneshot::error::TryRecvError;
1130
    ///
1131
    /// # #[tokio::main(flavor = "current_thread")]
1132
    /// # async fn main() {
1133
    /// let (tx, mut rx) = oneshot::channel::<()>();
1134
    ///
1135
    /// drop(tx);
1136
    ///
1137
    /// match rx.try_recv() {
1138
    ///     // The channel will never receive a value.
1139
    ///     Err(TryRecvError::Closed) => {}
1140
    ///     _ => unreachable!(),
1141
    /// }
1142
    /// # }
1143
    /// ```
1144
0
    pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
1145
0
        let result = if let Some(inner) = self.inner.as_ref() {
1146
0
            let state = State::load(&inner.state, Acquire);
1147
1148
0
            if state.is_complete() {
1149
                // SAFETY: If `state.is_complete()` returns true, then the
1150
                // `VALUE_SENT` bit has been set and the sender side of the
1151
                // channel will no longer attempt to access the inner
1152
                // `UnsafeCell`. Therefore, it is now safe for us to access the
1153
                // cell.
1154
0
                match unsafe { inner.consume_value() } {
1155
0
                    Some(value) => {
1156
                        #[cfg(all(tokio_unstable, feature = "tracing"))]
1157
                        self.resource_span.in_scope(|| {
1158
                            tracing::trace!(
1159
                            target: "runtime::resource::state_update",
1160
                            value_received = true,
1161
                            value_received.op = "override",
1162
                            )
1163
                        });
1164
0
                        Ok(value)
1165
                    }
1166
0
                    None => Err(TryRecvError::Closed),
1167
                }
1168
0
            } else if state.is_closed() {
1169
0
                Err(TryRecvError::Closed)
1170
            } else {
1171
                // Not ready, this does not clear `inner`
1172
0
                return Err(TryRecvError::Empty);
1173
            }
1174
        } else {
1175
0
            Err(TryRecvError::Closed)
1176
        };
1177
1178
0
        self.inner = None;
1179
0
        result
1180
0
    }
1181
1182
    /// Blocking receive to call outside of asynchronous contexts.
1183
    ///
1184
    /// # Panics
1185
    ///
1186
    /// This function panics if called within an asynchronous execution
1187
    /// context.
1188
    ///
1189
    /// # Examples
1190
    ///
1191
    /// ```
1192
    /// # #[cfg(not(target_family = "wasm"))]
1193
    /// # {
1194
    /// use std::thread;
1195
    /// use tokio::sync::oneshot;
1196
    ///
1197
    /// #[tokio::main]
1198
    /// async fn main() {
1199
    ///     let (tx, rx) = oneshot::channel::<u8>();
1200
    ///
1201
    ///     let sync_code = thread::spawn(move || {
1202
    ///         assert_eq!(Ok(10), rx.blocking_recv());
1203
    ///     });
1204
    ///
1205
    ///     let _ = tx.send(10);
1206
    ///     sync_code.join().unwrap();
1207
    /// }
1208
    /// # }
1209
    /// ```
1210
    #[track_caller]
1211
    #[cfg(feature = "sync")]
1212
    #[cfg_attr(docsrs, doc(alias = "recv_blocking"))]
1213
0
    pub fn blocking_recv(self) -> Result<T, RecvError> {
1214
0
        crate::future::block_on(self)
1215
0
    }
1216
}
1217
1218
impl<T> Drop for Receiver<T> {
1219
17.0k
    fn drop(&mut self) {
1220
17.0k
        if let Some(inner) = self.inner.as_ref() {
1221
0
            let state = inner.close();
1222
1223
0
            if state.is_complete() {
1224
0
                // SAFETY: we have ensured that the `VALUE_SENT` bit has been set,
1225
0
                // so only the receiver can access the value.
1226
0
                drop(unsafe { inner.consume_value() });
1227
0
            }
1228
1229
            #[cfg(all(tokio_unstable, feature = "tracing"))]
1230
            self.resource_span.in_scope(|| {
1231
                tracing::trace!(
1232
                target: "runtime::resource::state_update",
1233
                rx_dropped = true,
1234
                rx_dropped.op = "override",
1235
                )
1236
            });
1237
17.0k
        }
1238
17.0k
    }
1239
}
1240
1241
impl<T> Future for Receiver<T> {
1242
    type Output = Result<T, RecvError>;
1243
1244
37.2k
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1245
        // If `inner` is `None`, then `poll()` has already completed.
1246
        #[cfg(all(tokio_unstable, feature = "tracing"))]
1247
        let _res_span = self.resource_span.clone().entered();
1248
        #[cfg(all(tokio_unstable, feature = "tracing"))]
1249
        let _ao_span = self.async_op_span.clone().entered();
1250
        #[cfg(all(tokio_unstable, feature = "tracing"))]
1251
        let _ao_poll_span = self.async_op_poll_span.clone().entered();
1252
1253
37.2k
        let ret = if let Some(inner) = self.as_ref().get_ref().inner.as_ref() {
1254
            #[cfg(all(tokio_unstable, feature = "tracing"))]
1255
            let res = ready!(trace_poll_op!("poll_recv", inner.poll_recv(cx))).map_err(Into::into);
1256
1257
            #[cfg(any(not(tokio_unstable), not(feature = "tracing")))]
1258
37.2k
            let res = ready!(inner.poll_recv(cx)).map_err(Into::into);
1259
1260
17.0k
            res
1261
        } else {
1262
0
            panic!("called after complete");
1263
        };
1264
1265
17.0k
        self.inner = None;
1266
17.0k
        Ready(ret)
1267
37.2k
    }
1268
}
1269
1270
impl<T> Inner<T> {
1271
17.0k
    fn complete(&self) -> bool {
1272
17.0k
        let prev = State::set_complete(&self.state);
1273
1274
17.0k
        if prev.is_closed() {
1275
0
            return false;
1276
17.0k
        }
1277
1278
17.0k
        if prev.is_rx_task_set() {
1279
            // TODO: Consume waker?
1280
17.0k
            unsafe {
1281
17.0k
                self.rx_task.with_task(Waker::wake_by_ref);
1282
17.0k
            }
1283
7
        }
1284
1285
17.0k
        true
1286
17.0k
    }
1287
1288
37.2k
    fn poll_recv(&self, cx: &mut Context<'_>) -> Poll<Result<T, RecvError>> {
1289
37.2k
        ready!(crate::trace::trace_leaf(cx));
1290
        // Keep track of task budget
1291
37.2k
        let coop = ready!(crate::task::coop::poll_proceed(cx));
1292
1293
        // Load the state
1294
37.2k
        let mut state = State::load(&self.state, Acquire);
1295
1296
37.2k
        if state.is_complete() {
1297
17.0k
            coop.made_progress();
1298
17.0k
            match unsafe { self.consume_value() } {
1299
0
                Some(value) => Ready(Ok(value)),
1300
17.0k
                None => Ready(Err(RecvError(()))),
1301
            }
1302
20.2k
        } else if state.is_closed() {
1303
0
            coop.made_progress();
1304
0
            Ready(Err(RecvError(())))
1305
        } else {
1306
20.2k
            if state.is_rx_task_set() {
1307
3.23k
                let will_notify = unsafe { self.rx_task.will_wake(cx) };
1308
1309
                // Check if the task is still the same
1310
3.23k
                if !will_notify {
1311
                    // Unset the task
1312
0
                    state = State::unset_rx_task(&self.state);
1313
0
                    if state.is_complete() {
1314
                        // Set the flag again so that the waker is released in drop
1315
0
                        State::set_rx_task(&self.state);
1316
1317
0
                        coop.made_progress();
1318
                        // SAFETY: If `state.is_complete()` returns true, then the
1319
                        // `VALUE_SENT` bit has been set and the sender side of the
1320
                        // channel will no longer attempt to access the inner
1321
                        // `UnsafeCell`. Therefore, it is now safe for us to access the
1322
                        // cell.
1323
0
                        return match unsafe { self.consume_value() } {
1324
0
                            Some(value) => Ready(Ok(value)),
1325
0
                            None => Ready(Err(RecvError(()))),
1326
                        };
1327
0
                    } else {
1328
0
                        unsafe { self.rx_task.drop_task() };
1329
0
                    }
1330
3.23k
                }
1331
17.0k
            }
1332
1333
20.2k
            if !state.is_rx_task_set() {
1334
                // Attempt to set the task
1335
17.0k
                unsafe {
1336
17.0k
                    self.rx_task.set_task(cx);
1337
17.0k
                }
1338
1339
                // Update the state
1340
17.0k
                state = State::set_rx_task(&self.state);
1341
1342
17.0k
                if state.is_complete() {
1343
0
                    coop.made_progress();
1344
0
                    match unsafe { self.consume_value() } {
1345
0
                        Some(value) => Ready(Ok(value)),
1346
0
                        None => Ready(Err(RecvError(()))),
1347
                    }
1348
                } else {
1349
17.0k
                    Pending
1350
                }
1351
            } else {
1352
3.23k
                Pending
1353
            }
1354
        }
1355
37.2k
    }
1356
1357
    /// Called by `Receiver` to indicate that the value will never be received.
1358
0
    fn close(&self) -> State {
1359
0
        let prev = State::set_closed(&self.state);
1360
1361
0
        if prev.is_tx_task_set() && !prev.is_complete() {
1362
0
            unsafe {
1363
0
                self.tx_task.with_task(Waker::wake_by_ref);
1364
0
            }
1365
0
        }
1366
1367
0
        prev
1368
0
    }
1369
1370
    /// Consumes the value. This function does not check `state`.
1371
    ///
1372
    /// # Safety
1373
    ///
1374
    /// Calling this method concurrently on multiple threads will result in a
1375
    /// data race. The `VALUE_SENT` state bit is used to ensure that only the
1376
    /// sender *or* the receiver will call this method at a given point in time.
1377
    /// If `VALUE_SENT` is not set, then only the sender may call this method;
1378
    /// if it is set, then only the receiver may call this method.
1379
17.0k
    unsafe fn consume_value(&self) -> Option<T> {
1380
17.0k
        self.value.with_mut(|ptr| (*ptr).take())
1381
17.0k
    }
1382
1383
    /// Returns true if there is a value. This function does not check `state`.
1384
    ///
1385
    /// # Safety
1386
    ///
1387
    /// Calling this method concurrently on multiple threads will result in a
1388
    /// data race. The `VALUE_SENT` state bit is used to ensure that only the
1389
    /// sender *or* the receiver will call this method at a given point in time.
1390
    /// If `VALUE_SENT` is not set, then only the sender may call this method;
1391
    /// if it is set, then only the receiver may call this method.
1392
0
    unsafe fn has_value(&self) -> bool {
1393
0
        self.value.with(|ptr| (*ptr).is_some())
1394
0
    }
1395
}
1396
1397
unsafe impl<T: Send> Send for Inner<T> {}
1398
unsafe impl<T: Send> Sync for Inner<T> {}
1399
1400
17.0k
fn mut_load(this: &mut AtomicUsize) -> usize {
1401
17.0k
    this.with_mut(|v| *v)
1402
17.0k
}
1403
1404
impl<T> Drop for Inner<T> {
1405
17.0k
    fn drop(&mut self) {
1406
17.0k
        let state = State(mut_load(&mut self.state));
1407
1408
17.0k
        if state.is_rx_task_set() {
1409
17.0k
            unsafe {
1410
17.0k
                self.rx_task.drop_task();
1411
17.0k
            }
1412
7
        }
1413
1414
17.0k
        if state.is_tx_task_set() {
1415
0
            unsafe {
1416
0
                self.tx_task.drop_task();
1417
0
            }
1418
17.0k
        }
1419
1420
        // SAFETY: we have `&mut self`, and therefore we have
1421
        // exclusive access to the value.
1422
        unsafe {
1423
            // Note: the assertion holds because if the value has been sent by sender,
1424
            // we must ensure that the value must have been consumed by the receiver before
1425
            // dropping the `Inner`.
1426
17.0k
            debug_assert!(self.consume_value().is_none());
1427
        }
1428
17.0k
    }
1429
}
1430
1431
impl<T: fmt::Debug> fmt::Debug for Inner<T> {
1432
0
    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
1433
        use std::sync::atomic::Ordering::Relaxed;
1434
1435
0
        fmt.debug_struct("Inner")
1436
0
            .field("state", &State::load(&self.state, Relaxed))
1437
0
            .finish()
1438
0
    }
1439
}
1440
1441
/// Indicates that a waker for the receiving task has been set.
1442
///
1443
/// # Safety
1444
///
1445
/// If this bit is not set, the `rx_task` field may be uninitialized.
1446
const RX_TASK_SET: usize = 0b00001;
1447
/// Indicates that a value has been stored in the channel's inner `UnsafeCell`.
1448
///
1449
/// # Safety
1450
///
1451
/// This bit controls which side of the channel is permitted to access the
1452
/// `UnsafeCell`. If it is set, the `UnsafeCell` may ONLY be accessed by the
1453
/// receiver. If this bit is NOT set, the `UnsafeCell` may ONLY be accessed by
1454
/// the sender.
1455
const VALUE_SENT: usize = 0b00010;
1456
const CLOSED: usize = 0b00100;
1457
1458
/// Indicates that a waker for the sending task has been set.
1459
///
1460
/// # Safety
1461
///
1462
/// If this bit is not set, the `tx_task` field may be uninitialized.
1463
const TX_TASK_SET: usize = 0b01000;
1464
1465
impl State {
1466
17.0k
    fn new() -> State {
1467
17.0k
        State(0)
1468
17.0k
    }
1469
1470
54.3k
    fn is_complete(self) -> bool {
1471
54.3k
        self.0 & VALUE_SENT == VALUE_SENT
1472
54.3k
    }
1473
1474
17.0k
    fn set_complete(cell: &AtomicUsize) -> State {
1475
        // This method is a compare-and-swap loop rather than a fetch-or like
1476
        // other `set_$WHATEVER` methods on `State`. This is because we must
1477
        // check if the state has been closed before setting the `VALUE_SENT`
1478
        // bit.
1479
        //
1480
        // We don't want to set both the `VALUE_SENT` bit if the `CLOSED`
1481
        // bit is already set, because `VALUE_SENT` will tell the receiver that
1482
        // it's okay to access the inner `UnsafeCell`. Immediately after calling
1483
        // `set_complete`, if the channel was closed, the sender will _also_
1484
        // access the `UnsafeCell` to take the value back out, so if a
1485
        // `poll_recv` or `try_recv` call is occurring concurrently, both
1486
        // threads may try to access the `UnsafeCell` if we were to set the
1487
        // `VALUE_SENT` bit on a closed channel.
1488
17.0k
        let mut state = cell.load(Ordering::Relaxed);
1489
        loop {
1490
17.0k
            if State(state).is_closed() {
1491
0
                break;
1492
17.0k
            }
1493
            // TODO: This could be `Release`, followed by an `Acquire` fence *if*
1494
            // the `RX_TASK_SET` flag is set. However, `loom` does not support
1495
            // fences yet.
1496
17.0k
            match cell.compare_exchange_weak(
1497
17.0k
                state,
1498
17.0k
                state | VALUE_SENT,
1499
17.0k
                Ordering::AcqRel,
1500
17.0k
                Ordering::Acquire,
1501
17.0k
            ) {
1502
17.0k
                Ok(_) => break,
1503
0
                Err(actual) => state = actual,
1504
            }
1505
        }
1506
17.0k
        State(state)
1507
17.0k
    }
1508
1509
74.5k
    fn is_rx_task_set(self) -> bool {
1510
74.5k
        self.0 & RX_TASK_SET == RX_TASK_SET
1511
74.5k
    }
1512
1513
17.0k
    fn set_rx_task(cell: &AtomicUsize) -> State {
1514
17.0k
        let val = cell.fetch_or(RX_TASK_SET, AcqRel);
1515
17.0k
        State(val | RX_TASK_SET)
1516
17.0k
    }
1517
1518
0
    fn unset_rx_task(cell: &AtomicUsize) -> State {
1519
0
        let val = cell.fetch_and(!RX_TASK_SET, AcqRel);
1520
0
        State(val & !RX_TASK_SET)
1521
0
    }
1522
1523
54.3k
    fn is_closed(self) -> bool {
1524
54.3k
        self.0 & CLOSED == CLOSED
1525
54.3k
    }
1526
1527
0
    fn set_closed(cell: &AtomicUsize) -> State {
1528
        // Acquire because we want all later writes (attempting to poll) to be
1529
        // ordered after this.
1530
0
        let val = cell.fetch_or(CLOSED, Acquire);
1531
0
        State(val)
1532
0
    }
1533
1534
0
    fn set_tx_task(cell: &AtomicUsize) -> State {
1535
0
        let val = cell.fetch_or(TX_TASK_SET, AcqRel);
1536
0
        State(val | TX_TASK_SET)
1537
0
    }
1538
1539
0
    fn unset_tx_task(cell: &AtomicUsize) -> State {
1540
0
        let val = cell.fetch_and(!TX_TASK_SET, AcqRel);
1541
0
        State(val & !TX_TASK_SET)
1542
0
    }
1543
1544
17.0k
    fn is_tx_task_set(self) -> bool {
1545
17.0k
        self.0 & TX_TASK_SET == TX_TASK_SET
1546
17.0k
    }
1547
1548
17.0k
    fn as_usize(self) -> usize {
1549
17.0k
        self.0
1550
17.0k
    }
1551
1552
37.2k
    fn load(cell: &AtomicUsize, order: Ordering) -> State {
1553
37.2k
        let val = cell.load(order);
1554
37.2k
        State(val)
1555
37.2k
    }
1556
}
1557
1558
impl fmt::Debug for State {
1559
0
    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
1560
0
        fmt.debug_struct("State")
1561
0
            .field("is_complete", &self.is_complete())
1562
0
            .field("is_closed", &self.is_closed())
1563
0
            .field("is_rx_task_set", &self.is_rx_task_set())
1564
0
            .field("is_tx_task_set", &self.is_tx_task_set())
1565
0
            .finish()
1566
0
    }
1567
}