Coverage Report

Created: 2026-03-17 06:50

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/rust/registry/src/index.crates.io-1949cf8c6b5b557f/want-0.3.1/src/lib.rs
Line
Count
Source
1
#![doc(html_root_url = "https://docs.rs/want/0.3.1")]
2
#![deny(warnings)]
3
#![deny(missing_docs)]
4
#![deny(missing_debug_implementations)]
5
6
//! A Futures channel-like utility to signal when a value is wanted.
7
//!
8
//! Futures are supposed to be lazy, and only starting work if `Future::poll`
9
//! is called. The same is true of `Stream`s, but when using a channel as
10
//! a `Stream`, it can be hard to know if the receiver is ready for the next
11
//! value.
12
//!
13
//! Put another way, given a `(tx, rx)` from `futures::sync::mpsc::channel()`,
14
//! how can the sender (`tx`) know when the receiver (`rx`) actually wants more
15
//! work to be produced? Just because there is room in the channel buffer
16
//! doesn't mean the work would be used by the receiver.
17
//!
18
//! This is where something like `want` comes in. Added to a channel, you can
19
//! make sure that the `tx` only creates the message and sends it when the `rx`
20
//! has called `poll()` for it, and the buffer was empty.
21
//!
22
//! # Example
23
//!
24
//! ```nightly
25
//! # //#![feature(async_await)]
26
//! extern crate want;
27
//!
28
//! # fn spawn<T>(_t: T) {}
29
//! # fn we_still_want_message() -> bool { true }
30
//! # fn mpsc_channel() -> (Tx, Rx) { (Tx, Rx) }
31
//! # struct Tx;
32
//! # impl Tx { fn send<T>(&mut self, _: T) {} }
33
//! # struct Rx;
34
//! # impl Rx { async fn recv(&mut self) -> Option<Expensive> { Some(Expensive) } }
35
//!
36
//! // Some message that is expensive to produce.
37
//! struct Expensive;
38
//!
39
//! // Some futures-aware MPSC channel...
40
//! let (mut tx, mut rx) = mpsc_channel();
41
//!
42
//! // And our `want` channel!
43
//! let (mut gv, mut tk) = want::new();
44
//!
45
//!
46
//! // Our receiving task...
47
//! spawn(async move {
48
//!     // Maybe something comes up that prevents us from ever
49
//!     // using the expensive message.
50
//!     //
51
//!     // Without `want`, the "send" task may have started to
52
//!     // produce the expensive message even though we wouldn't
53
//!     // be able to use it.
54
//!     if !we_still_want_message() {
55
//!         return;
56
//!     }
57
//!
58
//!     // But we can use it! So tell the `want` channel.
59
//!     tk.want();
60
//!
61
//!     match rx.recv().await {
62
//!         Some(_msg) => println!("got a message"),
63
//!         None => println!("DONE"),
64
//!     }
65
//! });
66
//!
67
//! // Our sending task
68
//! spawn(async move {
69
//!     // It's expensive to create a new message, so we wait until the
70
//!     // receiving end truly *wants* the message.
71
//!     if let Err(_closed) = gv.want().await {
72
//!         // Looks like they will never want it...
73
//!         return;
74
//!     }
75
//!
76
//!     // They want it, let's go!
77
//!     tx.send(Expensive);
78
//! });
79
//!
80
//! # fn main() {}
81
//! ```
82
83
use std::fmt;
84
use std::future::Future;
85
use std::mem;
86
use std::pin::Pin;
87
use std::sync::Arc;
88
use std::sync::atomic::AtomicUsize;
89
// SeqCst is the only ordering used to ensure accessing the state and
90
// TryLock are never re-ordered.
91
use std::sync::atomic::Ordering::SeqCst;
92
use std::task::{self, Poll, Waker};
93
94
95
use try_lock::TryLock;
96
97
/// Create a new `want` channel.
98
0
pub fn new() -> (Giver, Taker) {
99
0
    let inner = Arc::new(Inner {
100
0
        state: AtomicUsize::new(State::Idle.into()),
101
0
        task: TryLock::new(None),
102
0
    });
103
0
    let inner2 = inner.clone();
104
0
    (
105
0
        Giver {
106
0
            inner,
107
0
        },
108
0
        Taker {
109
0
            inner: inner2,
110
0
        },
111
0
    )
112
0
}
113
114
/// An entity that gives a value when wanted.
115
pub struct Giver {
116
    inner: Arc<Inner>,
117
}
118
119
/// An entity that wants a value.
120
pub struct Taker {
121
    inner: Arc<Inner>,
122
}
123
124
/// A cloneable `Giver`.
125
///
126
/// It differs from `Giver` in that you cannot poll for `want`. It's only
127
/// usable as a cancellation watcher.
128
#[derive(Clone)]
129
pub struct SharedGiver {
130
    inner: Arc<Inner>,
131
}
132
133
/// The `Taker` has canceled its interest in a value.
134
pub struct Closed {
135
    _inner: (),
136
}
137
138
#[derive(Clone, Copy, Debug)]
139
enum State {
140
    Idle,
141
    Want,
142
    Give,
143
    Closed,
144
}
145
146
impl From<State> for usize {
147
0
    fn from(s: State) -> usize {
148
0
        match s {
149
0
            State::Idle => 0,
150
0
            State::Want => 1,
151
0
            State::Give => 2,
152
0
            State::Closed => 3,
153
        }
154
0
    }
155
}
156
157
impl From<usize> for State {
158
0
    fn from(num: usize) -> State {
159
0
        match num {
160
0
            0 => State::Idle,
161
0
            1 => State::Want,
162
0
            2 => State::Give,
163
0
            3 => State::Closed,
164
0
            _ => unreachable!("unknown state: {}", num),
165
        }
166
0
    }
167
}
168
169
struct Inner {
170
    state: AtomicUsize,
171
    task: TryLock<Option<Waker>>,
172
}
173
174
// ===== impl Giver ======
175
176
impl Giver {
177
    /// Returns a `Future` that resolves with `Ok(())` once the `Taker` has signaled want, or with `Err(Closed)` if it has canceled or been dropped.
178
0
    pub fn want(&mut self) -> impl Future<Output = Result<(), Closed>> + '_ {
179
0
        Want(self)
180
0
    }
