Coverage Report

Created: 2025-08-26 07:09

/rust/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.47.1/src/sync/oneshot.rs
Line
Count
Source (jump to first uncovered line)
1
#![cfg_attr(not(feature = "sync"), allow(dead_code, unreachable_pub))]
2
3
//! A one-shot channel is used for sending a single message between
4
//! asynchronous tasks. The [`channel`] function is used to create a
5
//! [`Sender`] and [`Receiver`] handle pair that form the channel.
6
//!
7
//! The `Sender` handle is used by the producer to send the value.
8
//! The `Receiver` handle is used by the consumer to receive the value.
9
//!
10
//! Each handle can be used on separate tasks.
11
//!
12
//! Since the `send` method is not async, it can be used anywhere. This includes
13
//! sending between two runtimes, and using it from non-async code.
14
//!
15
//! If the [`Receiver`] is closed before receiving a message which has already
16
//! been sent, the message will remain in the channel until the receiver is
17
//! dropped, at which point the message will be dropped immediately.
18
//!
19
//! # Examples
20
//!
21
//! ```
22
//! use tokio::sync::oneshot;
23
//!
24
//! #[tokio::main]
25
//! async fn main() {
26
//!     let (tx, rx) = oneshot::channel();
27
//!
28
//!     tokio::spawn(async move {
29
//!         if let Err(_) = tx.send(3) {
30
//!             println!("the receiver dropped");
31
//!         }
32
//!     });
33
//!
34
//!     match rx.await {
35
//!         Ok(v) => println!("got = {:?}", v),
36
//!         Err(_) => println!("the sender dropped"),
37
//!     }
38
//! }
39
//! ```
40
//!
41
//! If the sender is dropped without sending, the receiver will fail with
42
//! [`error::RecvError`]:
43
//!
44
//! ```
45
//! use tokio::sync::oneshot;
46
//!
47
//! #[tokio::main]
48
//! async fn main() {
49
//!     let (tx, rx) = oneshot::channel::<u32>();
50
//!
51
//!     tokio::spawn(async move {
52
//!         drop(tx);
53
//!     });
54
//!
55
//!     match rx.await {
56
//!         Ok(_) => panic!("This doesn't happen"),
57
//!         Err(_) => println!("the sender dropped"),
58
//!     }
59
//! }
60
//! ```
61
//!
62
//! To use a `oneshot` channel in a `tokio::select!` loop, add `&mut` in front of
63
//! the channel.
64
//!
65
//! ```
66
//! use tokio::sync::oneshot;
67
//! use tokio::time::{interval, sleep, Duration};
68
//!
69
//! #[tokio::main]
70
//! # async fn _doc() {}
71
//! # #[tokio::main(flavor = "current_thread", start_paused = true)]
72
//! async fn main() {
73
//!     let (send, mut recv) = oneshot::channel();
74
//!     let mut interval = interval(Duration::from_millis(100));
75
//!
76
//!     # let handle =
77
//!     tokio::spawn(async move {
78
//!         sleep(Duration::from_secs(1)).await;
79
//!         send.send("shut down").unwrap();
80
//!     });
81
//!
82
//!     loop {
83
//!         tokio::select! {
84
//!             _ = interval.tick() => println!("Another 100ms"),
85
//!             msg = &mut recv => {
86
//!                 println!("Got message: {}", msg.unwrap());
87
//!                 break;
88
//!             }
89
//!         }
90
//!     }
91
//!     # handle.await.unwrap();
92
//! }
93
//! ```
94
//!
95
//! To use a `Sender` from a destructor, put it in an [`Option`] and call
96
//! [`Option::take`].
97
//!
98
//! ```
99
//! use tokio::sync::oneshot;
100
//!
101
//! struct SendOnDrop {
102
//!     sender: Option<oneshot::Sender<&'static str>>,
103
//! }
104
//! impl Drop for SendOnDrop {
105
//!     fn drop(&mut self) {
106
//!         if let Some(sender) = self.sender.take() {
107
//!             // Using `let _ =` to ignore send errors.
108
//!             let _ = sender.send("I got dropped!");
109
//!         }
110
//!     }
111
//! }
112
//!
113
//! #[tokio::main]
114
//! # async fn _doc() {}
115
//! # #[tokio::main(flavor = "current_thread")]
116
//! async fn main() {
117
//!     let (send, recv) = oneshot::channel();
118
//!
119
//!     let send_on_drop = SendOnDrop { sender: Some(send) };
120
//!     drop(send_on_drop);
121
//!
122
//!     assert_eq!(recv.await, Ok("I got dropped!"));
123
//! }
124
//! ```
125
126
use crate::loom::cell::UnsafeCell;
127
use crate::loom::sync::atomic::AtomicUsize;
128
use crate::loom::sync::Arc;
129
#[cfg(all(tokio_unstable, feature = "tracing"))]
130
use crate::util::trace;
131
132
use std::fmt;
133
use std::future::Future;
134
use std::mem::MaybeUninit;
135
use std::pin::Pin;
136
use std::sync::atomic::Ordering::{self, AcqRel, Acquire};
137
use std::task::Poll::{Pending, Ready};
138
use std::task::{ready, Context, Poll, Waker};
139
140
/// Sends a value to the associated [`Receiver`].
141
///
142
/// A pair of both a [`Sender`] and a [`Receiver`]  are created by the
143
/// [`channel`](fn@channel) function.
144
///
145
/// # Examples
146
///
147
/// ```
148
/// use tokio::sync::oneshot;
149
///
150
/// #[tokio::main]
151
/// async fn main() {
152
///     let (tx, rx) = oneshot::channel();
153
///
154
///     tokio::spawn(async move {
155
///         if let Err(_) = tx.send(3) {
156
///             println!("the receiver dropped");
157
///         }
158
///     });
159
///
160
///     match rx.await {
161
///         Ok(v) => println!("got = {:?}", v),
162
///         Err(_) => println!("the sender dropped"),
163
///     }
164
/// }
165
/// ```
166
///
167
/// If the sender is dropped without sending, the receiver will fail with
168
/// [`error::RecvError`]:
169
///
170
/// ```
171
/// use tokio::sync::oneshot;
172
///
173
/// #[tokio::main]
174
/// async fn main() {
175
///     let (tx, rx) = oneshot::channel::<u32>();
176
///
177
///     tokio::spawn(async move {
178
///         drop(tx);
179
///     });
180
///
181
///     match rx.await {
182
///         Ok(_) => panic!("This doesn't happen"),
183
///         Err(_) => println!("the sender dropped"),
184
///     }
185
/// }
186
/// ```
187
///
188
/// To use a `Sender` from a destructor, put it in an [`Option`] and call
189
/// [`Option::take`].
190
///
191
/// ```
192
/// use tokio::sync::oneshot;
193
///
194
/// struct SendOnDrop {
195
///     sender: Option<oneshot::Sender<&'static str>>,
196
/// }
197
/// impl Drop for SendOnDrop {
198
///     fn drop(&mut self) {
199
///         if let Some(sender) = self.sender.take() {
200
///             // Using `let _ =` to ignore send errors.
201
///             let _ = sender.send("I got dropped!");
202
///         }
203
///     }
204
/// }
205
///
206
/// #[tokio::main]
207
/// # async fn _doc() {}
208
/// # #[tokio::main(flavor = "current_thread")]
209
/// async fn main() {
210
///     let (send, recv) = oneshot::channel();
211
///
212
///     let send_on_drop = SendOnDrop { sender: Some(send) };
213
///     drop(send_on_drop);
214
///
215
///     assert_eq!(recv.await, Ok("I got dropped!"));
216
/// }
217
/// ```
218
///
219
/// [`Option`]: std::option::Option
220
/// [`Option::take`]: std::option::Option::take
221
#[derive(Debug)]
222
pub struct Sender<T> {
223
    inner: Option<Arc<Inner<T>>>,
224
    #[cfg(all(tokio_unstable, feature = "tracing"))]
225
    resource_span: tracing::Span,
226
}
227
228
/// Receives a value from the associated [`Sender`].
229
///
230
/// A pair of both a [`Sender`] and a [`Receiver`]  are created by the
231
/// [`channel`](fn@channel) function.
232
///
233
/// This channel has no `recv` method because the receiver itself implements the
234
/// [`Future`] trait. To receive a `Result<T, `[`error::RecvError`]`>`, `.await` the `Receiver` object directly.
235
///
236
/// The `poll` method on the `Future` trait is allowed to spuriously return
237
/// `Poll::Pending` even if the message has been sent. If such a spurious
238
/// failure happens, then the caller will be woken when the spurious failure has
239
/// been resolved so that the caller can attempt to receive the message again.
240
/// Note that receiving such a wakeup does not guarantee that the next call will
241
/// succeed — it could fail with another spurious failure. (A spurious failure
242
/// does not mean that the message is lost. It is just delayed.)
243
///
244
/// [`Future`]: trait@std::future::Future
245
///
246
/// # Examples
247
///
248
/// ```
249
/// use tokio::sync::oneshot;
250
///
251
/// #[tokio::main]
252
/// async fn main() {
253
///     let (tx, rx) = oneshot::channel();
254
///
255
///     tokio::spawn(async move {
256
///         if let Err(_) = tx.send(3) {
257
///             println!("the receiver dropped");
258
///         }
259
///     });
260
///
261
///     match rx.await {
262
///         Ok(v) => println!("got = {:?}", v),
263
///         Err(_) => println!("the sender dropped"),
264
///     }
265
/// }
266
/// ```
267
///
268
/// If the sender is dropped without sending, the receiver will fail with
269
/// [`error::RecvError`]:
270
///
271
/// ```
272
/// use tokio::sync::oneshot;
273
///
274
/// #[tokio::main]
275
/// async fn main() {
276
///     let (tx, rx) = oneshot::channel::<u32>();
277
///
278
///     tokio::spawn(async move {
279
///         drop(tx);
280
///     });
281
///
282
///     match rx.await {
283
///         Ok(_) => panic!("This doesn't happen"),
284
///         Err(_) => println!("the sender dropped"),
285
///     }
286
/// }
287
/// ```
288
///
289
/// To use a `Receiver` in a `tokio::select!` loop, add `&mut` in front of the
290
/// channel.
291
///
292
/// ```
293
/// use tokio::sync::oneshot;
294
/// use tokio::time::{interval, sleep, Duration};
295
///
296
/// #[tokio::main]
297
/// # async fn _doc() {}
298
/// # #[tokio::main(flavor = "current_thread", start_paused = true)]
299
/// async fn main() {
300
///     let (send, mut recv) = oneshot::channel();
301
///     let mut interval = interval(Duration::from_millis(100));
302
///
303
///     # let handle =
304
///     tokio::spawn(async move {
305
///         sleep(Duration::from_secs(1)).await;
306
///         send.send("shut down").unwrap();
307
///     });
308
///
309
///     loop {
310
///         tokio::select! {
311
///             _ = interval.tick() => println!("Another 100ms"),
312
///             msg = &mut recv => {
313
///                 println!("Got message: {}", msg.unwrap());
314
///                 break;
315
///             }
316
///         }
317
///     }
318
///     # handle.await.unwrap();
319
/// }
320
/// ```
321
#[derive(Debug)]
322
pub struct Receiver<T> {
323
    inner: Option<Arc<Inner<T>>>,
324
    #[cfg(all(tokio_unstable, feature = "tracing"))]
325
    resource_span: tracing::Span,
326
    #[cfg(all(tokio_unstable, feature = "tracing"))]
327
    async_op_span: tracing::Span,
328
    #[cfg(all(tokio_unstable, feature = "tracing"))]
329
    async_op_poll_span: tracing::Span,
330
}
331
332
pub mod error {
333
    //! `Oneshot` error types.
334
335
    use std::fmt;
336
337
    /// Error returned by the `Future` implementation for `Receiver`.
338
    ///
339
    /// This error is returned by the receiver when the sender is dropped without sending.
340
    #[derive(Debug, Eq, PartialEq, Clone)]
341
    pub struct RecvError(pub(super) ());
342
343
    /// Error returned by the `try_recv` function on `Receiver`.
344
    #[derive(Debug, Eq, PartialEq, Clone)]
345
    pub enum TryRecvError {
346
        /// The send half of the channel has not yet sent a value.
347
        Empty,
348
349
        /// The send half of the channel was dropped without sending a value.
350
        Closed,
351
    }
352
353
    // ===== impl RecvError =====
354
355
    impl fmt::Display for RecvError {
356
0
        fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
357
0
            write!(fmt, "channel closed")
358
0
        }
359
    }
