Coverage Report

Created: 2026-02-14 06:45

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/rust/registry/src/index.crates.io-1949cf8c6b5b557f/tokio-stream-0.1.18/src/stream_ext.rs
Line
Count
Source
1
use core::future::Future;
2
use futures_core::Stream;
3
4
mod all;
5
use all::AllFuture;
6
7
mod any;
8
use any::AnyFuture;
9
10
mod chain;
11
pub use chain::Chain;
12
13
pub(crate) mod collect;
14
use collect::{Collect, FromStream};
15
16
mod filter;
17
pub use filter::Filter;
18
19
mod filter_map;
20
pub use filter_map::FilterMap;
21
22
mod fold;
23
use fold::FoldFuture;
24
25
mod fuse;
26
pub use fuse::Fuse;
27
28
mod map;
29
pub use map::Map;
30
31
mod map_while;
32
pub use map_while::MapWhile;
33
34
mod merge;
35
pub use merge::Merge;
36
37
mod next;
38
use next::Next;
39
40
mod skip;
41
pub use skip::Skip;
42
43
mod skip_while;
44
pub use skip_while::SkipWhile;
45
46
mod take;
47
pub use take::Take;
48
49
mod take_while;
50
pub use take_while::TakeWhile;
51
52
mod then;
53
pub use then::Then;
54
55
mod try_next;
56
use try_next::TryNext;
57
58
mod peekable;
59
pub use peekable::Peekable;
60
61
cfg_time! {
62
    pub(crate) mod timeout;
63
    pub(crate) mod timeout_repeating;
64
    pub use timeout::Timeout;
65
    pub use timeout_repeating::TimeoutRepeating;
66
    use tokio::time::{Duration, Interval};
67
    mod throttle;
68
    use throttle::{throttle, Throttle};
69
    mod chunks_timeout;
70
    pub use chunks_timeout::ChunksTimeout;
71
}
72
73
/// An extension trait for the [`Stream`] trait that provides a variety of
74
/// convenient combinator functions.
75
///
76
/// Be aware that the `Stream` trait in Tokio is a re-export of the trait found
77
/// in the [futures] crate, however both Tokio and futures provide separate
78
/// `StreamExt` utility traits, and some utilities are only available on one of
79
/// these traits. Click [here][futures-StreamExt] to see the other `StreamExt`
80
/// trait in the futures crate.
81
///
82
/// If you need utilities from both `StreamExt` traits, you should prefer to
83
/// import one of them, and use the other through the fully qualified call
84
/// syntax. For example:
85
/// ```
86
/// // import one of the traits:
87
/// use futures::stream::StreamExt;
88
/// # #[tokio::main(flavor = "current_thread")]
89
/// # async fn main() {
90
///
91
/// let a = tokio_stream::iter(vec![1, 3, 5]);
92
/// let b = tokio_stream::iter(vec![2, 4, 6]);
93
///
94
/// // use the fully qualified call syntax for the other trait:
95
/// let merged = tokio_stream::StreamExt::merge(a, b);
96
///
97
/// // use normal call notation for futures::stream::StreamExt::collect
98
/// let output: Vec<_> = merged.collect().await;
99
/// assert_eq!(output, vec![1, 2, 3, 4, 5, 6]);
100
/// # }
101
/// ```
102
///
103
/// [`Stream`]: crate::Stream
104
/// [futures]: https://docs.rs/futures
105
/// [futures-StreamExt]: https://docs.rs/futures/0.3/futures/stream/trait.StreamExt.html
106
pub trait StreamExt: Stream {
107
    /// Consumes and returns the next value in the stream or `None` if the
108
    /// stream is finished.
109
    ///
110
    /// Equivalent to:
111
    ///
112
    /// ```ignore
113
    /// async fn next(&mut self) -> Option<Self::Item>;
114
    /// ```
115
    ///
116
    /// Note that because `next` doesn't take ownership over the stream,
117
    /// the [`Stream`] type must be [`Unpin`]. If you want to use `next` with a
118
    /// [`!Unpin`](Unpin) stream, you'll first have to pin the stream. This can
119
    /// be done by boxing the stream using [`Box::pin`] or
120
    /// pinning it to the stack using the `pin_mut!` macro from the `pin_utils`
121
    /// crate.
122
    ///
123
    /// # Cancel safety
124
    ///
125
    /// This method is cancel safe. The returned future only
126
    /// holds onto a reference to the underlying stream,
127
    /// so dropping it will never lose a value.
128
    ///
129
    /// # Examples
130
    ///
131
    /// ```
132
    /// # #[tokio::main(flavor = "current_thread")]
133
    /// # async fn main() {
134
    /// use tokio_stream::{self as stream, StreamExt};
135
    ///
136
    /// let mut stream = stream::iter(1..=3);
137
    ///
138
    /// assert_eq!(stream.next().await, Some(1));
139
    /// assert_eq!(stream.next().await, Some(2));
140
    /// assert_eq!(stream.next().await, Some(3));
141
    /// assert_eq!(stream.next().await, None);
142
    /// # }
143
    /// ```
144
0
    fn next(&mut self) -> Next<'_, Self>
145
0
    where
146
0
        Self: Unpin,
147
    {
148
0
        Next::new(self)
149
0
    }
150
151
    /// Consumes and returns the next item in the stream. If an error is
152
    /// encountered before the next item, the error is returned instead.
153
    ///
154
    /// Equivalent to:
155
    ///
156
    /// ```ignore
157
    /// async fn try_next(&mut self) -> Result<Option<T>, E>;
158
    /// ```
159
    ///
160
    /// This is similar to the [`next`](StreamExt::next) combinator,
161
    /// but returns a [`Result<Option<T>, E>`](Result) rather than
162
    /// an [`Option<Result<T, E>>`](Option), making for easy use
163
    /// with the [`?`](std::ops::Try) operator.
164
    ///
165
    /// # Cancel safety
166
    ///
167
    /// This method is cancel safe. The returned future only
168
    /// holds onto a reference to the underlying stream,
169
    /// so dropping it will never lose a value.
170
    ///
171
    /// # Examples
172
    ///
173
    /// ```
174
    /// # #[tokio::main(flavor = "current_thread")]
175
    /// # async fn main() {
176
    ///
177
    /// use tokio_stream::{self as stream, StreamExt};
178
    ///
179
    /// let mut stream = stream::iter(vec![Ok(1), Ok(2), Err("nope")]);
180
    ///
181
    /// assert_eq!(stream.try_next().await, Ok(Some(1)));
182
    /// assert_eq!(stream.try_next().await, Ok(Some(2)));
183
    /// assert_eq!(stream.try_next().await, Err("nope"));
184
    /// # }
185
    /// ```
186
0
    fn try_next<T, E>(&mut self) -> TryNext<'_, Self>
187
0
    where
188
0
        Self: Stream<Item = Result<T, E>> + Unpin,
189
    {
190
0
        TryNext::new(self)
191
0
    }
192
193
    /// Maps this stream's items to a different type, returning a new stream of
194
    /// the resulting type.
195
    ///
196
    /// The provided closure is executed over all elements of this stream as
197
    /// they are made available. It is executed inline with calls to
198
    /// [`poll_next`](Stream::poll_next).
199
    ///
200
    /// Note that this function consumes the stream passed into it and returns a
201
    /// wrapped version of it, similar to the existing `map` methods in the
202
    /// standard library.
203
    ///
204
    /// # Examples
205
    ///
206
    /// ```
207
    /// # #[tokio::main(flavor = "current_thread")]
208
    /// # async fn main() {
209
    /// use tokio_stream::{self as stream, StreamExt};
210
    ///
211
    /// let stream = stream::iter(1..=3);
212
    /// let mut stream = stream.map(|x| x + 3);
213
    ///
214
    /// assert_eq!(stream.next().await, Some(4));
215
    /// assert_eq!(stream.next().await, Some(5));
216
    /// assert_eq!(stream.next().await, Some(6));
217
    /// # }
218
    /// ```
219
0
    fn map<T, F>(self, f: F) -> Map<Self, F>
220
0
    where
221
0
        F: FnMut(Self::Item) -> T,
222
0
        Self: Sized,
223
    {
224
0
        Map::new(self, f)
225
0
    }
226
227
    /// Map this stream's items to a different type for as long as determined by
228
    /// the provided closure. A stream of the target type will be returned,
229
    /// which will yield elements until the closure returns `None`.
230
    ///
231
    /// The provided closure is executed over all elements of this stream as
232
    /// they are made available, until it returns `None`. It is executed inline
233
    /// with calls to [`poll_next`](Stream::poll_next). Once `None` is returned,
234
    /// the underlying stream will not be polled again.
235
    ///
236
    /// Note that this function consumes the stream passed into it and returns a
237
    /// wrapped version of it, similar to the [`Iterator::map_while`] method in the
238
    /// standard library.
239
    ///
240
    /// # Examples
241
    ///
242
    /// ```
243
    /// # #[tokio::main(flavor = "current_thread")]
244
    /// # async fn main() {
245
    /// use tokio_stream::{self as stream, StreamExt};
246
    ///
247
    /// let stream = stream::iter(1..=10);
248
    /// let mut stream = stream.map_while(|x| {
249
    ///     if x < 4 {
250
    ///         Some(x + 3)
251
    ///     } else {
252
    ///         None
253
    ///     }
254
    /// });
255
    /// assert_eq!(stream.next().await, Some(4));
256
    /// assert_eq!(stream.next().await, Some(5));
257
    /// assert_eq!(stream.next().await, Some(6));
258
    /// assert_eq!(stream.next().await, None);
259
    /// # }
260
    /// ```
261
0
    fn map_while<T, F>(self, f: F) -> MapWhile<Self, F>
262
0
    where
263
0
        F: FnMut(Self::Item) -> Option<T>,
264
0
        Self: Sized,
265
    {
266
0
        MapWhile::new(self, f)
267
0
    }
268
269
    /// Maps this stream's items asynchronously to a different type, returning a
270
    /// new stream of the resulting type.
271
    ///
272
    /// The provided closure is executed over all elements of this stream as
273
    /// they are made available, and the returned future is executed. Only one
274
    /// future is executed at the time.
275
    ///
276
    /// Note that this function consumes the stream passed into it and returns a
277
    /// wrapped version of it, similar to the existing `then` methods in the
278
    /// standard library.
279
    ///
280
    /// Be aware that if the future is not `Unpin`, then neither is the `Stream`
281
    /// returned by this method. To handle this, you can use `tokio::pin!` as in
282
    /// the example below or put the stream in a `Box` with `Box::pin(stream)`.
283
    ///
284
    /// # Examples
285
    ///
286
    /// ```
287
    /// # #[tokio::main(flavor = "current_thread")]
288
    /// # async fn main() {
289
    /// use tokio_stream::{self as stream, StreamExt};
290
    ///
291
    /// async fn do_async_work(value: i32) -> i32 {
292
    ///     value + 3
293
    /// }
294
    ///
295
    /// let stream = stream::iter(1..=3);
296
    /// let stream = stream.then(do_async_work);
297
    ///
298
    /// tokio::pin!(stream);
299
    ///
300
    /// assert_eq!(stream.next().await, Some(4));
301
    /// assert_eq!(stream.next().await, Some(5));
302
    /// assert_eq!(stream.next().await, Some(6));
303
    /// # }
304
    /// ```
305
0
    fn then<F, Fut>(self, f: F) -> Then<Self, Fut, F>
306
0
    where
307
0
        F: FnMut(Self::Item) -> Fut,
308
0
        Fut: Future,
309
0
        Self: Sized,
310
    {
311
0
        Then::new(self, f)
312
0
    }
313
314
    /// Combine two streams into one by interleaving the output of both as it
315
    /// is produced.
316
    ///
317
    /// Values are produced from the merged stream in the order they arrive from
318
    /// the two source streams. If both source streams provide values
319
    /// simultaneously, the merge stream alternates between them. This provides
320
    /// some level of fairness. You should not chain calls to `merge`, as this
321
    /// will break the fairness of the merging.
322
    ///
323
    /// The merged stream completes once **both** source streams complete. When
324
    /// one source stream completes before the other, the merge stream
325
    /// exclusively polls the remaining stream.
326
    ///
327
    /// For merging multiple streams, consider using [`StreamMap`] instead.
328
    ///
329
    /// [`StreamMap`]: crate::StreamMap
330
    ///
331
    /// # Examples
332
    ///
333
    /// ```
334
    /// use tokio_stream::{StreamExt, Stream};
335
    /// use tokio::sync::mpsc;
336
    /// use tokio::time;
337
    ///
338
    /// use std::time::Duration;
339
    /// use std::pin::Pin;
340
    ///
341
    /// # /*
342
    /// #[tokio::main]
343
    /// # */
344
    /// # #[tokio::main(flavor = "current_thread")]
345
    /// async fn main() {
346
    /// # time::pause();
347
    ///     let (tx1, mut rx1) = mpsc::channel::<usize>(10);
348
    ///     let (tx2, mut rx2) = mpsc::channel::<usize>(10);
349
    ///
350
    ///     // Convert the channels to a `Stream`.
351
    ///     let rx1 = Box::pin(async_stream::stream! {
352
    ///           while let Some(item) = rx1.recv().await {
353
    ///               yield item;
354
    ///           }
355
    ///     }) as Pin<Box<dyn Stream<Item = usize> + Send>>;
356
    ///
357
    ///     let rx2 = Box::pin(async_stream::stream! {
358
    ///           while let Some(item) = rx2.recv().await {
359
    ///               yield item;
360
    ///           }
361
    ///     }) as Pin<Box<dyn Stream<Item = usize> + Send>>;
362
    ///
363
    ///     let mut rx = rx1.merge(rx2);
364
    ///
365
    ///     tokio::spawn(async move {
366
    ///         // Send some values immediately
367
    ///         tx1.send(1).await.unwrap();
368
    ///         tx1.send(2).await.unwrap();
369
    ///
370
    ///         // Let the other task send values
371
    ///         time::sleep(Duration::from_millis(20)).await;
372
    ///
373
    ///         tx1.send(4).await.unwrap();
374
    ///     });
375
    ///
376
    ///     tokio::spawn(async move {
377
    ///         // Wait for the first task to send values
378
    ///         time::sleep(Duration::from_millis(5)).await;
379
    ///
380
    ///         tx2.send(3).await.unwrap();
381
    ///
382
    ///         time::sleep(Duration::from_millis(25)).await;
383
    ///
384
    ///         // Send the final value
385
    ///         tx2.send(5).await.unwrap();
386
    ///     });
387
    ///
388
    ///    assert_eq!(1, rx.next().await.unwrap());
389
    ///    assert_eq!(2, rx.next().await.unwrap());
390
    ///    assert_eq!(3, rx.next().await.unwrap());
391
    ///    assert_eq!(4, rx.next().await.unwrap());
392
    ///    assert_eq!(5, rx.next().await.unwrap());
393
    ///
394
    ///    // The merged stream is consumed
395
    ///    assert!(rx.next().await.is_none());
396
    /// }
397
    /// ```
398
0
    fn merge<U>(self, other: U) -> Merge<Self, U>
399
0
    where
400
0
        U: Stream<Item = Self::Item>,
401
0
        Self: Sized,
402
    {
403
0
        Merge::new(self, other)
404
0
    }
405
406
    /// Filters the values produced by this stream according to the provided
407
    /// predicate.
408
    ///
409
    /// As values of this stream are made available, the provided predicate `f`
410
    /// will be run against them. If the predicate
411
    /// resolves to `true`, then the stream will yield the value, but if the
412
    /// predicate resolves to `false`, then the value
413
    /// will be discarded and the next value will be produced.
414
    ///
415
    /// Note that this function consumes the stream passed into it and returns a
416
    /// wrapped version of it, similar to [`Iterator::filter`] method in the
417
    /// standard library.
418
    ///
419
    /// # Examples
420
    ///
421
    /// ```
422
    /// # #[tokio::main(flavor = "current_thread")]
423
    /// # async fn main() {
424
    /// use tokio_stream::{self as stream, StreamExt};
425
    ///
426
    /// let stream = stream::iter(1..=8);
427
    /// let mut evens = stream.filter(|x| x % 2 == 0);
428
    ///
429
    /// assert_eq!(Some(2), evens.next().await);
430
    /// assert_eq!(Some(4), evens.next().await);
431
    /// assert_eq!(Some(6), evens.next().await);
432
    /// assert_eq!(Some(8), evens.next().await);
433
    /// assert_eq!(None, evens.next().await);
434
    /// # }
435
    /// ```
436
0
    fn filter<F>(self, f: F) -> Filter<Self, F>
437
0
    where
438
0
        F: FnMut(&Self::Item) -> bool,
439
0
        Self: Sized,
440
    {
441
0
        Filter::new(self, f)
442
0
    }
443
444
    /// Filters the values produced by this stream while simultaneously mapping
445
    /// them to a different type according to the provided closure.
446
    ///
447
    /// As values of this stream are made available, the provided function will
448
    /// be run on them. If the predicate `f` resolves to
449
    /// [`Some(item)`](Some) then the stream will yield the value `item`, but if
450
    /// it resolves to [`None`], then the value will be skipped.
451
    ///
452
    /// Note that this function consumes the stream passed into it and returns a
453
    /// wrapped version of it, similar to [`Iterator::filter_map`] method in the
454
    /// standard library.
455
    ///
456
    /// # Examples
457
    /// ```
458
    /// # #[tokio::main(flavor = "current_thread")]
459
    /// # async fn main() {
460
    /// use tokio_stream::{self as stream, StreamExt};
461
    ///
462
    /// let stream = stream::iter(1..=8);
463
    /// let mut evens = stream.filter_map(|x| {
464
    ///     if x % 2 == 0 { Some(x + 1) } else { None }
465
    /// });
466
    ///
467
    /// assert_eq!(Some(3), evens.next().await);
468
    /// assert_eq!(Some(5), evens.next().await);
469
    /// assert_eq!(Some(7), evens.next().await);
470
    /// assert_eq!(Some(9), evens.next().await);
471
    /// assert_eq!(None, evens.next().await);
472
    /// # }
473
    /// ```
474
0
    fn filter_map<T, F>(self, f: F) -> FilterMap<Self, F>
475
0
    where
476
0
        F: FnMut(Self::Item) -> Option<T>,
477
0
        Self: Sized,
478
    {
479
0
        FilterMap::new(self, f)
480
0
    }
481
482
    /// Creates a stream which ends after the first `None`.
483
    ///
484
    /// After a stream returns `None`, behavior is undefined. Future calls to
485
    /// `poll_next` may or may not return `Some(T)` again or they may panic.
486
    /// `fuse()` adapts a stream, ensuring that after `None` is given, it will
487
    /// return `None` forever.
488
    ///
489
    /// # Examples
490
    ///
491
    /// ```
492
    /// use tokio_stream::{Stream, StreamExt};
493
    ///
494
    /// use std::pin::Pin;
495
    /// use std::task::{Context, Poll};
496
    ///
497
    /// // a stream which alternates between Some and None
498
    /// struct Alternate {
499
    ///     state: i32,
500
    /// }
501
    ///
502
    /// impl Stream for Alternate {
503
    ///     type Item = i32;
504
    ///
505
    ///     fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<i32>> {
506
    ///         let val = self.state;
507
    ///         self.state = self.state + 1;
508
    ///
509
    ///         // if it's even, Some(i32), else None
510
    ///         if val % 2 == 0 {
511
    ///             Poll::Ready(Some(val))
512
    ///         } else {
513
    ///             Poll::Ready(None)
514
    ///         }
515
    ///     }
516
    /// }
517
    ///
518
    /// # /*
519
    /// #[tokio::main]
520
    /// # */
521
    /// # #[tokio::main(flavor = "current_thread")]
522
    /// async fn main() {
523
    ///     let mut stream = Alternate { state: 0 };
524
    ///
525
    ///     // the stream goes back and forth
526
    ///     assert_eq!(stream.next().await, Some(0));
527
    ///     assert_eq!(stream.next().await, None);
528
    ///     assert_eq!(stream.next().await, Some(2));
529
    ///     assert_eq!(stream.next().await, None);
530
    ///
531
    ///     // however, once it is fused
532
    ///     let mut stream = stream.fuse();
533
    ///
534
    ///     assert_eq!(stream.next().await, Some(4));
535
    ///     assert_eq!(stream.next().await, None);
536
    ///
537
    ///     // it will always return `None` after the first time.
538
    ///     assert_eq!(stream.next().await, None);
539
    ///     assert_eq!(stream.next().await, None);
540
    ///     assert_eq!(stream.next().await, None);
541
    /// }
542
    /// ```
543
0
    fn fuse(self) -> Fuse<Self>
544
0
    where
545
0
        Self: Sized,
546
    {
547
0
        Fuse::new(self)
548
0
    }
549
550
    /// Creates a new stream of at most `n` items of the underlying stream.
551
    ///
552
    /// Once `n` items have been yielded from this stream then it will always
553
    /// return that the stream is done.
554
    ///
555
    /// # Examples
556
    ///
557
    /// ```
558
    /// # #[tokio::main(flavor = "current_thread")]
559
    /// # async fn main() {
560
    /// use tokio_stream::{self as stream, StreamExt};
561
    ///
562
    /// let mut stream = stream::iter(1..=10).take(3);
563
    ///
564
    /// assert_eq!(Some(1), stream.next().await);
565
    /// assert_eq!(Some(2), stream.next().await);
566
    /// assert_eq!(Some(3), stream.next().await);
567
    /// assert_eq!(None, stream.next().await);
568
    /// # }
569
    /// ```
570
0
    fn take(self, n: usize) -> Take<Self>
571
0
    where
572
0
        Self: Sized,
573
    {
574
0
        Take::new(self, n)
575
0
    }
576
577
    /// Take elements from this stream while the provided predicate
578
    /// resolves to `true`.
579
    ///
580
    /// This function, like `Iterator::take_while`, will take elements from the
581
    /// stream until the predicate `f` resolves to `false`. Once one element
582
    /// returns false it will always return that the stream is done.
583
    ///
584
    /// # Examples
585
    ///
586
    /// ```
587
    /// # #[tokio::main(flavor = "current_thread")]
588
    /// # async fn main() {
589
    /// use tokio_stream::{self as stream, StreamExt};
590
    ///
591
    /// let mut stream = stream::iter(1..=10).take_while(|x| *x <= 3);
592
    ///
593
    /// assert_eq!(Some(1), stream.next().await);
594
    /// assert_eq!(Some(2), stream.next().await);
595
    /// assert_eq!(Some(3), stream.next().await);
596
    /// assert_eq!(None, stream.next().await);
597
    /// # }
598
    /// ```
599
0
    fn take_while<F>(self, f: F) -> TakeWhile<Self, F>
600
0
    where
601
0
        F: FnMut(&Self::Item) -> bool,
602
0
        Self: Sized,
603
    {
604
0
        TakeWhile::new(self, f)
605
0
    }
606
607
    /// Creates a new stream that will skip the `n` first items of the
608
    /// underlying stream.
609
    ///
610
    /// # Examples
611
    ///
612
    /// ```
613
    /// # #[tokio::main(flavor = "current_thread")]
614
    /// # async fn main() {
615
    /// use tokio_stream::{self as stream, StreamExt};
616
    ///
617
    /// let mut stream = stream::iter(1..=10).skip(7);
618
    ///
619
    /// assert_eq!(Some(8), stream.next().await);
620
    /// assert_eq!(Some(9), stream.next().await);
621
    /// assert_eq!(Some(10), stream.next().await);
622
    /// assert_eq!(None, stream.next().await);
623
    /// # }
624
    /// ```
625
0
    fn skip(self, n: usize) -> Skip<Self>
626
0
    where
627
0
        Self: Sized,
628
    {
629
0
        Skip::new(self, n)
630
0
    }
631
632
    /// Skip elements from the underlying stream while the provided predicate
633
    /// resolves to `true`.
634
    ///
635
    /// This function, like [`Iterator::skip_while`], will ignore elements from the
636
    /// stream until the predicate `f` resolves to `false`. Once one element
637
    /// returns false, the rest of the elements will be yielded.
638
    ///
639
    /// [`Iterator::skip_while`]: std::iter::Iterator::skip_while()
640
    ///
641
    /// # Examples
642
    ///
643
    /// ```
644
    /// # #[tokio::main(flavor = "current_thread")]
645
    /// # async fn main() {
646
    /// use tokio_stream::{self as stream, StreamExt};
647
    /// let mut stream = stream::iter(vec![1,2,3,4,1]).skip_while(|x| *x < 3);
648
    ///
649
    /// assert_eq!(Some(3), stream.next().await);
650
    /// assert_eq!(Some(4), stream.next().await);
651
    /// assert_eq!(Some(1), stream.next().await);
652
    /// assert_eq!(None, stream.next().await);
653
    /// # }
654
    /// ```
655
0
    fn skip_while<F>(self, f: F) -> SkipWhile<Self, F>
656
0
    where
657
0
        F: FnMut(&Self::Item) -> bool,
658
0
        Self: Sized,
659
    {
660
0
        SkipWhile::new(self, f)
661
0
    }
662
663
    /// Tests if every element of the stream matches a predicate.
664
    ///
665
    /// Equivalent to:
666
    ///
667
    /// ```ignore
668
    /// async fn all<F>(&mut self, f: F) -> bool;
669
    /// ```
670
    ///
671
    /// `all()` takes a closure that returns `true` or `false`. It applies
672
    /// this closure to each element of the stream, and if they all return
673
    /// `true`, then so does `all`. If any of them return `false`, it
674
    /// returns `false`. An empty stream returns `true`.
675
    ///
676
    /// `all()` is short-circuiting; in other words, it will stop processing
677
    /// as soon as it finds a `false`, given that no matter what else happens,
678
    /// the result will also be `false`.
679
    ///
680
    /// An empty stream returns `true`.
681
    ///
682
    /// # Examples
683
    ///
684
    /// Basic usage:
685
    ///
686
    /// ```
687
    /// # #[tokio::main(flavor = "current_thread")]
688
    /// # async fn main() {
689
    /// use tokio_stream::{self as stream, StreamExt};
690
    ///
691
    /// let a = [1, 2, 3];
692
    ///
693
    /// assert!(stream::iter(&a).all(|&x| x > 0).await);
694
    ///
695
    /// assert!(!stream::iter(&a).all(|&x| x > 2).await);
696
    /// # }
697
    /// ```
698
    ///
699
    /// Stopping at the first `false`:
700
    ///
701
    /// ```
702
    /// # #[tokio::main(flavor = "current_thread")]
703
    /// # async fn main() {
704
    /// use tokio_stream::{self as stream, StreamExt};
705
    ///
706
    /// let a = [1, 2, 3];
707
    ///
708
    /// let mut iter = stream::iter(&a);
709
    ///
710
    /// assert!(!iter.all(|&x| x != 2).await);
711
    ///
712
    /// // we can still use `iter`, as there are more elements.
713
    /// assert_eq!(iter.next().await, Some(&3));
714
    /// # }
715
    /// ```
716
0
    fn all<F>(&mut self, f: F) -> AllFuture<'_, Self, F>
717
0
    where
718
0
        Self: Unpin,
719
0
        F: FnMut(Self::Item) -> bool,
720
    {
721
0
        AllFuture::new(self, f)
722
0
    }
723
724
    /// Tests if any element of the stream matches a predicate.
725
    ///
726
    /// Equivalent to:
727
    ///
728
    /// ```ignore
729
    /// async fn any<F>(&mut self, f: F) -> bool;
730
    /// ```
731
    ///
732
    /// `any()` takes a closure that returns `true` or `false`. It applies
733
    /// this closure to each element of the stream, and if any of them return
734
    /// `true`, then so does `any()`. If they all return `false`, it
735
    /// returns `false`.
736
    ///
737
    /// `any()` is short-circuiting; in other words, it will stop processing
738
    /// as soon as it finds a `true`, given that no matter what else happens,
739
    /// the result will also be `true`.
740
    ///
741
    /// An empty stream returns `false`.
742
    ///
743
    /// Basic usage:
744
    ///
745
    /// ```
746
    /// # #[tokio::main(flavor = "current_thread")]
747
    /// # async fn main() {
748
    /// use tokio_stream::{self as stream, StreamExt};
749
    ///
750
    /// let a = [1, 2, 3];
751
    ///
752
    /// assert!(stream::iter(&a).any(|&x| x > 0).await);
753
    ///
754
    /// assert!(!stream::iter(&a).any(|&x| x > 5).await);
755
    /// # }
756
    /// ```
757
    ///
758
    /// Stopping at the first `true`:
759
    ///
760
    /// ```
761
    /// # #[tokio::main(flavor = "current_thread")]
762
    /// # async fn main() {
763
    /// use tokio_stream::{self as stream, StreamExt};
764
    ///
765
    /// let a = [1, 2, 3];
766
    ///
767
    /// let mut iter = stream::iter(&a);
768
    ///
769
    /// assert!(iter.any(|&x| x != 2).await);
770
    ///
771
    /// // we can still use `iter`, as there are more elements.
772
    /// assert_eq!(iter.next().await, Some(&2));
773
    /// # }
774
    /// ```
775
0
    fn any<F>(&mut self, f: F) -> AnyFuture<'_, Self, F>
776
0
    where
777
0
        Self: Unpin,
778
0
        F: FnMut(Self::Item) -> bool,
779
    {
780
0
        AnyFuture::new(self, f)
781
0
    }
782
783
    /// Combine two streams into one by first returning all values from the
784
    /// first stream then all values from the second stream.
785
    ///
786
    /// As long as `self` still has values to emit, no values from `other` are
787
    /// emitted, even if some are ready.
788
    ///
789
    /// # Examples
790
    ///
791
    /// ```
792
    /// use tokio_stream::{self as stream, StreamExt};
793
    ///
794
    /// # #[tokio::main(flavor = "current_thread")]
795
    /// # async fn main() {
796
    /// let one = stream::iter(vec![1, 2, 3]);
797
    /// let two = stream::iter(vec![4, 5, 6]);
798
    ///
799
    /// let mut stream = one.chain(two);
800
    ///
801
    /// assert_eq!(stream.next().await, Some(1));
802
    /// assert_eq!(stream.next().await, Some(2));
803
    /// assert_eq!(stream.next().await, Some(3));
804
    /// assert_eq!(stream.next().await, Some(4));
805
    /// assert_eq!(stream.next().await, Some(5));
806
    /// assert_eq!(stream.next().await, Some(6));
807
    /// assert_eq!(stream.next().await, None);
808
    /// # }
809
    /// ```
810
0
    fn chain<U>(self, other: U) -> Chain<Self, U>
811
0
    where
812
0
        U: Stream<Item = Self::Item>,
813
0
        Self: Sized,
814
    {
815
0
        Chain::new(self, other)
816
0
    }
817
818
    /// A combinator that applies a function to every element in a stream
819
    /// producing a single, final value.
820
    ///
821
    /// Equivalent to:
822
    ///
823
    /// ```ignore
824
    /// async fn fold<B, F>(self, init: B, f: F) -> B;
825
    /// ```
826
    ///
827
    /// # Examples
828
    /// Basic usage:
829
    /// ```
830
    /// # #[tokio::main(flavor = "current_thread")]
831
    /// # async fn main() {
832
    /// use tokio_stream::{self as stream, *};
833
    ///
834
    /// let s = stream::iter(vec![1u8, 2, 3]);
835
    /// let sum = s.fold(0, |acc, x| acc + x).await;
836
    ///
837
    /// assert_eq!(sum, 6);
838
    /// # }
839
    /// ```
840
0
    fn fold<B, F>(self, init: B, f: F) -> FoldFuture<Self, B, F>
841
0
    where
842
0
        Self: Sized,
843
0
        F: FnMut(B, Self::Item) -> B,
844
    {
845
0
        FoldFuture::new(self, init, f)
846
0
    }
847
848
    /// Drain stream pushing all emitted values into a collection.
849
    ///
850
    /// Equivalent to:
851
    ///
852
    /// ```ignore
853
    /// async fn collect<T>(self) -> T;
854
    /// ```
855
    ///
856
    /// `collect` streams all values, awaiting as needed. Values are pushed into
857
    /// a collection. A number of different target collection types are
858
    /// supported, including [`Vec`], [`String`], and [`Bytes`].
859
    ///
860
    /// [`Bytes`]: https://docs.rs/bytes/0.6.0/bytes/struct.Bytes.html
861
    ///
862
    /// # `Result`
863
    ///
864
    /// `collect()` can also be used with streams of type `Result<T, E>` where
865
    /// `T: FromStream<_>`. In this case, `collect()` will stream as long as
866
    /// values yielded from the stream are `Ok(_)`. If `Err(_)` is encountered,
867
    /// streaming is terminated and `collect()` returns the `Err`.
868
    ///
869
    /// # Notes
870
    ///
871
    /// `FromStream` is currently a sealed trait. Stabilization is pending
872
    /// enhancements to the Rust language.
873
    ///
874
    /// # Examples
875
    ///
876
    /// Basic usage:
877
    ///
878
    /// ```
879
    /// use tokio_stream::{self as stream, StreamExt};
880
    ///
881
    /// # #[tokio::main(flavor = "current_thread")]
882
    /// # async fn main() {
883
    /// let doubled: Vec<i32> =
884
    ///     stream::iter(vec![1, 2, 3])
885
    ///         .map(|x| x * 2)
886
    ///         .collect()
887
    ///         .await;
888
    ///
889
    /// assert_eq!(vec![2, 4, 6], doubled);
890
    /// # }
891
    /// ```
892
    ///
893
    /// Collecting a stream of `Result` values
894
    ///
895
    /// ```
896
    /// use tokio_stream::{self as stream, StreamExt};
897
    ///
898
    /// # #[tokio::main(flavor = "current_thread")]
899
    /// # async fn main() {
900
    /// // A stream containing only `Ok` values will be collected
901
    /// let values: Result<Vec<i32>, &str> =
902
    ///     stream::iter(vec![Ok(1), Ok(2), Ok(3)])
903
    ///         .collect()
904
    ///         .await;
905
    ///
906
    /// assert_eq!(Ok(vec![1, 2, 3]), values);
907
    ///
908
    /// // A stream containing `Err` values will return the first error.
909
    /// let results = vec![Ok(1), Err("no"), Ok(2), Ok(3), Err("nein")];
910
    ///
911
    /// let values: Result<Vec<i32>, &str> =
912
    ///     stream::iter(results)
913
    ///         .collect()
914
    ///         .await;
915
    ///
916
    /// assert_eq!(Err("no"), values);
917
    /// # }
918
    /// ```
919
0
    fn collect<T>(self) -> Collect<Self, T, T::InternalCollection>
920
0
    where
921
0
        T: FromStream<Self::Item>,
922
0
        Self: Sized,
923
    {
924
0
        Collect::new(self)
925
0
    }
926
927
    /// Applies a per-item timeout to the passed stream.
928
    ///
929
    /// `timeout()` takes a `Duration` that represents the maximum amount of
930
    /// time each element of the stream has to complete before timing out.
931
    ///
932
    /// If the wrapped stream yields a value before the deadline is reached, the
933
    /// value is returned. Otherwise, an error is returned. The caller may decide
934
    /// to continue consuming the stream and will eventually get the next source
935
    /// stream value once it becomes available. See
936
    /// [`timeout_repeating`](StreamExt::timeout_repeating) for an alternative
937
    /// where the timeouts will repeat.
938
    ///
939
    /// # Notes
940
    ///
941
    /// This function consumes the stream passed into it and returns a
942
    /// wrapped version of it.
943
    ///
944
    /// Polling the returned stream will continue to poll the inner stream even
945
    /// if one or more items time out.
946
    ///
947
    /// # Examples
948
    ///
949
    /// Suppose we have a stream `int_stream` that yields 3 numbers (1, 2, 3):
950
    ///
951
    /// ```
952
    /// # #[tokio::main(flavor = "current_thread")]
953
    /// # async fn main() {
954
    /// use tokio_stream::{self as stream, StreamExt};
955
    /// use std::time::Duration;
956
    /// # let int_stream = stream::iter(1..=3);
957
    ///
958
    /// let int_stream = int_stream.timeout(Duration::from_secs(1));
959
    /// tokio::pin!(int_stream);
960
    ///
961
    /// // When no items time out, we get the 3 elements in succession:
962
    /// assert_eq!(int_stream.try_next().await, Ok(Some(1)));
963
    /// assert_eq!(int_stream.try_next().await, Ok(Some(2)));
964
    /// assert_eq!(int_stream.try_next().await, Ok(Some(3)));
965
    /// assert_eq!(int_stream.try_next().await, Ok(None));
966
    ///
967
    /// // If the second item times out, we get an error and continue polling the stream:
968
    /// # let mut int_stream = stream::iter(vec![Ok(1), Err(()), Ok(2), Ok(3)]);
969
    /// assert_eq!(int_stream.try_next().await, Ok(Some(1)));
970
    /// assert!(int_stream.try_next().await.is_err());
971
    /// assert_eq!(int_stream.try_next().await, Ok(Some(2)));
972
    /// assert_eq!(int_stream.try_next().await, Ok(Some(3)));
973
    /// assert_eq!(int_stream.try_next().await, Ok(None));
974
    ///
975
    /// // If we want to stop consuming the source stream the first time an
976
    /// // element times out, we can use the `take_while` operator:
977
    /// # let int_stream = stream::iter(vec![Ok(1), Err(()), Ok(2), Ok(3)]);
978
    /// let mut int_stream = int_stream.take_while(Result::is_ok);
979
    ///
980
    /// assert_eq!(int_stream.try_next().await, Ok(Some(1)));
981
    /// assert_eq!(int_stream.try_next().await, Ok(None));
982
    /// # }
983
    /// ```
984
    ///
985
    /// Once a timeout error is received, no further events will be received
986
    /// unless the wrapped stream yields a value (timeouts do not repeat).
987
    ///
988
    /// ```
989
    /// # #[tokio::main(flavor = "current_thread", start_paused = true)]
990
    /// # async fn main() {
991
    /// use tokio_stream::{StreamExt, wrappers::IntervalStream};
992
    /// use std::time::Duration;
993
    /// let interval_stream = IntervalStream::new(tokio::time::interval(Duration::from_millis(100)));
994
    /// let timeout_stream = interval_stream.timeout(Duration::from_millis(10));
995
    /// tokio::pin!(timeout_stream);
996
    ///
997
    /// // Only one timeout will be received between values in the source stream.
998
    /// assert!(timeout_stream.try_next().await.is_ok());
999
    /// assert!(timeout_stream.try_next().await.is_err(), "expected one timeout");
1000
    /// assert!(timeout_stream.try_next().await.is_ok(), "expected no more timeouts");
1001
    /// # }
1002
    /// ```
1003
    #[cfg(feature = "time")]
1004
    #[cfg_attr(docsrs, doc(cfg(feature = "time")))]
1005
0
    fn timeout(self, duration: Duration) -> Timeout<Self>
1006
0
    where
1007
0
        Self: Sized,
1008
    {
1009
0
        Timeout::new(self, duration)
1010
0
    }
1011
1012
    /// Applies a per-item timeout to the passed stream.
1013
    ///
1014
    /// `timeout_repeating()` takes an [`Interval`] that controls the time each
1015
    /// element of the stream has to complete before timing out.
1016
    ///
1017
    /// If the wrapped stream yields a value before the deadline is reached, the
1018
    /// value is returned. Otherwise, an error is returned. The caller may decide
1019
    /// to continue consuming the stream and will eventually get the next source
1020
    /// stream value once it becomes available. Unlike `timeout()`, if no value
1021
    /// becomes available before the deadline is reached, additional errors are
1022
    /// returned at the specified interval. See [`timeout`](StreamExt::timeout)
1023
    /// for an alternative where the timeouts do not repeat.
1024
    ///
1025
    /// # Notes
1026
    ///
1027
    /// This function consumes the stream passed into it and returns a
1028
    /// wrapped version of it.
1029
    ///
1030
    /// Polling the returned stream will continue to poll the inner stream even
1031
    /// if one or more items time out.
1032
    ///
1033
    /// # Examples
1034
    ///
1035
    /// Suppose we have a stream `int_stream` that yields 3 numbers (1, 2, 3):
1036
    ///
1037
    /// ```
1038
    /// # #[tokio::main(flavor = "current_thread")]
1039
    /// # async fn main() {
1040
    /// use tokio_stream::{self as stream, StreamExt};
1041
    /// use std::time::Duration;
1042
    /// # let int_stream = stream::iter(1..=3);
1043
    ///
1044
    /// let int_stream = int_stream.timeout_repeating(tokio::time::interval(Duration::from_secs(1)));
1045
    /// tokio::pin!(int_stream);
1046
    ///
1047
    /// // When no items time out, we get the 3 elements in succession:
1048
    /// assert_eq!(int_stream.try_next().await, Ok(Some(1)));
1049
    /// assert_eq!(int_stream.try_next().await, Ok(Some(2)));
1050
    /// assert_eq!(int_stream.try_next().await, Ok(Some(3)));
1051
    /// assert_eq!(int_stream.try_next().await, Ok(None));
1052
    ///
1053
    /// // If the second item times out, we get an error and continue polling the stream:
1054
    /// # let mut int_stream = stream::iter(vec![Ok(1), Err(()), Ok(2), Ok(3)]);
1055
    /// assert_eq!(int_stream.try_next().await, Ok(Some(1)));
1056
    /// assert!(int_stream.try_next().await.is_err());
1057
    /// assert_eq!(int_stream.try_next().await, Ok(Some(2)));
1058
    /// assert_eq!(int_stream.try_next().await, Ok(Some(3)));
1059
    /// assert_eq!(int_stream.try_next().await, Ok(None));
1060
    ///
1061
    /// // If we want to stop consuming the source stream the first time an
1062
    /// // element times out, we can use the `take_while` operator:
1063
    /// # let int_stream = stream::iter(vec![Ok(1), Err(()), Ok(2), Ok(3)]);
1064
    /// let mut int_stream = int_stream.take_while(Result::is_ok);
1065
    ///
1066
    /// assert_eq!(int_stream.try_next().await, Ok(Some(1)));
1067
    /// assert_eq!(int_stream.try_next().await, Ok(None));
1068
    /// # }
1069
    /// ```
1070
    ///
1071
    /// Timeout errors will be continuously produced at the specified interval
1072
    /// until the wrapped stream yields a value.
1073
    ///
1074
    /// ```
1075
    /// # #[tokio::main(flavor = "current_thread", start_paused = true)]
1076
    /// # async fn main() {
1077
    /// use tokio_stream::{StreamExt, wrappers::IntervalStream};
1078
    /// use std::time::Duration;
1079
    /// let interval_stream = IntervalStream::new(tokio::time::interval(Duration::from_millis(23)));
1080
    /// let timeout_stream = interval_stream.timeout_repeating(tokio::time::interval(Duration::from_millis(9)));
1081
    /// tokio::pin!(timeout_stream);
1082
    ///
1083
    /// // Multiple timeouts will be received between values in the source stream.
1084
    /// assert!(timeout_stream.try_next().await.is_ok());
1085
    /// assert!(timeout_stream.try_next().await.is_err(), "expected one timeout");
1086
    /// assert!(timeout_stream.try_next().await.is_err(), "expected a second timeout");
1087
    /// // Will eventually receive another value from the source stream...
1088
    /// assert!(timeout_stream.try_next().await.is_ok(), "expected non-timeout");
1089
    /// # }
1090
    /// ```
1091
    #[cfg(feature = "time")]
1092
    #[cfg_attr(docsrs, doc(cfg(feature = "time")))]
1093
0
    fn timeout_repeating(self, interval: Interval) -> TimeoutRepeating<Self>
1094
0
    where
1095
0
        Self: Sized,
1096
    {
1097
0
        TimeoutRepeating::new(self, interval)
1098
0
    }
1099
1100
    /// Slows down a stream by enforcing a delay between items.
1101
    ///
1102
    /// The underlying timer behind this utility has a granularity of one millisecond.
1103
    ///
1104
    /// # Example
1105
    ///
1106
    /// Create a throttled stream.
1107
    /// ```rust,no_run
1108
    /// use std::time::Duration;
1109
    /// use tokio_stream::StreamExt;
1110
    ///
1111
    /// # async fn dox() {
1112
    /// let item_stream = futures::stream::repeat("one").throttle(Duration::from_secs(2));
1113
    /// tokio::pin!(item_stream);
1114
    ///
1115
    /// loop {
1116
    ///     // The string will be produced at most every 2 seconds
1117
    ///     println!("{:?}", item_stream.next().await);
1118
    /// }
1119
    /// # }
1120
    /// ```
1121
    #[cfg(feature = "time")]
1122
    #[cfg_attr(docsrs, doc(cfg(feature = "time")))]
1123
0
    fn throttle(self, duration: Duration) -> Throttle<Self>
1124
0
    where
1125
0
        Self: Sized,
1126
    {
1127
0
        throttle(duration, self)
1128
0
    }
1129
1130
    /// Batches the items in the given stream using a maximum duration and size for each batch.
1131
    ///
1132
    /// This stream returns the next batch of items in the following situations:
1133
    ///  1. The inner stream has returned at least `max_size` many items since the last batch.
1134
    ///  2. The time since the first item of a batch is greater than the given duration.
1135
    ///  3. The end of the stream is reached.
1136
    ///
1137
    /// The length of the returned vector is never empty or greater than the maximum size. Empty batches
1138
    /// will not be emitted if no items are received upstream.
1139
    ///
1140
    /// # Panics
1141
    ///
1142
    /// This function panics if `max_size` is zero
1143
    ///
1144
    /// # Example
1145
    ///
1146
    /// ```rust
1147
    /// use std::time::Duration;
1148
    /// use tokio::time;
1149
    /// use tokio_stream::{self as stream, StreamExt};
1150
    /// use futures::FutureExt;
1151
    ///
1152
    /// #[tokio::main]
1153
    /// # async fn _unused() {}
1154
    /// # #[tokio::main(flavor = "current_thread", start_paused = true)]
1155
    /// async fn main() {
1156
    ///     let iter = vec![1, 2, 3, 4].into_iter();
1157
    ///     let stream0 = stream::iter(iter);
1158
    ///
1159
    ///     let iter = vec![5].into_iter();
1160
    ///     let stream1 = stream::iter(iter)
1161
    ///          .then(move |n| time::sleep(Duration::from_secs(5)).map(move |_| n));
1162
    ///
1163
    ///     let chunk_stream = stream0
1164
    ///         .chain(stream1)
1165
    ///         .chunks_timeout(3, Duration::from_secs(2));
1166
    ///     tokio::pin!(chunk_stream);
1167
    ///
1168
    ///     // a full batch was received
1169
    ///     assert_eq!(chunk_stream.next().await, Some(vec![1,2,3]));
1170
    ///     // deadline was reached before max_size was reached
1171
    ///     assert_eq!(chunk_stream.next().await, Some(vec![4]));
1172
    ///     // last element in the stream
1173
    ///     assert_eq!(chunk_stream.next().await, Some(vec![5]));
1174
    /// }
1175
    /// ```
1176
    #[cfg(feature = "time")]
1177
    #[cfg_attr(docsrs, doc(cfg(feature = "time")))]
1178
    #[track_caller]
1179
0
    fn chunks_timeout(self, max_size: usize, duration: Duration) -> ChunksTimeout<Self>
1180
0
    where
1181
0
        Self: Sized,
1182
    {
1183
0
        assert!(max_size > 0, "`max_size` must be non-zero.");
1184
0
        ChunksTimeout::new(self, max_size, duration)
1185
0
    }
1186
1187
    /// Turns the stream into a peekable stream, whose next element can be peeked at without being
1188
    /// consumed.
1189
    /// ```rust
1190
    /// use tokio_stream::{self as stream, StreamExt};
1191
    ///
1192
    /// #[tokio::main]
1193
    /// # async fn _unused() {}
1194
    /// # #[tokio::main(flavor = "current_thread", start_paused = true)]
1195
    /// async fn main() {
1196
    ///     let iter = vec![1, 2, 3, 4].into_iter();
1197
    ///     let mut stream = stream::iter(iter).peekable();
1198
    ///
1199
    ///     assert_eq!(*stream.peek().await.unwrap(), 1);
1200
    ///     assert_eq!(*stream.peek().await.unwrap(), 1);
1201
    ///     assert_eq!(stream.next().await.unwrap(), 1);
1202
    ///     assert_eq!(*stream.peek().await.unwrap(), 2);
1203
    /// }
1204
    /// ```
1205
0
    fn peekable(self) -> Peekable<Self>
1206
0
    where
1207
0
        Self: Sized,
1208
    {
1209
0
        Peekable::new(self)
1210
0
    }
1211
}
1212
1213
impl<St: ?Sized> StreamExt for St where St: Stream {}
1214
1215
/// Merge the size hints from two streams.
1216
0
fn merge_size_hints(
1217
0
    (left_low, left_high): (usize, Option<usize>),
1218
0
    (right_low, right_high): (usize, Option<usize>),
1219
0
) -> (usize, Option<usize>) {
1220
0
    let low = left_low.saturating_add(right_low);
1221
0
    let high = match (left_high, right_high) {
1222
0
        (Some(h1), Some(h2)) => h1.checked_add(h2),
1223
0
        _ => None,
1224
    };
1225
0
    (low, high)
1226
0
}