181
182
    /// Poll whether the `Taker` has registered interest in another value.
183
    ///
184
    /// - If the `Taker` has called `want()`, this returns `Poll::Ready(Ok(()))`.
185
    /// - If the `Taker` has not called `want()` since last poll, this
186
    ///   returns `Poll::Pending`, and parks the current task to be notified
187
    ///   when the `Taker` does call `want()`.
188
    /// - If the `Taker` has canceled (or dropped), this returns `Poll::Ready(Err(Closed))`.
189
    ///
190
    /// After knowing that the Taker is wanting, the state can be reset by
191
    /// calling [`give`](Giver::give).
192
0
    pub fn poll_want(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), Closed>> {
193
        loop {
194
0
            let state = self.inner.state.load(SeqCst).into();
195
0
            match state {
196
                State::Want => {
197
0
                    return Poll::Ready(Ok(()));
198
                },
199
                State::Closed => {
200
0
                    return Poll::Ready(Err(Closed { _inner: () }));
201
                },
202
                State::Idle | State::Give => {
203
                    // Taker doesn't want anything yet, so park.
204
0
                    if let Some(mut locked) = self.inner.task.try_lock_explicit(SeqCst, SeqCst) {
205
206
                        // While we have the lock, try to set to GIVE.
207
0
                        let old = self.inner.state.compare_exchange(
208
0
                            state.into(),
209
0
                            State::Give.into(),
210
0
                            SeqCst,
211
0
                            SeqCst,
212
                        );
213
                        // If it's still the first state (Idle or Give), park current task.
214
0
                        if old == Ok(state.into()) {
215
0
                            let park = locked.as_ref()
216
0
                                .map(|w| !w.will_wake(cx.waker()))
217
0
                                .unwrap_or(true);
218
0
                            if park {
219
0
                                let old = mem::replace(&mut *locked, Some(cx.waker().clone()));
220
0
                                drop(locked);
221
0
                                if let Some(prev_task) = old {
222
0
                                    // there was an old task parked here.
223
0
                                    // it might be waiting to be notified,
224
0
                                    // so poke it before dropping.
225
0
                                    prev_task.wake();
226
0
                                };
227
0
                            }
228
0
                            return Poll::Pending;
229
0
                        }
230
                        // Otherwise, something happened! Go around the loop again.
231
0
                    } else {
232
0
                        // if we couldn't take the lock, then a Taker has it.
233
0
                        // The *ONLY* reason is because it is in the process of notifying us
234
0
                        // of its want.
235
0
                        //
236
0
                        // We need to loop again to see what state it was changed to.
237
0
                    }
238
                },
239
            }
240
        }
241
0
    }