360
361
    impl std::error::Error for RecvError {}
362
363
    // ===== impl TryRecvError =====
364
365
    impl fmt::Display for TryRecvError {
366
0
        fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
367
0
            match self {
368
0
                TryRecvError::Empty => write!(fmt, "channel empty"),
369
0
                TryRecvError::Closed => write!(fmt, "channel closed"),
370
            }
371
0
        }
372
    }
373
374
    impl std::error::Error for TryRecvError {}
375
}
376
377
use self::error::*;
378
379
struct Inner<T> {
380
    /// Manages the state of the inner cell.
381
    state: AtomicUsize,
382
383
    /// The value. This is set by `Sender` and read by `Receiver`. The state of
384
    /// the cell is tracked by `state`.
385
    value: UnsafeCell<Option<T>>,
386
387
    /// The task to notify when the receiver drops without consuming the value.
388
    ///
389
    /// ## Safety
390
    ///
391
    /// The `TX_TASK_SET` bit in the `state` field is set if this field is
392
    /// initialized. If that bit is unset, this field may be uninitialized.
393
    tx_task: Task,
394
395
    /// The task to notify when the value is sent.
396
    ///
397
    /// ## Safety
398
    ///
399
    /// The `RX_TASK_SET` bit in the `state` field is set if this field is
400
    /// initialized. If that bit is unset, this field may be uninitialized.
401
    rx_task: Task,
402
}
403
404
struct Task(UnsafeCell<MaybeUninit<Waker>>);
405
406
impl Task {
407
3.34k
    unsafe fn will_wake(&self, cx: &mut Context<'_>) -> bool {
408
3.34k
        self.with_task(|w| w.will_wake(cx.waker()))
409
3.34k
    }
410
411
20.4k
    unsafe fn with_task<F, R>(&self, f: F) -> R
412
20.4k
    where
413
20.4k
        F: FnOnce(&Waker) -> R,
414
20.4k
    {
415
20.4k
        self.0.with(|ptr| {
416
20.4k
            let waker: *const Waker = (*ptr).as_ptr();
417
20.4k
            f(&*waker)
418
20.4k
        })
<tokio::sync::oneshot::Task>::with_task::<<tokio::sync::oneshot::Task>::will_wake::{closure#0}, bool>::{closure#0}
Line
Count
Source
415
3.34k
        self.0.with(|ptr| {
416
3.34k
            let waker: *const Waker = (*ptr).as_ptr();
417
3.34k
            f(&*waker)
418
3.34k
        })
<tokio::sync::oneshot::Task>::with_task::<<core::task::wake::Waker>::wake_by_ref, ()>::{closure#0}
Line
Count
Source
415
17.0k
        self.0.with(|ptr| {
416
17.0k
            let waker: *const Waker = (*ptr).as_ptr();
417
17.0k
            f(&*waker)
418
17.0k
        })
419
20.4k
    }
<tokio::sync::oneshot::Task>::with_task::<<tokio::sync::oneshot::Task>::will_wake::{closure#0}, bool>
Line
Count
Source
411
3.34k
    unsafe fn with_task<F, R>(&self, f: F) -> R
412
3.34k
    where
413
3.34k
        F: FnOnce(&Waker) -> R,
414
3.34k
    {
415
3.34k
        self.0.with(|ptr| {
416
            let waker: *const Waker = (*ptr).as_ptr();
417
            f(&*waker)
418
3.34k
        })
419
3.34k
    }
<tokio::sync::oneshot::Task>::with_task::<<core::task::wake::Waker>::wake_by_ref, ()>
Line
Count
Source
411
17.0k
    unsafe fn with_task<F, R>(&self, f: F) -> R
412
17.0k
    where
413
17.0k
        F: FnOnce(&Waker) -> R,
414
17.0k
    {
415
17.0k
        self.0.with(|ptr| {
416
            let waker: *const Waker = (*ptr).as_ptr();
417
            f(&*waker)
418
17.0k
        })
419
17.0k
    }
420
421
17.0k
    unsafe fn drop_task(&self) {
422
17.0k
        self.0.with_mut(|ptr| {
423
17.0k
            let ptr: *mut Waker = (*ptr).as_mut_ptr();
424
17.0k
            ptr.drop_in_place();
425
17.0k
        });
426
17.0k
    }
427
428
17.0k
    unsafe fn set_task(&self, cx: &mut Context<'_>) {
429
17.0k
        self.0.with_mut(|ptr| {
430
17.0k
            let ptr: *mut Waker = (*ptr).as_mut_ptr();
431
17.0k
            ptr.write(cx.waker().clone());
432
17.0k
        });
433
17.0k
    }
434
}
435
436
#[derive(Clone, Copy)]
437
struct State(usize);
438
439
/// Creates a new one-shot channel for sending single values across asynchronous
440
/// tasks.
441
///
442
/// The function returns separate "send" and "receive" handles. The `Sender`
443
/// handle is used by the producer to send the value. The `Receiver` handle is
444
/// used by the consumer to receive the value.
445
///
446
/// Each handle can be used on separate tasks.
447
///
448
/// # Examples
449
///
450
/// ```
451
/// use tokio::sync::oneshot;
452
///
453
/// #[tokio::main]
454
/// async fn main() {
455
///     let (tx, rx) = oneshot::channel();
456
///
457
///     tokio::spawn(async move {
458
///         if let Err(_) = tx.send(3) {
459
///             println!("the receiver dropped");
460
///         }
461
///     });
462
///
463
///     match rx.await {
464
///         Ok(v) => println!("got = {:?}", v),
465
///         Err(_) => println!("the sender dropped"),
466
///     }
467
/// }
468
/// ```
469
#[track_caller]
470
17.0k
pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
471
17.0k
    #[cfg(all(tokio_unstable, feature = "tracing"))]
472
17.0k
    let resource_span = {
473
17.0k
        let location = std::panic::Location::caller();
474
17.0k
475
17.0k
        let resource_span = tracing::trace_span!(
476
17.0k
            parent: None,
477
17.0k
            "runtime.resource",
478
17.0k
            concrete_type = "Sender|Receiver",
479
17.0k
            kind = "Sync",
480
17.0k
            loc.file = location.file(),
481
17.0k
            loc.line = location.line(),
482
17.0k
            loc.col = location.column(),
483
17.0k
        );
484
17.0k
485
17.0k
        resource_span.in_scope(|| {
486
17.0k
            tracing::trace!(
487
17.0k
            target: "runtime::resource::state_update",
488
17.0k
            tx_dropped = false,
489
17.0k
            tx_dropped.op = "override",
490
17.0k
            )
491
17.0k
        });
492
17.0k
493
17.0k
        resource_span.in_scope(|| {
494
17.0k
            tracing::trace!(
495
17.0k
            target: "runtime::resource::state_update",
496
17.0k
            rx_dropped = false,
497
17.0k
            rx_dropped.op = "override",
498
17.0k
            )
499
17.0k
        });
500
17.0k
501
17.0k
        resource_span.in_scope(|| {
502
17.0k
            tracing::trace!(
503
17.0k
            target: "runtime::resource::state_update",
504
17.0k
            value_sent = false,
505
17.0k
            value_sent.op = "override",
506
17.0k
            )
507
17.0k
        });
508
17.0k
509
17.0k
        resource_span.in_scope(|| {
510
17.0k
            tracing::trace!(
511
17.0k
            target: "runtime::resource::state_update",
512
17.0k
            value_received = false,
513
17.0k
            value_received.op = "override",
514
17.0k
            )
515
17.0k
        });
516
17.0k
517
17.0k
        resource_span
518
17.0k
    };
519
17.0k
520
17.0k
    let inner = Arc::new(Inner {
521
17.0k
        state: AtomicUsize::new(State::new().as_usize()),
522
17.0k
        value: UnsafeCell::new(None),
523
17.0k
        tx_task: Task(UnsafeCell::new(MaybeUninit::uninit())),
524
17.0k
        rx_task: Task(UnsafeCell::new(MaybeUninit::uninit())),
525
17.0k
    });
526
17.0k
527
17.0k
    let tx = Sender {
528
17.0k
        inner: Some(inner.clone()),
529
17.0k
        #[cfg(all(tokio_unstable, feature = "tracing"))]
530
17.0k
        resource_span: resource_span.clone(),
531
17.0k
    };
532
17.0k
533
17.0k
    #[cfg(all(tokio_unstable, feature = "tracing"))]
534
17.0k
    let async_op_span = resource_span
535
17.0k
        .in_scope(|| tracing::trace_span!("runtime.resource.async_op", source = "Receiver::await"));
536
17.0k
537
17.0k
    #[cfg(all(tokio_unstable, feature = "tracing"))]
538
17.0k
    let async_op_poll_span =
539
17.0k
        async_op_span.in_scope(|| tracing::trace_span!("runtime.resource.async_op.poll"));
540
17.0k
541
17.0k
    let rx = Receiver {
542
17.0k
        inner: Some(inner),
543
17.0k
        #[cfg(all(tokio_unstable, feature = "tracing"))]
544
17.0k
        resource_span,
545
17.0k
        #[cfg(all(tokio_unstable, feature = "tracing"))]
546
17.0k
        async_op_span,
547
17.0k
        #[cfg(all(tokio_unstable, feature = "tracing"))]
548
17.0k
        async_op_poll_span,
549
17.0k
    };
550
17.0k
551
17.0k
    (tx, rx)
552
17.0k
}
553
554
impl<T> Sender<T> {
555
    /// Attempts to send a value on this channel, returning it back if it could
556
    /// not be sent.
557
    ///
558
    /// This method consumes `self` as only one value may ever be sent on a `oneshot`
559
    /// channel. It is not marked async because sending a message to an `oneshot`
560
    /// channel never requires any form of waiting.  Because of this, the `send`
561
    /// method can be used in both synchronous and asynchronous code without
562
    /// problems.
563
    ///
564
    /// A successful send occurs when it is determined that the other end of the
565
    /// channel has not hung up already. An unsuccessful send would be one where
566
    /// the corresponding receiver has already been deallocated. Note that a
567
    /// return value of `Err` means that the data will never be received, but
568
    /// a return value of `Ok` does *not* mean that the data will be received.
569
    /// It is possible for the corresponding receiver to hang up immediately
570
    /// after this function returns `Ok`.
571
    ///
572
    /// # Examples
573
    ///
574
    /// Send a value to another task
575
    ///
576
    /// ```
577
    /// use tokio::sync::oneshot;
578
    ///
579
    /// #[tokio::main]
580
    /// async fn main() {
581
    ///     let (tx, rx) = oneshot::channel();
582
    ///
583
    ///     tokio::spawn(async move {
584
    ///         if let Err(_) = tx.send(3) {
585
    ///             println!("the receiver dropped");
586
    ///         }
587
    ///     });
588
    ///
589
    ///     match rx.await {
590
    ///         Ok(v) => println!("got = {:?}", v),
591
    ///         Err(_) => println!("the sender dropped"),
592
    ///     }
593
    /// }
594
    /// ```
595
0
    pub fn send(mut self, t: T) -> Result<(), T> {
596
0
        let inner = self.inner.take().unwrap();
597
0
598
0
        inner.value.with_mut(|ptr| unsafe {
599
0
            // SAFETY: The receiver will not access the `UnsafeCell` unless the
600
0
            // channel has been marked as "complete" (the `VALUE_SENT` state bit
601
0
            // is set).
602
0
            // That bit is only set by the sender later on in this method, and
603
0
            // calling this method consumes `self`. Therefore, if it was possible to
604
0
            // call this method, we know that the `VALUE_SENT` bit is unset, and
605
0
            // the receiver is not currently accessing the `UnsafeCell`.
606
0
            *ptr = Some(t);
607
0
        });
608
0
609
0
        if !inner.complete() {
610
            unsafe {
611
                // SAFETY: The receiver will not access the `UnsafeCell` unless
612
                // the channel has been marked as "complete". Calling
613
                // `complete()` will return true if this bit is set, and false
614
                // if it is not set. Thus, if `complete()` returned false, it is
615
                // safe for us to access the value, because we know that the
616
                // receiver will not.
617
0
                return Err(inner.consume_value().unwrap());
618
            }
619
0
        }
620
0
621
0
        #[cfg(all(tokio_unstable, feature = "tracing"))]
622
0
        self.resource_span.in_scope(|| {
623
0
            tracing::trace!(
624
0
            target: "runtime::resource::state_update",
625
0
            value_sent = true,
626
0
            value_sent.op = "override",
627
0
            )
628
0
        });
629
0
630
0
        Ok(())
631
0
    }
632
633
    /// Waits for the associated [`Receiver`] handle to close.
634
    ///
635
    /// A [`Receiver`] is closed by either calling [`close`] explicitly or the
636
    /// [`Receiver`] value is dropped.
637
    ///
638
    /// This function is useful when paired with `select!` to abort a
639
    /// computation when the receiver is no longer interested in the result.
640
    ///
641
    /// # Return
642
    ///
643
    /// Returns a `Future` which must be awaited on.
644
    ///
645
    /// [`Receiver`]: Receiver
646
    /// [`close`]: Receiver::close
647
    ///
648
    /// # Examples
649
    ///
650
    /// Basic usage
651
    ///
652
    /// ```
653
    /// use tokio::sync::oneshot;
654
    ///
655
    /// #[tokio::main]
656
    /// async fn main() {
657
    ///     let (mut tx, rx) = oneshot::channel::<()>();
658
    ///
659
    ///     tokio::spawn(async move {
660
    ///         drop(rx);
661
    ///     });
662
    ///
663
    ///     tx.closed().await;
664
    ///     println!("the receiver dropped");
665
    /// }
666
    /// ```
667
    ///
668
    /// Paired with select
669
    ///
670
    /// ```
671
    /// use tokio::sync::oneshot;
672
    /// use tokio::time::{self, Duration};
673
    ///
674
    /// async fn compute() -> String {
675
    ///     // Complex computation returning a `String`
676
    /// # "hello".to_string()
677
    /// }
678
    ///
679
    /// #[tokio::main]
680
    /// async fn main() {
681
    ///     let (mut tx, rx) = oneshot::channel();
682
    ///
683
    ///     tokio::spawn(async move {
684
    ///         tokio::select! {
685
    ///             _ = tx.closed() => {
686
    ///                 // The receiver dropped, no need to do any further work
687
    ///             }
688
    ///             value = compute() => {
689
    ///                 // The send can fail if the channel was closed at the exact same
690
    ///                 // time as when compute() finished, so just ignore the failure.
691
    ///                 let _ = tx.send(value);
692
    ///             }
693
    ///         }
694
    ///     });
695
    ///
696
    ///     // Wait for up to 10 seconds
697
    ///     let _ = time::timeout(Duration::from_secs(10), rx).await;
698
    /// }
699
    /// ```
700
0
    pub async fn closed(&mut self) {
701
        use std::future::poll_fn;
702
703
        #[cfg(all(tokio_unstable, feature = "tracing"))]
704
        let resource_span = self.resource_span.clone();
705
        #[cfg(all(tokio_unstable, feature = "tracing"))]
706
        let closed = trace::async_op(
707
            || poll_fn(|cx| self.poll_closed(cx)),
708
            resource_span,
709
            "Sender::closed",
710
            "poll_closed",
711
            false,
712
        );
713
        #[cfg(not(all(tokio_unstable, feature = "tracing")))]
714
0
        let closed = poll_fn(|cx| self.poll_closed(cx));
715
0
716
0
        closed.await;
717
0
    }
718
719
    /// Returns `true` if the associated [`Receiver`] handle has been dropped.
720
    ///
721
    /// A [`Receiver`] is closed by either calling [`close`] explicitly or the
722
    /// [`Receiver`] value is dropped.
723
    ///
724
    /// If `true` is returned, a call to `send` will always result in an error.
725
    ///
726
    /// [`Receiver`]: Receiver
727
    /// [`close`]: Receiver::close
728
    ///
729
    /// # Examples
730
    ///
731
    /// ```
732
    /// use tokio::sync::oneshot;
733
    ///
734
    /// #[tokio::main]
735
    /// async fn main() {
736
    ///     let (tx, rx) = oneshot::channel();
737
    ///
738
    ///     assert!(!tx.is_closed());
739
    ///
740
    ///     drop(rx);
741
    ///
742
    ///     assert!(tx.is_closed());
743
    ///     assert!(tx.send("never received").is_err());
744
    /// }
745
    /// ```
746
0
    pub fn is_closed(&self) -> bool {
747
0
        let inner = self.inner.as_ref().unwrap();
748
0
749
0
        let state = State::load(&inner.state, Acquire);
750
0
        state.is_closed()
751
0
    }
752
753
    /// Checks whether the `oneshot` channel has been closed, and if not, schedules the
754
    /// `Waker` in the provided `Context` to receive a notification when the channel is
755
    /// closed.
756
    ///
757
    /// A [`Receiver`] is closed by either calling [`close`] explicitly, or when the
758
    /// [`Receiver`] value is dropped.
759
    ///
760
    /// Note that on multiple calls to poll, only the `Waker` from the `Context` passed
761
    /// to the most recent call will be scheduled to receive a wakeup.
762
    ///
763
    /// [`Receiver`]: struct@crate::sync::oneshot::Receiver
764
    /// [`close`]: fn@crate::sync::oneshot::Receiver::close
765
    ///
766
    /// # Return value
767
    ///
768
    /// This function returns:
769
    ///
770
    ///  * `Poll::Pending` if the channel is still open.
771
    ///  * `Poll::Ready(())` if the channel is closed.
772
    ///
773
    /// # Examples
774
    ///
775
    /// ```
776
    /// use tokio::sync::oneshot;
777
    ///
778
    /// use std::future::poll_fn;
779
    ///
780
    /// #[tokio::main]
781
    /// async fn main() {
782
    ///     let (mut tx, mut rx) = oneshot::channel::<()>();
783
    ///
784
    ///     tokio::spawn(async move {
785
    ///         rx.close();
786
    ///     });
787
    ///
788
    ///     poll_fn(|cx| tx.poll_closed(cx)).await;
789
    ///
790
    ///     println!("the receiver dropped");
791
    /// }
792
    /// ```
793
0
    pub fn poll_closed(&mut self, cx: &mut Context<'_>) -> Poll<()> {
794
0
        ready!(crate::trace::trace_leaf(cx));
795
796
        // Keep track of task budget
797
0
        let coop = ready!(crate::task::coop::poll_proceed(cx));
798
799
0
        let inner = self.inner.as_ref().unwrap();
800
0
801
0
        let mut state = State::load(&inner.state, Acquire);
802
0
803
0
        if state.is_closed() {
804
0
            coop.made_progress();
805
0
            return Ready(());
806
0
        }
807
0
808
0
        if state.is_tx_task_set() {
809
0
            let will_notify = unsafe { inner.tx_task.will_wake(cx) };
810
0
811
0
            if !will_notify {
812
0
                state = State::unset_tx_task(&inner.state);
813
0
814
0
                if state.is_closed() {
815
                    // Set the flag again so that the waker is released in drop
816
0
                    State::set_tx_task(&inner.state);
817
0
                    coop.made_progress();
818
0
                    return Ready(());
819
0
                } else {
820
0
                    unsafe { inner.tx_task.drop_task() };
821
0
                }
822
0
            }
823
0
        }
824
825
0
        if !state.is_tx_task_set() {
826
            // Attempt to set the task
827
0
            unsafe {
828
0
                inner.tx_task.set_task(cx);
829
0
            }
830
0
831
0
            // Update the state
832
0
            state = State::set_tx_task(&inner.state);
833
0
834
0
            if state.is_closed() {
835
0
                coop.made_progress();
836
0
                return Ready(());
837
0
            }
838
0
        }
839
840
0
        Pending
841
0
    }
842
}
843
844
impl<T> Drop for Sender<T> {
845
17.0k
    fn drop(&mut self) {
846
17.0k
        if let Some(inner) = self.inner.as_ref() {
847
17.0k
            inner.complete();
848
17.0k
            #[cfg(all(tokio_unstable, feature = "tracing"))]
849
17.0k
            self.resource_span.in_scope(|| {
850
17.0k
                tracing::trace!(
851
17.0k
                target: "runtime::resource::state_update",
852
17.0k
                tx_dropped = true,
853
17.0k
                tx_dropped.op = "override",
854
17.0k
                )
855
17.0k
            });
856
17.0k
        }
857
17.0k
    }
858
}
859
860
impl<T> Receiver<T> {
861
    /// Prevents the associated [`Sender`] handle from sending a value.
862
    ///
863
    /// Any `send` operation which happens after calling `close` is guaranteed
864
    /// to fail. After calling `close`, [`try_recv`] should be called to
865
    /// receive a value if one was sent **before** the call to `close`
866
    /// completed.
867
    ///
868
    /// This function is useful to perform a graceful shutdown and ensure that a
869
    /// value will not be sent into the channel and never received.
870
    ///
871
    /// `close` is no-op if a message is already received or the channel
872
    /// is already closed.
873
    ///
874
    /// [`Sender`]: Sender
875
    /// [`try_recv`]: Receiver::try_recv
876
    ///
877
    /// # Examples
878
    ///
879
    /// Prevent a value from being sent
880
    ///
881
    /// ```
882
    /// use tokio::sync::oneshot;
883
    /// use tokio::sync::oneshot::error::TryRecvError;
884
    ///
885
    /// #[tokio::main]
886
    /// async fn main() {
887
    ///     let (tx, mut rx) = oneshot::channel();
888
    ///
889
    ///     assert!(!tx.is_closed());
890
    ///
891
    ///     rx.close();
892
    ///
893
    ///     assert!(tx.is_closed());
894
    ///     assert!(tx.send("never received").is_err());
895
    ///
896
    ///     match rx.try_recv() {
897
    ///         Err(TryRecvError::Closed) => {}
898
    ///         _ => unreachable!(),
899
    ///     }
900
    /// }
901
    /// ```
902
    ///
903
    /// Receive a value sent **before** calling `close`
904
    ///
905
    /// ```
906
    /// use tokio::sync::oneshot;
907
    ///
908
    /// #[tokio::main]
909
    /// async fn main() {
910
    ///     let (tx, mut rx) = oneshot::channel();
911
    ///
912
    ///     assert!(tx.send("will receive").is_ok());
913
    ///
914
    ///     rx.close();
915
    ///
916
    ///     let msg = rx.try_recv().unwrap();
917
    ///     assert_eq!(msg, "will receive");
918
    /// }
919
    /// ```
920
0
    pub fn close(&mut self) {
921
0
        if let Some(inner) = self.inner.as_ref() {
922
0
            inner.close();
923
0
            #[cfg(all(tokio_unstable, feature = "tracing"))]
924
0
            self.resource_span.in_scope(|| {
925
0
                tracing::trace!(
926
0
                target: "runtime::resource::state_update",
927
0
                rx_dropped = true,
928
0
                rx_dropped.op = "override",
929
0
                )
930
0
            });
931
0
        }
932
0
    }
933
934
    /// Checks if this receiver is terminated.
935
    ///
936
    /// This function returns true if this receiver has already yielded a [`Poll::Ready`] result.
937
    /// If so, this receiver should no longer be polled.
938
    ///
939
    /// # Examples
940
    ///
941
    /// Sending a value and polling it.
942
    ///
943
    /// ```
944
    /// use tokio::sync::oneshot;
945
    ///
946
    /// use std::task::Poll;
947
    ///
948
    /// #[tokio::main]
949
    /// async fn main() {
950
    ///     let (tx, mut rx) = oneshot::channel();
951
    ///
952
    ///     // A receiver is not terminated when it is initialized.
953
    ///     assert!(!rx.is_terminated());
954
    ///
955
    ///     // A receiver is not terminated it is polled and is still pending.
956
    ///     let poll = futures::poll!(&mut rx);
957
    ///     assert_eq!(poll, Poll::Pending);
958
    ///     assert!(!rx.is_terminated());
959
    ///
960
    ///     // A receiver is not terminated if a value has been sent, but not yet read.
961
    ///     tx.send(0).unwrap();
962
    ///     assert!(!rx.is_terminated());
963
    ///
964
    ///     // A receiver *is* terminated after it has been polled and yielded a value.
965
    ///     assert_eq!((&mut rx).await, Ok(0));
966
    ///     assert!(rx.is_terminated());
967
    /// }
968
    /// ```
969
    ///
970
    /// Dropping the sender.
971
    ///
972
    /// ```
973
    /// use tokio::sync::oneshot;
974
    ///
975
    /// #[tokio::main]
976
    /// async fn main() {
977
    ///     let (tx, mut rx) = oneshot::channel::<()>();
978
    ///
979
    ///     // A receiver is not immediately terminated when the sender is dropped.
980
    ///     drop(tx);
981
    ///     assert!(!rx.is_terminated());
982
    ///
983
    ///     // A receiver *is* terminated after it has been polled and yielded an error.
984
    ///     let _ = (&mut rx).await.unwrap_err();
985
    ///     assert!(rx.is_terminated());
986
    /// }
987
    /// ```
988
0
    pub fn is_terminated(&self) -> bool {
989
0
        self.inner.is_none()
990
0
    }
991
992
    /// Checks if a channel is empty.
993
    ///
994
    /// This method returns `true` if the channel has no messages.
995
    ///
996
    /// It is not necessarily safe to poll an empty receiver, which may have
997
    /// already yielded a value. Use [`is_terminated()`][Self::is_terminated]
998
    /// to check whether or not a receiver can be safely polled, instead.
999
    ///
1000
    /// # Examples
1001
    ///
1002
    /// Sending a value.
1003
    ///
1004
    /// ```
1005
    /// use tokio::sync::oneshot;
1006
    ///
1007
    /// #[tokio::main]
1008
    /// async fn main() {
1009
    ///     let (tx, mut rx) = oneshot::channel();
1010
    ///     assert!(rx.is_empty());
1011
    ///
1012
    ///     tx.send(0).unwrap();
1013
    ///     assert!(!rx.is_empty());
1014
    ///
1015
    ///     let _ = (&mut rx).await;
1016
    ///     assert!(rx.is_empty());
1017
    /// }
1018
    /// ```
1019
    ///
1020
    /// Dropping the sender.
1021
    ///
1022
    /// ```
1023
    /// use tokio::sync::oneshot;
1024
    ///
1025
    /// #[tokio::main]
1026
    /// async fn main() {
1027
    ///     let (tx, mut rx) = oneshot::channel::<()>();
1028
    ///
1029
    ///     // A channel is empty if the sender is dropped.
1030
    ///     drop(tx);
1031
    ///     assert!(rx.is_empty());
1032
    ///
1033
    ///     // A closed channel still yields an error, however.
1034
    ///     (&mut rx).await.expect_err("should yield an error");
1035
    ///     assert!(rx.is_empty());
1036
    /// }
1037
    /// ```
1038
    ///
1039
    /// Terminated channels are empty.
1040
    ///
1041
    /// ```should_panic
1042
    /// use tokio::sync::oneshot;
1043
    ///
1044
    /// #[tokio::main]
1045
    /// async fn main() {
1046
    ///     let (tx, mut rx) = oneshot::channel();
1047
    ///     tx.send(0).unwrap();
1048
    ///     let _ = (&mut rx).await;
1049
    ///
1050
    ///     // NB: an empty channel is not necessarily safe to poll!
1051
    ///     assert!(rx.is_empty());
1052
    ///     let _ = (&mut rx).await;
1053
    /// }
1054
    /// ```
1055
0
    pub fn is_empty(&self) -> bool {
1056
0
        let Some(inner) = self.inner.as_ref() else {
1057
            // The channel has already terminated.
1058
0
            return true;
1059
        };
1060
1061
0
        let state = State::load(&inner.state, Acquire);
1062
0
        if state.is_complete() {
1063
            // SAFETY: If `state.is_complete()` returns true, then the
1064
            // `VALUE_SENT` bit has been set and the sender side of the
1065
            // channel will no longer attempt to access the inner
1066
            // `UnsafeCell`. Therefore, it is now safe for us to access the
1067
            // cell.
1068
            //
1069
            // The channel is empty if it does not have a value.
1070
0
            unsafe { !inner.has_value() }
1071
        } else {
1072
            // The receiver closed the channel or no value has been sent yet.
1073
0
            true
1074
        }
1075
0
    }
1076
1077
    /// Attempts to receive a value.
1078
    ///
1079
    /// If a pending value exists in the channel, it is returned. If no value
1080
    /// has been sent, the current task **will not** be registered for
1081
    /// future notification.
1082
    ///
1083
    /// This function is useful to call from outside the context of an
1084
    /// asynchronous task.
1085
    ///
1086
    /// Note that unlike the `poll` method, the `try_recv` method cannot fail
1087
    /// spuriously. Any send or close event that happens before this call to
1088
    /// `try_recv` will be correctly returned to the caller.
1089
    ///
1090
    /// # Return
1091
    ///
1092
    /// - `Ok(T)` if a value is pending in the channel.
1093
    /// - `Err(TryRecvError::Empty)` if no value has been sent yet.
1094
    /// - `Err(TryRecvError::Closed)` if the sender has dropped without sending
1095
    ///   a value, or if the message has already been received.
1096
    ///
1097
    /// # Examples
1098
    ///
1099
    /// `try_recv` before a value is sent, then after.
1100
    ///
1101
    /// ```
1102
    /// use tokio::sync::oneshot;
1103
    /// use tokio::sync::oneshot::error::TryRecvError;
1104
    ///
1105
    /// #[tokio::main]
1106
    /// async fn main() {
1107
    ///     let (tx, mut rx) = oneshot::channel();
1108
    ///
1109
    ///     match rx.try_recv() {
1110
    ///         // The channel is currently empty
1111
    ///         Err(TryRecvError::Empty) => {}
1112
    ///         _ => unreachable!(),
1113
    ///     }
1114
    ///
1115
    ///     // Send a value
1116
    ///     tx.send("hello").unwrap();
1117
    ///
1118
    ///     match rx.try_recv() {
1119
    ///         Ok(value) => assert_eq!(value, "hello"),
1120
    ///         _ => unreachable!(),
1121
    ///     }
1122
    /// }
1123
    /// ```
1124
    ///
1125
    /// `try_recv` when the sender dropped before sending a value
1126
    ///
1127
    /// ```
1128
    /// use tokio::sync::oneshot;
1129
    /// use tokio::sync::oneshot::error::TryRecvError;
1130
    ///
1131
    /// #[tokio::main]
1132
    /// async fn main() {
1133
    ///     let (tx, mut rx) = oneshot::channel::<()>();
1134
    ///
1135
    ///     drop(tx);
1136
    ///
1137
    ///     match rx.try_recv() {
1138
    ///         // The channel will never receive a value.
1139
    ///         Err(TryRecvError::Closed) => {}
1140
    ///         _ => unreachable!(),
1141
    ///     }
1142
    /// }
1143
    /// ```
1144
0
    pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
1145
0
        let result = if let Some(inner) = self.inner.as_ref() {
1146
0
            let state = State::load(&inner.state, Acquire);
1147
0
1148
0
            if state.is_complete() {
1149
                // SAFETY: If `state.is_complete()` returns true, then the
1150
                // `VALUE_SENT` bit has been set and the sender side of the
1151
                // channel will no longer attempt to access the inner
1152
                // `UnsafeCell`. Therefore, it is now safe for us to access the
1153
                // cell.
1154
0
                match unsafe { inner.consume_value() } {
1155
0
                    Some(value) => {
1156
0
                        #[cfg(all(tokio_unstable, feature = "tracing"))]
1157
0
                        self.resource_span.in_scope(|| {
1158
0
                            tracing::trace!(
1159
0
                            target: "runtime::resource::state_update",
1160
0
                            value_received = true,
1161
0
                            value_received.op = "override",
1162
0
                            )
1163
0
                        });
1164
0
                        Ok(value)
1165
                    }
1166
0
                    None => Err(TryRecvError::Closed),
1167
                }
1168
0
            } else if state.is_closed() {
1169
0
                Err(TryRecvError::Closed)
1170
            } else {
1171
                // Not ready, this does not clear `inner`
1172
0
                return Err(TryRecvError::Empty);
1173
            }
1174
        } else {
1175
0
            Err(TryRecvError::Closed)
1176
        };
1177
1178
0
        self.inner = None;
1179
0
        result
1180
0
    }
1181
1182
    /// Blocking receive to call outside of asynchronous contexts.
1183
    ///
1184
    /// # Panics
1185
    ///
1186
    /// This function panics if called within an asynchronous execution
1187
    /// context.
1188
    ///
1189
    /// # Examples
1190
    ///
1191
    /// ```
1192
    /// use std::thread;
1193
    /// use tokio::sync::oneshot;
1194
    ///
1195
    /// #[tokio::main]
1196
    /// async fn main() {
1197
    ///     let (tx, rx) = oneshot::channel::<u8>();
1198
    ///
1199
    ///     let sync_code = thread::spawn(move || {
1200
    ///         assert_eq!(Ok(10), rx.blocking_recv());
1201
    ///     });
1202
    ///
1203
    ///     let _ = tx.send(10);
1204
    ///     sync_code.join().unwrap();
1205
    /// }
1206
    /// ```
1207
    #[track_caller]
1208
    #[cfg(feature = "sync")]
1209
    #[cfg_attr(docsrs, doc(alias = "recv_blocking"))]
1210
0
    pub fn blocking_recv(self) -> Result<T, RecvError> {
1211
0
        crate::future::block_on(self)
1212
0
    }
1213
}
1214
1215
impl<T> Drop for Receiver<T> {
1216
17.0k
    fn drop(&mut self) {
1217
17.0k
        if let Some(inner) = self.inner.as_ref() {
1218
0
            let state = inner.close();
1219
0
1220
0
            if state.is_complete() {
1221
0
                // SAFETY: we have ensured that the `VALUE_SENT` bit has been set,
1222
0
                // so only the receiver can access the value.
1223
0
                drop(unsafe { inner.consume_value() });
1224
0
            }
1225
1226
            #[cfg(all(tokio_unstable, feature = "tracing"))]
1227
            self.resource_span.in_scope(|| {
1228
                tracing::trace!(
1229
                target: "runtime::resource::state_update",
1230
                rx_dropped = true,
1231
                rx_dropped.op = "override",
1232
                )
1233
            });
1234
17.0k
        }
1235
17.0k
    }
1236
}
1237
1238
impl<T> Future for Receiver<T> {
1239
    type Output = Result<T, RecvError>;
1240
1241
37.5k
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1242
        // If `inner` is `None`, then `poll()` has already completed.
1243
        #[cfg(all(tokio_unstable, feature = "tracing"))]
1244
        let _res_span = self.resource_span.clone().entered();
1245
        #[cfg(all(tokio_unstable, feature = "tracing"))]
1246
        let _ao_span = self.async_op_span.clone().entered();
1247
        #[cfg(all(tokio_unstable, feature = "tracing"))]
1248
        let _ao_poll_span = self.async_op_poll_span.clone().entered();
1249
1250
37.5k
        let ret = if let Some(inner) = self.as_ref().get_ref().inner.as_ref() {
1251
            #[cfg(all(tokio_unstable, feature = "tracing"))]
1252
            let res = ready!(trace_poll_op!("poll_recv", inner.poll_recv(cx))).map_err(Into::into);
1253
1254
            #[cfg(any(not(tokio_unstable), not(feature = "tracing")))]
1255
37.5k
            let res = ready!(inner.poll_recv(cx)).map_err(Into::into);
1256
17.0k
1257
17.0k
            res
1258
        } else {
1259
0
            panic!("called after complete");
1260
        };
1261
1262
17.0k
        self.inner = None;
1263
17.0k
        Ready(ret)
1264
37.5k
    }
1265
}
1266
1267
impl<T> Inner<T> {
1268
17.0k
    fn complete(&self) -> bool {
1269
17.0k
        let prev = State::set_complete(&self.state);
1270
17.0k
1271
17.0k
        if prev.is_closed() {
1272
0
            return false;
1273
17.0k
        }
1274
17.0k
1275
17.0k
        if prev.is_rx_task_set() {
1276
            // TODO: Consume waker?
1277
17.0k
            unsafe {
1278
17.0k
                self.rx_task.with_task(Waker::wake_by_ref);
1279
17.0k
            }
1280
7
        }
1281
1282
17.0k
        true
1283
17.0k
    }
1284
1285
37.5k
    fn poll_recv(&self, cx: &mut Context<'_>) -> Poll<Result<T, RecvError>> {
1286
37.5k
        ready!(crate::trace::trace_leaf(cx));
1287
        // Keep track of task budget
1288
37.5k
        let coop = ready!(crate::task::coop::poll_proceed(cx));
1289
1290
        // Load the state
1291
37.5k
        let mut state = State::load(&self.state, Acquire);
1292
37.5k
1293
37.5k
        if state.is_complete() {
1294
17.0k
            coop.made_progress();
1295
17.0k
            match unsafe { self.consume_value() } {
1296
0
                Some(value) => Ready(Ok(value)),
1297
17.0k
                None => Ready(Err(RecvError(()))),
1298
            }
1299
20.4k
        } else if state.is_closed() {
1300
0
            coop.made_progress();
1301
0
            Ready(Err(RecvError(())))
1302
        } else {
1303
20.4k
            if state.is_rx_task_set() {
1304
3.34k
                let will_notify = unsafe { self.rx_task.will_wake(cx) };
1305
3.34k
1306
3.34k
                // Check if the task is still the same
1307
3.34k
                if !will_notify {
1308
                    // Unset the task
1309
0
                    state = State::unset_rx_task(&self.state);
1310
0
                    if state.is_complete() {
1311
                        // Set the flag again so that the waker is released in drop
1312
0
                        State::set_rx_task(&self.state);
1313
0
1314
0
                        coop.made_progress();
1315
0
                        // SAFETY: If `state.is_complete()` returns true, then the
1316
0
                        // `VALUE_SENT` bit has been set and the sender side of the
1317
0
                        // channel will no longer attempt to access the inner
1318
0
                        // `UnsafeCell`. Therefore, it is now safe for us to access the
1319
0
                        // cell.
1320
0
                        return match unsafe { self.consume_value() } {
1321
0
                            Some(value) => Ready(Ok(value)),
1322
0
                            None => Ready(Err(RecvError(()))),
1323
                        };
1324
0
                    } else {
1325
0
                        unsafe { self.rx_task.drop_task() };
1326
0
                    }
1327
3.34k
                }
1328
17.0k
            }
1329
1330
20.4k
            if !state.is_rx_task_set() {
1331
                // Attempt to set the task
1332
17.0k
                unsafe {
1333
17.0k
                    self.rx_task.set_task(cx);
1334
17.0k
                }
1335
17.0k
1336
17.0k
                // Update the state
1337
17.0k
                state = State::set_rx_task(&self.state);
1338
17.0k
1339
17.0k
                if state.is_complete() {
1340
0
                    coop.made_progress();
1341
0
                    match unsafe { self.consume_value() } {
1342
0
                        Some(value) => Ready(Ok(value)),
1343
0
                        None => Ready(Err(RecvError(()))),
1344
                    }
1345
                } else {
1346
17.0k
                    Pending
1347
                }
1348
            } else {
1349
3.34k
                Pending
1350
            }
1351
        }
1352
37.5k
    }
1353
1354
    /// Called by `Receiver` to indicate that the value will never be received.
1355
0
    fn close(&self) -> State {
1356
0
        let prev = State::set_closed(&self.state);
1357
0
1358
0
        if prev.is_tx_task_set() && !prev.is_complete() {
1359
0
            unsafe {
1360
0
                self.tx_task.with_task(Waker::wake_by_ref);
1361
0
            }
1362
0
        }
1363
1364
0
        prev
1365
0
    }
1366
1367
    /// Consumes the value. This function does not check `state`.
1368
    ///
1369
    /// # Safety
1370
    ///
1371
    /// Calling this method concurrently on multiple threads will result in a
1372
    /// data race. The `VALUE_SENT` state bit is used to ensure that only the
1373
    /// sender *or* the receiver will call this method at a given point in time.
1374
    /// If `VALUE_SENT` is not set, then only the sender may call this method;
1375
    /// if it is set, then only the receiver may call this method.
1376
17.0k
    unsafe fn consume_value(&self) -> Option<T> {
1377
17.0k
        self.value.with_mut(|ptr| (*ptr).take())
1378
17.0k
    }
1379
1380
    /// Returns true if there is a value. This function does not check `state`.
1381
    ///
1382
    /// # Safety
1383
    ///
1384
    /// Calling this method concurrently on multiple threads will result in a
1385
    /// data race. The `VALUE_SENT` state bit is used to ensure that only the
1386
    /// sender *or* the receiver will call this method at a given point in time.
1387
    /// If `VALUE_SENT` is not set, then only the sender may call this method;
1388
    /// if it is set, then only the receiver may call this method.
1389
0
    unsafe fn has_value(&self) -> bool {
1390
0
        self.value.with(|ptr| (*ptr).is_some())
1391
0
    }
1392
}
1393
1394
unsafe impl<T: Send> Send for Inner<T> {}
1395
unsafe impl<T: Send> Sync for Inner<T> {}
1396
1397
17.0k
fn mut_load(this: &mut AtomicUsize) -> usize {
1398
17.0k
    this.with_mut(|v| *v)
1399
17.0k
}
1400
1401
impl<T> Drop for Inner<T> {
1402
17.0k
    fn drop(&mut self) {
1403
17.0k
        let state = State(mut_load(&mut self.state));
1404
17.0k
1405
17.0k
        if state.is_rx_task_set() {
1406
17.0k
            unsafe {
1407
17.0k
                self.rx_task.drop_task();
1408
17.0k
            }
1409
7
        }
1410
1411
17.0k
        if state.is_tx_task_set() {
1412
0
            unsafe {
1413
0
                self.tx_task.drop_task();
1414
0
            }
1415
17.0k
        }
1416
1417
        // SAFETY: we have `&mut self`, and therefore we have
1418
        // exclusive access to the value.
1419
        unsafe {
1420
            // Note: the assertion holds because if the value has been sent by sender,
1421
            // we must ensure that the value must have been consumed by the receiver before
1422
            // dropping the `Inner`.
1423
17.0k
            debug_assert!(self.consume_value().is_none());
1424
        }
1425
17.0k
    }
1426
}
1427
1428
impl<T: fmt::Debug> fmt::Debug for Inner<T> {
1429
0
    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
1430
        use std::sync::atomic::Ordering::Relaxed;
1431
1432
0
        fmt.debug_struct("Inner")
1433
0
            .field("state", &State::load(&self.state, Relaxed))
1434
0
            .finish()
1435
0
    }
