Coverage Report

Created: 2025-07-23 06:18

/rust/registry/src/index.crates.io-6f17d22bba15001f/futures-util-0.3.21/src/stream/select.rs
Line
Count
Source (jump to first uncovered line)
1
use super::assert_stream;
2
use crate::stream::{select_with_strategy, PollNext, SelectWithStrategy};
3
use core::pin::Pin;
4
use futures_core::stream::{FusedStream, Stream};
5
use futures_core::task::{Context, Poll};
6
use pin_project_lite::pin_project;
7
8
pin_project! {
9
    /// Stream for the [`select()`] function.
10
    #[derive(Debug)]
11
    #[must_use = "streams do nothing unless polled"]
12
    pub struct Select<St1, St2> {
13
        #[pin]
14
        inner: SelectWithStrategy<St1, St2, fn(&mut PollNext)-> PollNext, PollNext>,
15
    }
16
}
17
18
/// This function will attempt to pull items from both streams. Each
19
/// stream will be polled in a round-robin fashion, and whenever a stream is
20
/// ready to yield an item that item is yielded.
21
///
22
/// After one of the two input streams completes, the remaining one will be
23
/// polled exclusively. The returned stream completes when both input
24
/// streams have completed.
25
///
26
/// Note that this function consumes both streams and returns a wrapped
27
/// version of them.
28
///
29
/// ## Examples
30
///
31
/// ```rust
32
/// # futures::executor::block_on(async {
33
/// use futures::stream::{ repeat, select, StreamExt };
34
///
35
/// let left = repeat(1);
36
/// let right = repeat(2);
37
///
38
/// let mut out = select(left, right);
39
///
40
/// for _ in 0..100 {
41
///     // We should be alternating.
42
///     assert_eq!(1, out.select_next_some().await);
43
///     assert_eq!(2, out.select_next_some().await);
44
/// }
45
/// # });
46
/// ```
47
0
pub fn select<St1, St2>(stream1: St1, stream2: St2) -> Select<St1, St2>
48
0
where
49
0
    St1: Stream,
50
0
    St2: Stream<Item = St1::Item>,
51
0
{
52
0
    fn round_robin(last: &mut PollNext) -> PollNext {
53
0
        last.toggle()
54
0
    }
55
56
0
    assert_stream::<St1::Item, _>(Select {
57
0
        inner: select_with_strategy(stream1, stream2, round_robin),
58
0
    })
59
0
}
60
61
impl<St1, St2> Select<St1, St2> {
62
    /// Acquires a reference to the underlying streams that this combinator is
63
    /// pulling from.
64
0
    pub fn get_ref(&self) -> (&St1, &St2) {
65
0
        self.inner.get_ref()
66
0
    }
67
68
    /// Acquires a mutable reference to the underlying streams that this
69
    /// combinator is pulling from.
70
    ///
71
    /// Note that care must be taken to avoid tampering with the state of the
72
    /// stream which may otherwise confuse this combinator.
73
0
    pub fn get_mut(&mut self) -> (&mut St1, &mut St2) {
74
0
        self.inner.get_mut()
75
0
    }
76
77
    /// Acquires a pinned mutable reference to the underlying streams that this
78
    /// combinator is pulling from.
79
    ///
80
    /// Note that care must be taken to avoid tampering with the state of the
81
    /// stream which may otherwise confuse this combinator.
82
0
    pub fn get_pin_mut(self: Pin<&mut Self>) -> (Pin<&mut St1>, Pin<&mut St2>) {
83
0
        let this = self.project();
84
0
        this.inner.get_pin_mut()
85
0
    }
86
87
    /// Consumes this combinator, returning the underlying streams.
88
    ///
89
    /// Note that this may discard intermediate state of this combinator, so
90
    /// care should be taken to avoid losing resources when this is called.
91
0
    pub fn into_inner(self) -> (St1, St2) {
92
0
        self.inner.into_inner()
93
0
    }
94
}
95
96
impl<St1, St2> FusedStream for Select<St1, St2>
97
where
98
    St1: Stream,
99
    St2: Stream<Item = St1::Item>,
100
{
101
0
    fn is_terminated(&self) -> bool {
102
0
        self.inner.is_terminated()
103
0
    }
104
}
105
106
impl<St1, St2> Stream for Select<St1, St2>
107
where
108
    St1: Stream,
109
    St2: Stream<Item = St1::Item>,
110
{
111
    type Item = St1::Item;
112
113
0
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<St1::Item>> {
114
0
        let this = self.project();
115
0
        this.inner.poll_next(cx)
116
0
    }
117
}