242
243
    /// Mark the state as idle, if the Taker currently is wanting.
244
    ///
245
    /// Returns true if Taker was wanting, false otherwise.
246
    #[inline]
247
0
    pub fn give(&self) -> bool {
248
        // only set to IDLE if it is still Want
249
0
        let old = self.inner.state.compare_exchange(
250
0
            State::Want.into(),
251
0
            State::Idle.into(),
252
0
            SeqCst,
253
0
            SeqCst);
254
0
        old == Ok(State::Want.into())
255
0
    }
Unexecuted instantiation: <want::Giver>::give
Unexecuted instantiation: <want::Giver>::give
256
257
    /// Check if the `Taker` has called `want()` without parking a task.
258
    ///
259
    /// This is safe to call outside of a futures task context, but other
260
    /// means of being notified are left to the user.
261
    #[inline]
262
0
    pub fn is_wanting(&self) -> bool {
263
0
        self.inner.state.load(SeqCst) == State::Want.into()
264
0
    }
Unexecuted instantiation: <want::Giver>::is_wanting
Unexecuted instantiation: <want::Giver>::is_wanting
265
266
267
    /// Check if the `Taker` has canceled interest without parking a task.
268
    #[inline]
269
0
    pub fn is_canceled(&self) -> bool {
270
0
        self.inner.state.load(SeqCst) == State::Closed.into()
271
0
    }
272
273
    /// Converts this into a `SharedGiver`.
274
    #[inline]
275
0
    pub fn shared(self) -> SharedGiver {
276
0
        SharedGiver {
277
0
            inner: self.inner,
278
0
        }
279
0
    }
280
}
281
282
impl fmt::Debug for Giver {
283
0
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
284
0
        f.debug_struct("Giver")
285
0
            .field("state", &self.inner.state())
286
0
            .finish()
287
0
    }
288
}
289
290
// ===== impl SharedGiver ======
291
292
impl SharedGiver {
293
    /// Check if the `Taker` has called `want()` without parking a task.
294
    ///
295
    /// This is safe to call outside of a futures task context, but other
296
    /// means of being notified are left to the user.
297
    #[inline]
298
0
    pub fn is_wanting(&self) -> bool {
299
0
        self.inner.state.load(SeqCst) == State::Want.into()
300
0
    }
301
302
303
    /// Check if the `Taker` has canceled interest without parking a task.
304
    #[inline]
305
0
    pub fn is_canceled(&self) -> bool {
306
0
        self.inner.state.load(SeqCst) == State::Closed.into()
307
0
    }
308
}
309
310
impl fmt::Debug for SharedGiver {
311
0
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
312
0
        f.debug_struct("SharedGiver")
313
0
            .field("state", &self.inner.state())
314
0
            .finish()
315
0
    }