1436
}
1437
1438
/// Indicates that a waker for the receiving task has been set.
1439
///
1440
/// # Safety
1441
///
1442
/// If this bit is not set, the `rx_task` field may be uninitialized.
1443
const RX_TASK_SET: usize = 0b00001;
1444
/// Indicates that a value has been stored in the channel's inner `UnsafeCell`.
1445
///
1446
/// # Safety
1447
///
1448
/// This bit controls which side of the channel is permitted to access the
1449
/// `UnsafeCell`. If it is set, the `UnsafeCell` may ONLY be accessed by the
1450
/// receiver. If this bit is NOT set, the `UnsafeCell` may ONLY be accessed by
1451
/// the sender.
1452
const VALUE_SENT: usize = 0b00010;
1453
const CLOSED: usize = 0b00100;
1454
1455
/// Indicates that a waker for the sending task has been set.
1456
///
1457
/// # Safety
1458
///
1459
/// If this bit is not set, the `tx_task` field may be uninitialized.
1460
const TX_TASK_SET: usize = 0b01000;
1461
1462
impl State {
1463
17.0k
    fn new() -> State {
1464
17.0k
        State(0)
1465
17.0k
    }
1466
1467
54.5k
    fn is_complete(self) -> bool {
1468
54.5k
        self.0 & VALUE_SENT == VALUE_SENT
1469
54.5k
    }
1470
1471
17.0k
    fn set_complete(cell: &AtomicUsize) -> State {
1472
17.0k
        // This method is a compare-and-swap loop rather than a fetch-or like
1473
17.0k
        // other `set_$WHATEVER` methods on `State`. This is because we must
1474
17.0k
        // check if the state has been closed before setting the `VALUE_SENT`
1475
17.0k
        // bit.
1476
17.0k
        //
1477
17.0k
        // We don't want to set both the `VALUE_SENT` bit if the `CLOSED`
1478
17.0k
        // bit is already set, because `VALUE_SENT` will tell the receiver that
1479
17.0k
        // it's okay to access the inner `UnsafeCell`. Immediately after calling
1480
17.0k
        // `set_complete`, if the channel was closed, the sender will _also_
1481
17.0k
        // access the `UnsafeCell` to take the value back out, so if a
1482
17.0k
        // `poll_recv` or `try_recv` call is occurring concurrently, both
1483
17.0k
        // threads may try to access the `UnsafeCell` if we were to set the
1484
17.0k
        // `VALUE_SENT` bit on a closed channel.
1485
17.0k
        let mut state = cell.load(Ordering::Relaxed);
1486
        loop {
1487
17.0k
            if State(state).is_closed() {
1488
0
                break;
1489
17.0k
            }
1490
17.0k
            // TODO: This could be `Release`, followed by an `Acquire` fence *if*
1491
17.0k
            // the `RX_TASK_SET` flag is set. However, `loom` does not support
1492
17.0k
            // fences yet.
1493
17.0k
            match cell.compare_exchange_weak(
1494
17.0k
                state,
1495
17.0k
                state | VALUE_SENT,
1496
17.0k
                Ordering::AcqRel,
1497
17.0k
                Ordering::Acquire,
1498
17.0k
            ) {
1499
17.0k
                Ok(_) => break,
1500
0
                Err(actual) => state = actual,
1501
            }
1502
        }
1503
17.0k
        State(state)
1504
17.0k
    }
1505
1506
75.0k
    fn is_rx_task_set(self) -> bool {
1507
75.0k
        self.0 & RX_TASK_SET == RX_TASK_SET
1508
75.0k
    }
1509
1510
17.0k
    fn set_rx_task(cell: &AtomicUsize) -> State {
1511
17.0k
        let val = cell.fetch_or(RX_TASK_SET, AcqRel);
1512
17.0k
        State(val | RX_TASK_SET)
1513
17.0k
    }
1514
1515
0
    fn unset_rx_task(cell: &AtomicUsize) -> State {
1516
0
        let val = cell.fetch_and(!RX_TASK_SET, AcqRel);
1517
0
        State(val & !RX_TASK_SET)
1518
0
    }
1519
1520
54.5k
    fn is_closed(self) -> bool {
1521
54.5k
        self.0 & CLOSED == CLOSED
1522
54.5k
    }
1523
1524
0
    fn set_closed(cell: &AtomicUsize) -> State {
1525
0
        // Acquire because we want all later writes (attempting to poll) to be
1526
0
        // ordered after this.
1527
0
        let val = cell.fetch_or(CLOSED, Acquire);
1528
0
        State(val)
1529
0
    }
1530
1531
0
    fn set_tx_task(cell: &AtomicUsize) -> State {
1532
0
        let val = cell.fetch_or(TX_TASK_SET, AcqRel);
1533
0
        State(val | TX_TASK_SET)
1534
0
    }
1535
1536
0
    fn unset_tx_task(cell: &AtomicUsize) -> State {
1537
0
        let val = cell.fetch_and(!TX_TASK_SET, AcqRel);
1538
0
        State(val & !TX_TASK_SET)
1539
0
    }
1540
1541
17.0k
    fn is_tx_task_set(self) -> bool {
1542
17.0k
        self.0 & TX_TASK_SET == TX_TASK_SET
1543
17.0k
    }
1544
1545
17.0k
    fn as_usize(self) -> usize {
1546
17.0k
        self.0
1547
17.0k
    }
1548
1549
37.5k
    fn load(cell: &AtomicUsize, order: Ordering) -> State {
1550
37.5k
        let val = cell.load(order);
1551
37.5k
        State(val)
1552
37.5k
    }
1553
}
1554
1555
impl fmt::Debug for State {
1556
0
    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
1557
0
        fmt.debug_struct("State")
1558
0
            .field("is_complete", &self.is_complete())
1559
0
            .field("is_closed", &self.is_closed())
1560
0
            .field("is_rx_task_set", &self.is_rx_task_set())
1561
0
            .field("is_tx_task_set", &self.is_tx_task_set())
1562
0
            .finish()
1563
0
    }
1564
}