Coverage Report

Created: 2025-11-16 06:13

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/rust/registry/src/index.crates.io-1949cf8c6b5b557f/tokio-stream-0.1.17/src/stream_ext.rs
Line
Count
Source
1
use core::future::Future;
2
use futures_core::Stream;
3
4
mod all;
5
use all::AllFuture;
6
7
mod any;
8
use any::AnyFuture;
9
10
mod chain;
11
pub use chain::Chain;
12
13
pub(crate) mod collect;
14
use collect::{Collect, FromStream};
15
16
mod filter;
17
pub use filter::Filter;
18
19
mod filter_map;
20
pub use filter_map::FilterMap;
21
22
mod fold;
23
use fold::FoldFuture;
24
25
mod fuse;
26
pub use fuse::Fuse;
27
28
mod map;
29
pub use map::Map;
30
31
mod map_while;
32
pub use map_while::MapWhile;
33
34
mod merge;
35
pub use merge::Merge;
36
37
mod next;
38
use next::Next;
39
40
mod skip;
41
pub use skip::Skip;
42
43
mod skip_while;
44
pub use skip_while::SkipWhile;
45
46
mod take;
47
pub use take::Take;
48
49
mod take_while;
50
pub use take_while::TakeWhile;
51
52
mod then;
53
pub use then::Then;
54
55
mod try_next;
56
use try_next::TryNext;
57
58
mod peekable;
59
pub use peekable::Peekable;
60
61
cfg_time! {
62
    pub(crate) mod timeout;
63
    pub(crate) mod timeout_repeating;
64
    pub use timeout::Timeout;
65
    pub use timeout_repeating::TimeoutRepeating;
66
    use tokio::time::{Duration, Interval};
67
    mod throttle;
68
    use throttle::{throttle, Throttle};
69
    mod chunks_timeout;
70
    pub use chunks_timeout::ChunksTimeout;
71
}
72
73
/// An extension trait for the [`Stream`] trait that provides a variety of
74
/// convenient combinator functions.
75
///
76
/// Be aware that the `Stream` trait in Tokio is a re-export of the trait found
77
/// in the [futures] crate, however both Tokio and futures provide separate
78
/// `StreamExt` utility traits, and some utilities are only available on one of
79
/// these traits. Click [here][futures-StreamExt] to see the other `StreamExt`
80
/// trait in the futures crate.
81
///
82
/// If you need utilities from both `StreamExt` traits, you should prefer to
83
/// import one of them, and use the other through the fully qualified call
84
/// syntax. For example:
85
/// ```
86
/// // import one of the traits:
87
/// use futures::stream::StreamExt;
88
/// # #[tokio::main(flavor = "current_thread")]
89
/// # async fn main() {
90
///
91
/// let a = tokio_stream::iter(vec![1, 3, 5]);
92
/// let b = tokio_stream::iter(vec![2, 4, 6]);
93
///
94
/// // use the fully qualified call syntax for the other trait:
95
/// let merged = tokio_stream::StreamExt::merge(a, b);
96
///
97
/// // use normal call notation for futures::stream::StreamExt::collect
98
/// let output: Vec<_> = merged.collect().await;
99
/// assert_eq!(output, vec![1, 2, 3, 4, 5, 6]);
100
/// # }
101
/// ```
102
///
103
/// [`Stream`]: crate::Stream
104
/// [futures]: https://docs.rs/futures
105
/// [futures-StreamExt]: https://docs.rs/futures/0.3/futures/stream/trait.StreamExt.html
106
pub trait StreamExt: Stream {
107
    /// Consumes and returns the next value in the stream or `None` if the
108
    /// stream is finished.
109
    ///
110
    /// Equivalent to:
111
    ///
112
    /// ```ignore
113
    /// async fn next(&mut self) -> Option<Self::Item>;
114
    /// ```
115
    ///
116
    /// Note that because `next` doesn't take ownership over the stream,
117
    /// the [`Stream`] type must be [`Unpin`]. If you want to use `next` with a
118
    /// [`!Unpin`](Unpin) stream, you'll first have to pin the stream. This can
119
    /// be done by boxing the stream using [`Box::pin`] or
120
    /// pinning it to the stack using the `pin_mut!` macro from the `pin_utils`
121
    /// crate.
122
    ///
123
    /// # Cancel safety
124
    ///
125
    /// This method is cancel safe. The returned future only
126
    /// holds onto a reference to the underlying stream,
127
    /// so dropping it will never lose a value.
128
    ///
129
    /// # Examples
130
    ///
131
    /// ```
132
    /// # #[tokio::main]
133
    /// # async fn main() {
134
    /// use tokio_stream::{self as stream, StreamExt};
135
    ///
136
    /// let mut stream = stream::iter(1..=3);
137
    ///
138
    /// assert_eq!(stream.next().await, Some(1));
139
    /// assert_eq!(stream.next().await, Some(2));
140
    /// assert_eq!(stream.next().await, Some(3));
141
    /// assert_eq!(stream.next().await, None);
142
    /// # }
143
    /// ```
144
0
    fn next(&mut self) -> Next<'_, Self>
145
0
    where
146
0
        Self: Unpin,
147
    {
148
0
        Next::new(self)
149
0
    }
150
151
    /// Consumes and returns the next item in the stream. If an error is
152
    /// encountered before the next item, the error is returned instead.
153
    ///
154
    /// Equivalent to:
155
    ///
156
    /// ```ignore
157
    /// async fn try_next(&mut self) -> Result<Option<T>, E>;
158
    /// ```
159
    ///
160
    /// This is similar to the [`next`](StreamExt::next) combinator,
161
    /// but returns a [`Result<Option<T>, E>`](Result) rather than
162
    /// an [`Option<Result<T, E>>`](Option), making for easy use
163
    /// with the [`?`](std::ops::Try) operator.
164
    ///
165
    /// # Cancel safety
166
    ///
167
    /// This method is cancel safe. The returned future only
168
    /// holds onto a reference to the underlying stream,
169
    /// so dropping it will never lose a value.
170
    ///
171
    /// # Examples
172
    ///
173
    /// ```
174
    /// # #[tokio::main]
175
    /// # async fn main() {
176
    /// use tokio_stream::{self as stream, StreamExt};
177
    ///
178
    /// let mut stream = stream::iter(vec![Ok(1), Ok(2), Err("nope")]);
179
    ///
180
    /// assert_eq!(stream.try_next().await, Ok(Some(1)));
181
    /// assert_eq!(stream.try_next().await, Ok(Some(2)));
182
    /// assert_eq!(stream.try_next().await, Err("nope"));
183
    /// # }
184
    /// ```
185
0
    fn try_next<T, E>(&mut self) -> TryNext<'_, Self>
186
0
    where
187
0
        Self: Stream<Item = Result<T, E>> + Unpin,
188
    {
189
0
        TryNext::new(self)
190
0
    }
191
192
    /// Maps this stream's items to a different type, returning a new stream of
193
    /// the resulting type.
194
    ///
195
    /// The provided closure is executed over all elements of this stream as
196
    /// they are made available. It is executed inline with calls to
197
    /// [`poll_next`](Stream::poll_next).
198
    ///
199
    /// Note that this function consumes the stream passed into it and returns a
200
    /// wrapped version of it, similar to the existing `map` methods in the
201
    /// standard library.
202
    ///
203
    /// # Examples
204
    ///
205
    /// ```
206
    /// # #[tokio::main]
207
    /// # async fn main() {
208
    /// use tokio_stream::{self as stream, StreamExt};
209
    ///
210
    /// let stream = stream::iter(1..=3);
211
    /// let mut stream = stream.map(|x| x + 3);
212
    ///
213
    /// assert_eq!(stream.next().await, Some(4));
214
    /// assert_eq!(stream.next().await, Some(5));
215
    /// assert_eq!(stream.next().await, Some(6));
216
    /// # }
217
    /// ```
218
0
    fn map<T, F>(self, f: F) -> Map<Self, F>
219
0
    where
220
0
        F: FnMut(Self::Item) -> T,
221
0
        Self: Sized,
222
    {
223
0
        Map::new(self, f)
224
0
    }
225
226
    /// Map this stream's items to a different type for as long as determined by
227
    /// the provided closure. A stream of the target type will be returned,
228
    /// which will yield elements until the closure returns `None`.
229
    ///
230
    /// The provided closure is executed over all elements of this stream as
231
    /// they are made available, until it returns `None`. It is executed inline
232
    /// with calls to [`poll_next`](Stream::poll_next). Once `None` is returned,
233
    /// the underlying stream will not be polled again.
234
    ///
235
    /// Note that this function consumes the stream passed into it and returns a
236
    /// wrapped version of it, similar to the [`Iterator::map_while`] method in the
237
    /// standard library.
238
    ///
239
    /// # Examples
240
    ///
241
    /// ```
242
    /// # #[tokio::main]
243
    /// # async fn main() {
244
    /// use tokio_stream::{self as stream, StreamExt};
245
    ///
246
    /// let stream = stream::iter(1..=10);
247
    /// let mut stream = stream.map_while(|x| {
248
    ///     if x < 4 {
249
    ///         Some(x + 3)
250
    ///     } else {
251
    ///         None
252
    ///     }
253
    /// });
254
    /// assert_eq!(stream.next().await, Some(4));
255
    /// assert_eq!(stream.next().await, Some(5));
256
    /// assert_eq!(stream.next().await, Some(6));
257
    /// assert_eq!(stream.next().await, None);
258
    /// # }
259
    /// ```
260
0
    fn map_while<T, F>(self, f: F) -> MapWhile<Self, F>
261
0
    where
262
0
        F: FnMut(Self::Item) -> Option<T>,
263
0
        Self: Sized,
264
    {
265
0
        MapWhile::new(self, f)
266
0
    }
267
268
    /// Maps this stream's items asynchronously to a different type, returning a
269
    /// new stream of the resulting type.
270
    ///
271
    /// The provided closure is executed over all elements of this stream as
272
    /// they are made available, and the returned future is executed. Only one
273
    /// future is executed at the time.
274
    ///
275
    /// Note that this function consumes the stream passed into it and returns a
276
    /// wrapped version of it, similar to the existing `then` methods in the
277
    /// standard library.
278
    ///
279
    /// Be aware that if the future is not `Unpin`, then neither is the `Stream`
280
    /// returned by this method. To handle this, you can use `tokio::pin!` as in
281
    /// the example below or put the stream in a `Box` with `Box::pin(stream)`.
282
    ///
283
    /// # Examples
284
    ///
285
    /// ```
286
    /// # #[tokio::main]
287
    /// # async fn main() {
288
    /// use tokio_stream::{self as stream, StreamExt};
289
    ///
290
    /// async fn do_async_work(value: i32) -> i32 {
291
    ///     value + 3
292
    /// }
293
    ///
294
    /// let stream = stream::iter(1..=3);
295
    /// let stream = stream.then(do_async_work);
296
    ///
297
    /// tokio::pin!(stream);
298
    ///
299
    /// assert_eq!(stream.next().await, Some(4));
300
    /// assert_eq!(stream.next().await, Some(5));
301
    /// assert_eq!(stream.next().await, Some(6));
302
    /// # }
303
    /// ```
304
0
    fn then<F, Fut>(self, f: F) -> Then<Self, Fut, F>
305
0
    where
306
0
        F: FnMut(Self::Item) -> Fut,
307
0
        Fut: Future,
308
0
        Self: Sized,
309
    {
310
0
        Then::new(self, f)
311
0
    }
312
313
    /// Combine two streams into one by interleaving the output of both as it
314
    /// is produced.
315
    ///
316
    /// Values are produced from the merged stream in the order they arrive from
317
    /// the two source streams. If both source streams provide values
318
    /// simultaneously, the merge stream alternates between them. This provides
319
    /// some level of fairness. You should not chain calls to `merge`, as this
320
    /// will break the fairness of the merging.
321
    ///
322
    /// The merged stream completes once **both** source streams complete. When
323
    /// one source stream completes before the other, the merge stream
324
    /// exclusively polls the remaining stream.
325
    ///
326
    /// For merging multiple streams, consider using [`StreamMap`] instead.
327
    ///
328
    /// [`StreamMap`]: crate::StreamMap
329
    ///
330
    /// # Examples
331
    ///
332
    /// ```
333
    /// use tokio_stream::{StreamExt, Stream};
334
    /// use tokio::sync::mpsc;
335
    /// use tokio::time;
336
    ///
337
    /// use std::time::Duration;
338
    /// use std::pin::Pin;
339
    ///
340
    /// # /*
341
    /// #[tokio::main]
342
    /// # */
343
    /// # #[tokio::main(flavor = "current_thread")]
344
    /// async fn main() {
345
    /// # time::pause();
346
    ///     let (tx1, mut rx1) = mpsc::channel::<usize>(10);
347
    ///     let (tx2, mut rx2) = mpsc::channel::<usize>(10);
348
    ///
349
    ///     // Convert the channels to a `Stream`.
350
    ///     let rx1 = Box::pin(async_stream::stream! {
351
    ///           while let Some(item) = rx1.recv().await {
352
    ///               yield item;
353
    ///           }
354
    ///     }) as Pin<Box<dyn Stream<Item = usize> + Send>>;
355
    ///
356
    ///     let rx2 = Box::pin(async_stream::stream! {
357
    ///           while let Some(item) = rx2.recv().await {
358
    ///               yield item;
359
    ///           }
360
    ///     }) as Pin<Box<dyn Stream<Item = usize> + Send>>;
361
    ///
362
    ///     let mut rx = rx1.merge(rx2);
363
    ///
364
    ///     tokio::spawn(async move {
365
    ///         // Send some values immediately
366
    ///         tx1.send(1).await.unwrap();
367
    ///         tx1.send(2).await.unwrap();
368
    ///
369
    ///         // Let the other task send values
370
    ///         time::sleep(Duration::from_millis(20)).await;
371
    ///
372
    ///         tx1.send(4).await.unwrap();
373
    ///     });
374
    ///
375
    ///     tokio::spawn(async move {
376
    ///         // Wait for the first task to send values
377
    ///         time::sleep(Duration::from_millis(5)).await;
378
    ///
379
    ///         tx2.send(3).await.unwrap();
380
    ///
381
    ///         time::sleep(Duration::from_millis(25)).await;
382
    ///
383
    ///         // Send the final value
384
    ///         tx2.send(5).await.unwrap();
385
    ///     });
386
    ///
387
    ///    assert_eq!(1, rx.next().await.unwrap());
388
    ///    assert_eq!(2, rx.next().await.unwrap());
389
    ///    assert_eq!(3, rx.next().await.unwrap());
390
    ///    assert_eq!(4, rx.next().await.unwrap());
391
    ///    assert_eq!(5, rx.next().await.unwrap());
392
    ///
393
    ///    // The merged stream is consumed
394
    ///    assert!(rx.next().await.is_none());
395
    /// }
396
    /// ```
397
0
    fn merge<U>(self, other: U) -> Merge<Self, U>
398
0
    where
399
0
        U: Stream<Item = Self::Item>,
400
0
        Self: Sized,
401
    {
402
0
        Merge::new(self, other)
403
0
    }
404
405
    /// Filters the values produced by this stream according to the provided
406
    /// predicate.
407
    ///
408
    /// As values of this stream are made available, the provided predicate `f`
409
    /// will be run against them. If the predicate
410
    /// resolves to `true`, then the stream will yield the value, but if the
411
    /// predicate resolves to `false`, then the value
412
    /// will be discarded and the next value will be produced.
413
    ///
414
    /// Note that this function consumes the stream passed into it and returns a
415
    /// wrapped version of it, similar to [`Iterator::filter`] method in the
416
    /// standard library.
417
    ///
418
    /// # Examples
419
    ///
420
    /// ```
421
    /// # #[tokio::main]
422
    /// # async fn main() {
423
    /// use tokio_stream::{self as stream, StreamExt};
424
    ///
425
    /// let stream = stream::iter(1..=8);
426
    /// let mut evens = stream.filter(|x| x % 2 == 0);
427
    ///
428
    /// assert_eq!(Some(2), evens.next().await);
429
    /// assert_eq!(Some(4), evens.next().await);
430
    /// assert_eq!(Some(6), evens.next().await);
431
    /// assert_eq!(Some(8), evens.next().await);
432
    /// assert_eq!(None, evens.next().await);
433
    /// # }
434
    /// ```
435
0
    fn filter<F>(self, f: F) -> Filter<Self, F>
436
0
    where
437
0
        F: FnMut(&Self::Item) -> bool,
438
0
        Self: Sized,
439
    {
440
0
        Filter::new(self, f)
441
0
    }
442
443
    /// Filters the values produced by this stream while simultaneously mapping
444
    /// them to a different type according to the provided closure.
445
    ///
446
    /// As values of this stream are made available, the provided function will
447
    /// be run on them. If the predicate `f` resolves to
448
    /// [`Some(item)`](Some) then the stream will yield the value `item`, but if
449
    /// it resolves to [`None`], then the value will be skipped.
450
    ///
451
    /// Note that this function consumes the stream passed into it and returns a
452
    /// wrapped version of it, similar to [`Iterator::filter_map`] method in the
453
    /// standard library.
454
    ///
455
    /// # Examples
456
    /// ```
457
    /// # #[tokio::main]
458
    /// # async fn main() {
459
    /// use tokio_stream::{self as stream, StreamExt};
460
    ///
461
    /// let stream = stream::iter(1..=8);
462
    /// let mut evens = stream.filter_map(|x| {
463
    ///     if x % 2 == 0 { Some(x + 1) } else { None }
464
    /// });
465
    ///
466
    /// assert_eq!(Some(3), evens.next().await);
467
    /// assert_eq!(Some(5), evens.next().await);
468
    /// assert_eq!(Some(7), evens.next().await);
469
    /// assert_eq!(Some(9), evens.next().await);
470
    /// assert_eq!(None, evens.next().await);
471
    /// # }
472
    /// ```
473
0
    fn filter_map<T, F>(self, f: F) -> FilterMap<Self, F>
474
0
    where
475
0
        F: FnMut(Self::Item) -> Option<T>,
476
0
        Self: Sized,
477
    {
478
0
        FilterMap::new(self, f)
479
0
    }
480
481
    /// Creates a stream which ends after the first `None`.
482
    ///
483
    /// After a stream returns `None`, behavior is undefined. Future calls to
484
    /// `poll_next` may or may not return `Some(T)` again or they may panic.
485
    /// `fuse()` adapts a stream, ensuring that after `None` is given, it will
486
    /// return `None` forever.
487
    ///
488
    /// # Examples
489
    ///
490
    /// ```
491
    /// use tokio_stream::{Stream, StreamExt};
492
    ///
493
    /// use std::pin::Pin;
494
    /// use std::task::{Context, Poll};
495
    ///
496
    /// // a stream which alternates between Some and None
497
    /// struct Alternate {
498
    ///     state: i32,
499
    /// }
500
    ///
501
    /// impl Stream for Alternate {
502
    ///     type Item = i32;
503
    ///
504
    ///     fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<i32>> {
505
    ///         let val = self.state;
506
    ///         self.state = self.state + 1;
507
    ///
508
    ///         // if it's even, Some(i32), else None
509
    ///         if val % 2 == 0 {
510
    ///             Poll::Ready(Some(val))
511
    ///         } else {
512
    ///             Poll::Ready(None)
513
    ///         }
514
    ///     }
515
    /// }
516
    ///
517
    /// #[tokio::main]
518
    /// async fn main() {
519
    ///     let mut stream = Alternate { state: 0 };
520
    ///
521
    ///     // the stream goes back and forth
522
    ///     assert_eq!(stream.next().await, Some(0));
523
    ///     assert_eq!(stream.next().await, None);
524
    ///     assert_eq!(stream.next().await, Some(2));
525
    ///     assert_eq!(stream.next().await, None);
526
    ///
527
    ///     // however, once it is fused
528
    ///     let mut stream = stream.fuse();
529
    ///
530
    ///     assert_eq!(stream.next().await, Some(4));
531
    ///     assert_eq!(stream.next().await, None);
532
    ///
533
    ///     // it will always return `None` after the first time.
534
    ///     assert_eq!(stream.next().await, None);
535
    ///     assert_eq!(stream.next().await, None);
536
    ///     assert_eq!(stream.next().await, None);
537
    /// }
538
    /// ```
539
0
    fn fuse(self) -> Fuse<Self>
540
0
    where
541
0
        Self: Sized,
542
    {
543
0
        Fuse::new(self)
544
0
    }
545
546
    /// Creates a new stream of at most `n` items of the underlying stream.
547
    ///
548
    /// Once `n` items have been yielded from this stream then it will always
549
    /// return that the stream is done.
550
    ///
551
    /// # Examples
552
    ///
553
    /// ```
554
    /// # #[tokio::main]
555
    /// # async fn main() {
556
    /// use tokio_stream::{self as stream, StreamExt};
557
    ///
558
    /// let mut stream = stream::iter(1..=10).take(3);
559
    ///
560
    /// assert_eq!(Some(1), stream.next().await);
561
    /// assert_eq!(Some(2), stream.next().await);
562
    /// assert_eq!(Some(3), stream.next().await);
563
    /// assert_eq!(None, stream.next().await);
564
    /// # }
565
    /// ```
566
0
    fn take(self, n: usize) -> Take<Self>
567
0
    where
568
0
        Self: Sized,
569
    {
570
0
        Take::new(self, n)
571
0
    }
572
573
    /// Take elements from this stream while the provided predicate
574
    /// resolves to `true`.
575
    ///
576
    /// This function, like `Iterator::take_while`, will take elements from the
577
    /// stream until the predicate `f` resolves to `false`. Once one element
578
    /// returns false it will always return that the stream is done.
579
    ///
580
    /// # Examples
581
    ///
582
    /// ```
583
    /// # #[tokio::main]
584
    /// # async fn main() {
585
    /// use tokio_stream::{self as stream, StreamExt};
586
    ///
587
    /// let mut stream = stream::iter(1..=10).take_while(|x| *x <= 3);
588
    ///
589
    /// assert_eq!(Some(1), stream.next().await);
590
    /// assert_eq!(Some(2), stream.next().await);
591
    /// assert_eq!(Some(3), stream.next().await);
592
    /// assert_eq!(None, stream.next().await);
593
    /// # }
594
    /// ```
595
0
    fn take_while<F>(self, f: F) -> TakeWhile<Self, F>
596
0
    where
597
0
        F: FnMut(&Self::Item) -> bool,
598
0
        Self: Sized,
599
    {
600
0
        TakeWhile::new(self, f)
601
0
    }
602
603
    /// Creates a new stream that will skip the `n` first items of the
604
    /// underlying stream.
605
    ///
606
    /// # Examples
607
    ///
608
    /// ```
609
    /// # #[tokio::main]
610
    /// # async fn main() {
611
    /// use tokio_stream::{self as stream, StreamExt};
612
    ///
613
    /// let mut stream = stream::iter(1..=10).skip(7);
614
    ///
615
    /// assert_eq!(Some(8), stream.next().await);
616
    /// assert_eq!(Some(9), stream.next().await);
617
    /// assert_eq!(Some(10), stream.next().await);
618
    /// assert_eq!(None, stream.next().await);
619
    /// # }
620
    /// ```
621
0
    fn skip(self, n: usize) -> Skip<Self>
622
0
    where
623
0
        Self: Sized,
624
    {
625
0
        Skip::new(self, n)
626
0
    }
627
628
    /// Skip elements from the underlying stream while the provided predicate
629
    /// resolves to `true`.
630
    ///
631
    /// This function, like [`Iterator::skip_while`], will ignore elements from the
632
    /// stream until the predicate `f` resolves to `false`. Once one element
633
    /// returns false, the rest of the elements will be yielded.
634
    ///
635
    /// [`Iterator::skip_while`]: std::iter::Iterator::skip_while()
636
    ///
637
    /// # Examples
638
    ///
639
    /// ```
640
    /// # #[tokio::main]
641
    /// # async fn main() {
642
    /// use tokio_stream::{self as stream, StreamExt};
643
    /// let mut stream = stream::iter(vec![1,2,3,4,1]).skip_while(|x| *x < 3);
644
    ///
645
    /// assert_eq!(Some(3), stream.next().await);
646
    /// assert_eq!(Some(4), stream.next().await);
647
    /// assert_eq!(Some(1), stream.next().await);
648
    /// assert_eq!(None, stream.next().await);
649
    /// # }
650
    /// ```
651
0
    fn skip_while<F>(self, f: F) -> SkipWhile<Self, F>
652
0
    where
653
0
        F: FnMut(&Self::Item) -> bool,
654
0
        Self: Sized,
655
    {
656
0
        SkipWhile::new(self, f)
657
0
    }
658
659
    /// Tests if every element of the stream matches a predicate.
660
    ///
661
    /// Equivalent to:
662
    ///
663
    /// ```ignore
664
    /// async fn all<F>(&mut self, f: F) -> bool;
665
    /// ```
666
    ///
667
    /// `all()` takes a closure that returns `true` or `false`. It applies
668
    /// this closure to each element of the stream, and if they all return
669
    /// `true`, then so does `all`. If any of them return `false`, it
670
    /// returns `false`. An empty stream returns `true`.
671
    ///
672
    /// `all()` is short-circuiting; in other words, it will stop processing
673
    /// as soon as it finds a `false`, given that no matter what else happens,
674
    /// the result will also be `false`.
675
    ///
676
    /// An empty stream returns `true`.
677
    ///
678
    /// # Examples
679
    ///
680
    /// Basic usage:
681
    ///
682
    /// ```
683
    /// # #[tokio::main]
684
    /// # async fn main() {
685
    /// use tokio_stream::{self as stream, StreamExt};
686
    ///
687
    /// let a = [1, 2, 3];
688
    ///
689
    /// assert!(stream::iter(&a).all(|&x| x > 0).await);
690
    ///
691
    /// assert!(!stream::iter(&a).all(|&x| x > 2).await);
692
    /// # }
693
    /// ```
694
    ///
695
    /// Stopping at the first `false`:
696
    ///
697
    /// ```
698
    /// # #[tokio::main]
699
    /// # async fn main() {
700
    /// use tokio_stream::{self as stream, StreamExt};
701
    ///
702
    /// let a = [1, 2, 3];
703
    ///
704
    /// let mut iter = stream::iter(&a);
705
    ///
706
    /// assert!(!iter.all(|&x| x != 2).await);
707
    ///
708
    /// // we can still use `iter`, as there are more elements.
709
    /// assert_eq!(iter.next().await, Some(&3));
710
    /// # }
711
    /// ```
712
0
    fn all<F>(&mut self, f: F) -> AllFuture<'_, Self, F>
713
0
    where
714
0
        Self: Unpin,
715
0
        F: FnMut(Self::Item) -> bool,
716
    {
717
0
        AllFuture::new(self, f)
718
0
    }
719
720
    /// Tests if any element of the stream matches a predicate.
721
    ///
722
    /// Equivalent to:
723
    ///
724
    /// ```ignore
725
    /// async fn any<F>(&mut self, f: F) -> bool;
726
    /// ```
727
    ///
728
    /// `any()` takes a closure that returns `true` or `false`. It applies
729
    /// this closure to each element of the stream, and if any of them return
730
    /// `true`, then so does `any()`. If they all return `false`, it
731
    /// returns `false`.
732
    ///
733
    /// `any()` is short-circuiting; in other words, it will stop processing
734
    /// as soon as it finds a `true`, given that no matter what else happens,
735
    /// the result will also be `true`.
736
    ///
737
    /// An empty stream returns `false`.
738
    ///
739
    /// Basic usage:
740
    ///
741
    /// ```
742
    /// # #[tokio::main]
743
    /// # async fn main() {
744
    /// use tokio_stream::{self as stream, StreamExt};
745
    ///
746
    /// let a = [1, 2, 3];
747
    ///
748
    /// assert!(stream::iter(&a).any(|&x| x > 0).await);
749
    ///
750
    /// assert!(!stream::iter(&a).any(|&x| x > 5).await);
751
    /// # }
752
    /// ```
753
    ///
754
    /// Stopping at the first `true`:
755
    ///
756
    /// ```
757
    /// # #[tokio::main]
758
    /// # async fn main() {
759
    /// use tokio_stream::{self as stream, StreamExt};
760
    ///
761
    /// let a = [1, 2, 3];
762
    ///
763
    /// let mut iter = stream::iter(&a);
764
    ///
765
    /// assert!(iter.any(|&x| x != 2).await);
766
    ///
767
    /// // we can still use `iter`, as there are more elements.
768
    /// assert_eq!(iter.next().await, Some(&2));
769
    /// # }
770
    /// ```
771
0
    fn any<F>(&mut self, f: F) -> AnyFuture<'_, Self, F>
772
0
    where
773
0
        Self: Unpin,
774
0
        F: FnMut(Self::Item) -> bool,
775
    {
776
0
        AnyFuture::new(self, f)
777
0
    }
778
779
    /// Combine two streams into one by first returning all values from the
780
    /// first stream then all values from the second stream.
781
    ///
782
    /// As long as `self` still has values to emit, no values from `other` are
783
    /// emitted, even if some are ready.
784
    ///
785
    /// # Examples
786
    ///
787
    /// ```
788
    /// use tokio_stream::{self as stream, StreamExt};
789
    ///
790
    /// #[tokio::main]
791
    /// async fn main() {
792
    ///     let one = stream::iter(vec![1, 2, 3]);
793
    ///     let two = stream::iter(vec![4, 5, 6]);
794
    ///
795
    ///     let mut stream = one.chain(two);
796
    ///
797
    ///     assert_eq!(stream.next().await, Some(1));
798
    ///     assert_eq!(stream.next().await, Some(2));
799
    ///     assert_eq!(stream.next().await, Some(3));
800
    ///     assert_eq!(stream.next().await, Some(4));
801
    ///     assert_eq!(stream.next().await, Some(5));
802
    ///     assert_eq!(stream.next().await, Some(6));
803
    ///     assert_eq!(stream.next().await, None);
804
    /// }
805
    /// ```
806
0
    fn chain<U>(self, other: U) -> Chain<Self, U>
807
0
    where
808
0
        U: Stream<Item = Self::Item>,
809
0
        Self: Sized,
810
    {
811
0
        Chain::new(self, other)
812
0
    }
813
814
    /// A combinator that applies a function to every element in a stream
815
    /// producing a single, final value.
816
    ///
817
    /// Equivalent to:
818
    ///
819
    /// ```ignore
820
    /// async fn fold<B, F>(self, init: B, f: F) -> B;
821
    /// ```
822
    ///
823
    /// # Examples
824
    /// Basic usage:
825
    /// ```
826
    /// # #[tokio::main]
827
    /// # async fn main() {
828
    /// use tokio_stream::{self as stream, *};
829
    ///
830
    /// let s = stream::iter(vec![1u8, 2, 3]);
831
    /// let sum = s.fold(0, |acc, x| acc + x).await;
832
    ///
833
    /// assert_eq!(sum, 6);
834
    /// # }
835
    /// ```
836
0
    fn fold<B, F>(self, init: B, f: F) -> FoldFuture<Self, B, F>
837
0
    where
838
0
        Self: Sized,
839
0
        F: FnMut(B, Self::Item) -> B,
840
    {
841
0
        FoldFuture::new(self, init, f)
842
0
    }
843
844
    /// Drain stream pushing all emitted values into a collection.
845
    ///
846
    /// Equivalent to:
847
    ///
848
    /// ```ignore
849
    /// async fn collect<T>(self) -> T;
850
    /// ```
851
    ///
852
    /// `collect` streams all values, awaiting as needed. Values are pushed into
853
    /// a collection. A number of different target collection types are
854
    /// supported, including [`Vec`], [`String`], and [`Bytes`].
855
    ///
856
    /// [`Bytes`]: https://docs.rs/bytes/0.6.0/bytes/struct.Bytes.html
857
    ///
858
    /// # `Result`
859
    ///
860
    /// `collect()` can also be used with streams of type `Result<T, E>` where
861
    /// `T: FromStream<_>`. In this case, `collect()` will stream as long as
862
    /// values yielded from the stream are `Ok(_)`. If `Err(_)` is encountered,
863
    /// streaming is terminated and `collect()` returns the `Err`.
864
    ///
865
    /// # Notes
866
    ///
867
    /// `FromStream` is currently a sealed trait. Stabilization is pending
868
    /// enhancements to the Rust language.
869
    ///
870
    /// # Examples
871
    ///
872
    /// Basic usage:
873
    ///
874
    /// ```
875
    /// use tokio_stream::{self as stream, StreamExt};
876
    ///
877
    /// #[tokio::main]
878
    /// async fn main() {
879
    ///     let doubled: Vec<i32> =
880
    ///         stream::iter(vec![1, 2, 3])
881
    ///             .map(|x| x * 2)
882
    ///             .collect()
883
    ///             .await;
884
    ///
885
    ///     assert_eq!(vec![2, 4, 6], doubled);
886
    /// }
887
    /// ```
888
    ///
889
    /// Collecting a stream of `Result` values
890
    ///
891
    /// ```
892
    /// use tokio_stream::{self as stream, StreamExt};
893
    ///
894
    /// #[tokio::main]
895
    /// async fn main() {
896
    ///     // A stream containing only `Ok` values will be collected
897
    ///     let values: Result<Vec<i32>, &str> =
898
    ///         stream::iter(vec![Ok(1), Ok(2), Ok(3)])
899
    ///             .collect()
900
    ///             .await;
901
    ///
902
    ///     assert_eq!(Ok(vec![1, 2, 3]), values);
903
    ///
904
    ///     // A stream containing `Err` values will return the first error.
905
    ///     let results = vec![Ok(1), Err("no"), Ok(2), Ok(3), Err("nein")];
906
    ///
907
    ///     let values: Result<Vec<i32>, &str> =
908
    ///         stream::iter(results)
909
    ///             .collect()
910
    ///             .await;
911
    ///
912
    ///     assert_eq!(Err("no"), values);
913
    /// }
914
    /// ```
915
0
    fn collect<T>(self) -> Collect<Self, T>
916
0
    where
917
0
        T: FromStream<Self::Item>,
918
0
        Self: Sized,
919
    {
920
0
        Collect::new(self)
921
0
    }
922
923
    /// Applies a per-item timeout to the passed stream.
924
    ///
925
    /// `timeout()` takes a `Duration` that represents the maximum amount of
926
    /// time each element of the stream has to complete before timing out.
927
    ///
928
    /// If the wrapped stream yields a value before the deadline is reached, the
929
    /// value is returned. Otherwise, an error is returned. The caller may decide
930
    /// to continue consuming the stream and will eventually get the next source
931
    /// stream value once it becomes available. See
932
    /// [`timeout_repeating`](StreamExt::timeout_repeating) for an alternative
933
    /// where the timeouts will repeat.
934
    ///
935
    /// # Notes
936
    ///
937
    /// This function consumes the stream passed into it and returns a
938
    /// wrapped version of it.
939
    ///
940
    /// Polling the returned stream will continue to poll the inner stream even
941
    /// if one or more items time out.
942
    ///
943
    /// # Examples
944
    ///
945
    /// Suppose we have a stream `int_stream` that yields 3 numbers (1, 2, 3):
946
    ///
947
    /// ```
948
    /// # #[tokio::main]
949
    /// # async fn main() {
950
    /// use tokio_stream::{self as stream, StreamExt};
951
    /// use std::time::Duration;
952
    /// # let int_stream = stream::iter(1..=3);
953
    ///
954
    /// let int_stream = int_stream.timeout(Duration::from_secs(1));
955
    /// tokio::pin!(int_stream);
956
    ///
957
    /// // When no items time out, we get the 3 elements in succession:
958
    /// assert_eq!(int_stream.try_next().await, Ok(Some(1)));
959
    /// assert_eq!(int_stream.try_next().await, Ok(Some(2)));
960
    /// assert_eq!(int_stream.try_next().await, Ok(Some(3)));
961
    /// assert_eq!(int_stream.try_next().await, Ok(None));
962
    ///
963
    /// // If the second item times out, we get an error and continue polling the stream:
964
    /// # let mut int_stream = stream::iter(vec![Ok(1), Err(()), Ok(2), Ok(3)]);
965
    /// assert_eq!(int_stream.try_next().await, Ok(Some(1)));
966
    /// assert!(int_stream.try_next().await.is_err());
967
    /// assert_eq!(int_stream.try_next().await, Ok(Some(2)));
968
    /// assert_eq!(int_stream.try_next().await, Ok(Some(3)));
969
    /// assert_eq!(int_stream.try_next().await, Ok(None));
970
    ///
971
    /// // If we want to stop consuming the source stream the first time an
972
    /// // element times out, we can use the `take_while` operator:
973
    /// # let int_stream = stream::iter(vec![Ok(1), Err(()), Ok(2), Ok(3)]);
974
    /// let mut int_stream = int_stream.take_while(Result::is_ok);
975
    ///
976
    /// assert_eq!(int_stream.try_next().await, Ok(Some(1)));
977
    /// assert_eq!(int_stream.try_next().await, Ok(None));
978
    /// # }
979
    /// ```
980
    ///
981
    /// Once a timeout error is received, no further events will be received
982
    /// unless the wrapped stream yields a value (timeouts do not repeat).
983
    ///
984
    /// ```
985
    /// # #[tokio::main(flavor = "current_thread", start_paused = true)]
986
    /// # async fn main() {
987
    /// use tokio_stream::{StreamExt, wrappers::IntervalStream};
988
    /// use std::time::Duration;
989
    /// let interval_stream = IntervalStream::new(tokio::time::interval(Duration::from_millis(100)));
990
    /// let timeout_stream = interval_stream.timeout(Duration::from_millis(10));
991
    /// tokio::pin!(timeout_stream);
992
    ///
993
    /// // Only one timeout will be received between values in the source stream.
994
    /// assert!(timeout_stream.try_next().await.is_ok());
995
    /// assert!(timeout_stream.try_next().await.is_err(), "expected one timeout");
996
    /// assert!(timeout_stream.try_next().await.is_ok(), "expected no more timeouts");
997
    /// # }
998
    /// ```
999
    #[cfg(feature = "time")]
1000
    #[cfg_attr(docsrs, doc(cfg(feature = "time")))]
1001
0
    fn timeout(self, duration: Duration) -> Timeout<Self>
1002
0
    where
1003
0
        Self: Sized,
1004
    {
1005
0
        Timeout::new(self, duration)
1006
0
    }
1007
1008
    /// Applies a per-item timeout to the passed stream.
1009
    ///
1010
    /// `timeout_repeating()` takes an [`Interval`] that controls the time each
1011
    /// element of the stream has to complete before timing out.
1012
    ///
1013
    /// If the wrapped stream yields a value before the deadline is reached, the
1014
    /// value is returned. Otherwise, an error is returned. The caller may decide
1015
    /// to continue consuming the stream and will eventually get the next source
1016
    /// stream value once it becomes available. Unlike `timeout()`, if no value
1017
    /// becomes available before the deadline is reached, additional errors are
1018
    /// returned at the specified interval. See [`timeout`](StreamExt::timeout)
1019
    /// for an alternative where the timeouts do not repeat.
1020
    ///
1021
    /// # Notes
1022
    ///
1023
    /// This function consumes the stream passed into it and returns a
1024
    /// wrapped version of it.
1025
    ///
1026
    /// Polling the returned stream will continue to poll the inner stream even
1027
    /// if one or more items time out.
1028
    ///
1029
    /// # Examples
1030
    ///
1031
    /// Suppose we have a stream `int_stream` that yields 3 numbers (1, 2, 3):
1032
    ///
1033
    /// ```
1034
    /// # #[tokio::main]
1035
    /// # async fn main() {
1036
    /// use tokio_stream::{self as stream, StreamExt};
1037
    /// use std::time::Duration;
1038
    /// # let int_stream = stream::iter(1..=3);
1039
    ///
1040
    /// let int_stream = int_stream.timeout_repeating(tokio::time::interval(Duration::from_secs(1)));
1041
    /// tokio::pin!(int_stream);
1042
    ///
1043
    /// // When no items time out, we get the 3 elements in succession:
1044
    /// assert_eq!(int_stream.try_next().await, Ok(Some(1)));
1045
    /// assert_eq!(int_stream.try_next().await, Ok(Some(2)));
1046
    /// assert_eq!(int_stream.try_next().await, Ok(Some(3)));
1047
    /// assert_eq!(int_stream.try_next().await, Ok(None));
1048
    ///
1049
    /// // If the second item times out, we get an error and continue polling the stream:
1050
    /// # let mut int_stream = stream::iter(vec![Ok(1), Err(()), Ok(2), Ok(3)]);
1051
    /// assert_eq!(int_stream.try_next().await, Ok(Some(1)));
1052
    /// assert!(int_stream.try_next().await.is_err());
1053
    /// assert_eq!(int_stream.try_next().await, Ok(Some(2)));
1054
    /// assert_eq!(int_stream.try_next().await, Ok(Some(3)));
1055
    /// assert_eq!(int_stream.try_next().await, Ok(None));
1056
    ///
1057
    /// // If we want to stop consuming the source stream the first time an
1058
    /// // element times out, we can use the `take_while` operator:
1059
    /// # let int_stream = stream::iter(vec![Ok(1), Err(()), Ok(2), Ok(3)]);
1060
    /// let mut int_stream = int_stream.take_while(Result::is_ok);
1061
    ///
1062
    /// assert_eq!(int_stream.try_next().await, Ok(Some(1)));
1063
    /// assert_eq!(int_stream.try_next().await, Ok(None));
1064
    /// # }
1065
    /// ```
1066
    ///
1067
    /// Timeout errors will be continuously produced at the specified interval
1068
    /// until the wrapped stream yields a value.
1069
    ///
1070
    /// ```
1071
    /// # #[tokio::main(flavor = "current_thread", start_paused = true)]
1072
    /// # async fn main() {
1073
    /// use tokio_stream::{StreamExt, wrappers::IntervalStream};
1074
    /// use std::time::Duration;
1075
    /// let interval_stream = IntervalStream::new(tokio::time::interval(Duration::from_millis(23)));
1076
    /// let timeout_stream = interval_stream.timeout_repeating(tokio::time::interval(Duration::from_millis(9)));
1077
    /// tokio::pin!(timeout_stream);
1078
    ///
1079
    /// // Multiple timeouts will be received between values in the source stream.
1080
    /// assert!(timeout_stream.try_next().await.is_ok());
1081
    /// assert!(timeout_stream.try_next().await.is_err(), "expected one timeout");
1082
    /// assert!(timeout_stream.try_next().await.is_err(), "expected a second timeout");
1083
    /// // Will eventually receive another value from the source stream...
1084
    /// assert!(timeout_stream.try_next().await.is_ok(), "expected non-timeout");
1085
    /// # }
1086
    /// ```
1087
    #[cfg(feature = "time")]
1088
    #[cfg_attr(docsrs, doc(cfg(feature = "time")))]
1089
0
    fn timeout_repeating(self, interval: Interval) -> TimeoutRepeating<Self>
1090
0
    where
1091
0
        Self: Sized,
1092
    {
1093
0
        TimeoutRepeating::new(self, interval)
1094
0
    }
1095
1096
    /// Slows down a stream by enforcing a delay between items.
1097
    ///
1098
    /// The underlying timer behind this utility has a granularity of one millisecond.
1099
    ///
1100
    /// # Example
1101
    ///
1102
    /// Create a throttled stream.
1103
    /// ```rust,no_run
1104
    /// use std::time::Duration;
1105
    /// use tokio_stream::StreamExt;
1106
    ///
1107
    /// # async fn dox() {
1108
    /// let item_stream = futures::stream::repeat("one").throttle(Duration::from_secs(2));
1109
    /// tokio::pin!(item_stream);
1110
    ///
1111
    /// loop {
1112
    ///     // The string will be produced at most every 2 seconds
1113
    ///     println!("{:?}", item_stream.next().await);
1114
    /// }
1115
    /// # }
1116
    /// ```
1117
    #[cfg(feature = "time")]
1118
    #[cfg_attr(docsrs, doc(cfg(feature = "time")))]
1119
0
    fn throttle(self, duration: Duration) -> Throttle<Self>
1120
0
    where
1121
0
        Self: Sized,
1122
    {
1123
0
        throttle(duration, self)
1124
0
    }
1125
1126
    /// Batches the items in the given stream using a maximum duration and size for each batch.
1127
    ///
1128
    /// This stream returns the next batch of items in the following situations:
1129
    ///  1. The inner stream has returned at least `max_size` many items since the last batch.
1130
    ///  2. The time since the first item of a batch is greater than the given duration.
1131
    ///  3. The end of the stream is reached.
1132
    ///
1133
    /// The length of the returned vector is never empty or greater than the maximum size. Empty batches
1134
    /// will not be emitted if no items are received upstream.
1135
    ///
1136
    /// # Panics
1137
    ///
1138
    /// This function panics if `max_size` is zero
1139
    ///
1140
    /// # Example
1141
    ///
1142
    /// ```rust
1143
    /// use std::time::Duration;
1144
    /// use tokio::time;
1145
    /// use tokio_stream::{self as stream, StreamExt};
1146
    /// use futures::FutureExt;
1147
    ///
1148
    /// #[tokio::main]
1149
    /// # async fn _unused() {}
1150
    /// # #[tokio::main(flavor = "current_thread", start_paused = true)]
1151
    /// async fn main() {
1152
    ///     let iter = vec![1, 2, 3, 4].into_iter();
1153
    ///     let stream0 = stream::iter(iter);
1154
    ///
1155
    ///     let iter = vec![5].into_iter();
1156
    ///     let stream1 = stream::iter(iter)
1157
    ///          .then(move |n| time::sleep(Duration::from_secs(5)).map(move |_| n));
1158
    ///
1159
    ///     let chunk_stream = stream0
1160
    ///         .chain(stream1)
1161
    ///         .chunks_timeout(3, Duration::from_secs(2));
1162
    ///     tokio::pin!(chunk_stream);
1163
    ///
1164
    ///     // a full batch was received
1165
    ///     assert_eq!(chunk_stream.next().await, Some(vec![1,2,3]));
1166
    ///     // deadline was reached before max_size was reached
1167
    ///     assert_eq!(chunk_stream.next().await, Some(vec![4]));
1168
    ///     // last element in the stream
1169
    ///     assert_eq!(chunk_stream.next().await, Some(vec![5]));
1170
    /// }
1171
    /// ```
1172
    #[cfg(feature = "time")]
1173
    #[cfg_attr(docsrs, doc(cfg(feature = "time")))]
1174
    #[track_caller]
1175
0
    fn chunks_timeout(self, max_size: usize, duration: Duration) -> ChunksTimeout<Self>
1176
0
    where
1177
0
        Self: Sized,
1178
    {
1179
0
        assert!(max_size > 0, "`max_size` must be non-zero.");
1180
0
        ChunksTimeout::new(self, max_size, duration)
1181
0
    }
1182
1183
    /// Turns the stream into a peekable stream, whose next element can be peeked at without being
1184
    /// consumed.
1185
    /// ```rust
1186
    /// use tokio_stream::{self as stream, StreamExt};
1187
    ///
1188
    /// #[tokio::main]
1189
    /// # async fn _unused() {}
1190
    /// # #[tokio::main(flavor = "current_thread", start_paused = true)]
1191
    /// async fn main() {
1192
    ///     let iter = vec![1, 2, 3, 4].into_iter();
1193
    ///     let mut stream = stream::iter(iter).peekable();
1194
    ///
1195
    ///     assert_eq!(*stream.peek().await.unwrap(), 1);
1196
    ///     assert_eq!(*stream.peek().await.unwrap(), 1);
1197
    ///     assert_eq!(stream.next().await.unwrap(), 1);
1198
    ///     assert_eq!(*stream.peek().await.unwrap(), 2);
1199
    /// }
1200
    /// ```
1201
0
    fn peekable(self) -> Peekable<Self>
1202
0
    where
1203
0
        Self: Sized,
1204
    {
1205
0
        Peekable::new(self)
1206
0
    }
1207
}
1208
1209
impl<St: ?Sized> StreamExt for St where St: Stream {}
1210
1211
/// Merge the size hints from two streams.
1212
0
fn merge_size_hints(
1213
0
    (left_low, left_high): (usize, Option<usize>),
1214
0
    (right_low, right_high): (usize, Option<usize>),
1215
0
) -> (usize, Option<usize>) {
1216
0
    let low = left_low.saturating_add(right_low);
1217
0
    let high = match (left_high, right_high) {
1218
0
        (Some(h1), Some(h2)) => h1.checked_add(h2),
1219
0
        _ => None,
1220
    };
1221
0
    (low, high)
1222
0
}