316
}
317
318
// ===== impl Taker ======
319
320
impl Taker {
321
    /// Signal to the `Giver` that the want is canceled.
322
    ///
323
    /// This is useful to tell that the channel is closed if you cannot
324
    /// drop the value yet.
325
    #[inline]
326
0
    pub fn cancel(&mut self) {
327
0
        self.signal(State::Closed)
328
0
    }
Unexecuted instantiation: <want::Taker>::cancel
Unexecuted instantiation: <want::Taker>::cancel
329
330
    /// Signal to the `Giver` that a value is wanted.
331
    #[inline]
332
0
    pub fn want(&mut self) {
333
0
        debug_assert!(
334
0
            self.inner.state.load(SeqCst) != State::Closed.into(),
335
            "want called after cancel"
336
        );
337
0
        self.signal(State::Want)
338
0
    }
Unexecuted instantiation: <want::Taker>::want
Unexecuted instantiation: <want::Taker>::want
339
340
    #[inline]
341
0
    fn signal(&mut self, state: State) {
342
0
        let old_state = self.inner.state.swap(state.into(), SeqCst).into();
343
0
        match old_state {
344
0
            State::Idle | State::Want | State::Closed => (),
345
            State::Give => {
346
                loop {
347
0
                    if let Some(mut locked) = self.inner.task.try_lock_explicit(SeqCst, SeqCst) {
348
0
                        if let Some(task) = locked.take() {
349
0
                            drop(locked);
350
0
                            task.wake();
351
0
                        }
352
0
                        return;
353
0
                    } else {
354
0
                        // if we couldn't take the lock, then a Giver has it.
355
0
                        // The *ONLY* reason is because it is in the process of parking.
356
0
                        //
357
0
                        // We need to loop and take the lock so we can notify this task.
358
0
                    }
359
                }
360
            },
361
        }
362
0
    }
Unexecuted instantiation: <want::Taker>::signal
Unexecuted instantiation: <want::Taker>::signal
363
}
364
365
impl Drop for Taker {
366
    #[inline]
367
0
    fn drop(&mut self) {
368
0
        self.signal(State::Closed);
369
0
    }
Unexecuted instantiation: <want::Taker as core::ops::drop::Drop>::drop
Unexecuted instantiation: <want::Taker as core::ops::drop::Drop>::drop
370
}
371
372
impl fmt::Debug for Taker {
373
0
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
374
0
        f.debug_struct("Taker")
375
0
            .field("state", &self.inner.state())
376
0
            .finish()
377
0
    }
378
}
379
380
// ===== impl Closed ======
381
382
impl fmt::Debug for Closed {
383
0
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
384
0
        f.debug_struct("Closed")
385
0
            .finish()
386
0
    }
387
}
388
389
// ===== impl Inner ======
390
391
impl Inner {
392
    #[inline]
393
0
    fn state(&self) -> State {
394
0
        self.state.load(SeqCst).into()
395
0
    }
396
}
397
398
// ===== impl Want ======
399
400
struct Want<'a>(&'a mut Giver);
401
402
403
impl Future for Want<'_> {
404
    type Output = Result<(), Closed>;
405
406
0
    fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
407
0
        self.0.poll_want(cx)
408
0
    }
409
}
410
411
#[cfg(test)]
412
mod tests {
413
    use std::thread;
414
    use tokio_sync::oneshot;
415
    use super::*;
416
417
    fn block_on<F: Future>(f: F) -> F::Output {
418
        tokio_executor::enter()
419
            .expect("block_on enter")
420
            .block_on(f)
421
    }
422
423
    #[test]
424
    fn want_ready() {
425
        let (mut gv, mut tk) = new();
426
427
        tk.want();
428
429
        block_on(gv.want()).unwrap();
430
    }
431
432
    #[test]
433
    fn want_notify_0() {
434
        let (mut gv, mut tk) = new();
435
        let (tx, rx) = oneshot::channel();
436
437
        thread::spawn(move || {
438
            tk.want();
439
            // use a oneshot to keep this thread alive
440
            // until other thread was notified of want
441
            block_on(rx).expect("rx");
442
        });
443
444
        block_on(gv.want()).expect("want");
445
446
        assert!(gv.is_wanting(), "still wanting after poll_want success");
447
        assert!(gv.give(), "give is true when wanting");
448
449
        assert!(!gv.is_wanting(), "no longer wanting after give");
450
        assert!(!gv.is_canceled(), "give doesn't cancel");
451
452
        assert!(!gv.give(), "give is false if not wanting");
453
454
        tx.send(()).expect("tx");
455
    }
456
457
    /*
458
    /// This tests that if the Giver moves tasks after parking,
459
    /// it will still wake up the correct task.
460
    #[test]
461
    fn want_notify_moving_tasks() {
462
        use std::sync::Arc;
463
        use futures::executor::{spawn, Notify, NotifyHandle};
464
465
        struct WantNotify;
466
467
        impl Notify for WantNotify {
468
            fn notify(&self, _id: usize) {
469
            }
470
        }
471
472
        fn n() -> NotifyHandle {
473
            Arc::new(WantNotify).into()
474
        }
475
476
        let (mut gv, mut tk) = new();
477
478
        let mut s = spawn(poll_fn(move || {
479
            gv.poll_want()
480
        }));
481
482
        // Register with t1 as the task::current()
483
        let t1 = n();
484
        assert!(s.poll_future_notify(&t1, 1).unwrap().is_not_ready());
485
486
        thread::spawn(move || {
487
            thread::sleep(::std::time::Duration::from_millis(100));
488
            tk.want();
489
        });
490
491
        // And now, move to a ThreadNotify task.
492
        s.into_inner().wait().expect("poll_want");
493
    }
494
    */
495
496
    #[test]
497
    fn cancel() {
498
        // explicit
499
        let (mut gv, mut tk) = new();
500
501
        assert!(!gv.is_canceled());
502
503
        tk.cancel();
504
505
        assert!(gv.is_canceled());
506
        block_on(gv.want()).unwrap_err();
507
508
        // implicit
509
        let (mut gv, tk) = new();
510
511
        assert!(!gv.is_canceled());
512
513
        drop(tk);
514
515
        assert!(gv.is_canceled());
516
        block_on(gv.want()).unwrap_err();
517
518
        // notifies
519
        let (mut gv, tk) = new();
520
521
        thread::spawn(move || {
522
            let _tk = tk;
523
            // and dropped
524
        });
525
526
        block_on(gv.want()).unwrap_err();
527
    }
528
529
    /*
530
    #[test]
531
    fn stress() {
532
        let nthreads = 5;
533
        let nwants = 100;
534
535
        for _ in 0..nthreads {
536
            let (mut gv, mut tk) = new();
537
            let (mut tx, mut rx) = mpsc::channel(0);
538
539
            // rx thread
540
            thread::spawn(move || {
541
                let mut cnt = 0;
542
                poll_fn(move || {
543
                    while cnt < nwants {
544
                        let n = match rx.poll().expect("rx poll") {
545
                            Async::Ready(n) => n.expect("rx opt"),
546
                            Async::NotReady => {
547
                                tk.want();
548
                                return Ok(Async::NotReady);
549
                            },
550
                        };
551
                        assert_eq!(cnt, n);
552
                        cnt += 1;
553
                    }
554
                    Ok::<_, ()>(Async::Ready(()))
555
                }).wait().expect("rx wait");
556
            });
557
558
            // tx thread
559
            thread::spawn(move || {
560
                let mut cnt = 0;
561
                let nsent = poll_fn(move || {
562
                    loop {
563
                        while let Ok(()) = tx.try_send(cnt) {
564
                            cnt += 1;
565
                        }
566
                        match gv.poll_want() {
567
                            Ok(Async::Ready(_)) => (),
568
                            Ok(Async::NotReady) => return Ok::<_, ()>(Async::NotReady),
569
                            Err(_) => return Ok(Async::Ready(cnt)),
570
                        }
571
                    }
572
                }).wait().expect("tx wait");
573
574
                assert_eq!(nsent, nwants);
575
            }).join().expect("thread join");
576
        }
577
    }
578
    */
579
}