Coverage Report

Created: 2025-10-29 07:05

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/rust/registry/src/index.crates.io-1949cf8c6b5b557f/tokio-stream-0.1.17/src/stream_map.rs
Line
Count
Source
1
use crate::Stream;
2
3
use std::borrow::Borrow;
4
use std::future::poll_fn;
5
use std::hash::Hash;
6
use std::pin::Pin;
7
use std::task::{ready, Context, Poll};
8
9
/// Combine many streams into one, indexing each source stream with a unique
10
/// key.
11
///
12
/// `StreamMap` is similar to [`StreamExt::merge`] in that it combines source
13
/// streams into a single merged stream that yields values in the order that
14
/// they arrive from the source streams. However, `StreamMap` has a lot more
15
/// flexibility in usage patterns.
16
///
17
/// `StreamMap` can:
18
///
19
/// * Merge an arbitrary number of streams.
20
/// * Track which source stream the value was received from.
21
/// * Handle inserting and removing streams from the set of managed streams at
22
///   any point during iteration.
23
///
24
/// All source streams held by `StreamMap` are indexed using a key. This key is
25
/// included with the value when a source stream yields a value. The key is also
26
/// used to remove the stream from the `StreamMap` before the stream has
27
/// completed streaming.
28
///
29
/// # `Unpin`
30
///
31
/// Because the `StreamMap` API moves streams during runtime, both streams and
32
/// keys must be `Unpin`. In order to insert a `!Unpin` stream into a
33
/// `StreamMap`, use [`pin!`] to pin the stream to the stack or [`Box::pin`] to
34
/// pin the stream in the heap.
35
///
36
/// # Implementation
37
///
38
/// `StreamMap` is backed by a `Vec<(K, V)>`. There is no guarantee that this
39
/// internal implementation detail will persist in future versions, but it is
40
/// important to know the runtime implications. In general, `StreamMap` works
41
/// best with a "smallish" number of streams as all entries are scanned on
42
/// insert, remove, and polling. In cases where a large number of streams need
43
/// to be merged, it may be advisable to use tasks sending values on a shared
44
/// [`mpsc`] channel.
45
///
46
/// # Notes
47
///
48
/// `StreamMap` removes finished streams automatically, without alerting the user.
49
/// In some scenarios, the caller would want to know on closed streams.
50
/// To do this, use [`StreamNotifyClose`] as a wrapper to your stream.
51
/// It will return None when the stream is closed.
52
///
53
/// [`StreamExt::merge`]: crate::StreamExt::merge
54
/// [`mpsc`]: https://docs.rs/tokio/1.0/tokio/sync/mpsc/index.html
55
/// [`pin!`]: https://docs.rs/tokio/1.0/tokio/macro.pin.html
56
/// [`Box::pin`]: std::boxed::Box::pin
57
/// [`StreamNotifyClose`]: crate::StreamNotifyClose
58
///
59
/// # Examples
60
///
61
/// Merging two streams, then remove them after receiving the first value
62
///
63
/// ```
64
/// use tokio_stream::{StreamExt, StreamMap, Stream};
65
/// use tokio::sync::mpsc;
66
/// use std::pin::Pin;
67
///
68
/// #[tokio::main]
69
/// async fn main() {
70
///     let (tx1, mut rx1) = mpsc::channel::<usize>(10);
71
///     let (tx2, mut rx2) = mpsc::channel::<usize>(10);
72
///
73
///     // Convert the channels to a `Stream`.
74
///     let rx1 = Box::pin(async_stream::stream! {
75
///           while let Some(item) = rx1.recv().await {
76
///               yield item;
77
///           }
78
///     }) as Pin<Box<dyn Stream<Item = usize> + Send>>;
79
///
80
///     let rx2 = Box::pin(async_stream::stream! {
81
///           while let Some(item) = rx2.recv().await {
82
///               yield item;
83
///           }
84
///     }) as Pin<Box<dyn Stream<Item = usize> + Send>>;
85
///
86
///     tokio::spawn(async move {
87
///         tx1.send(1).await.unwrap();
88
///
89
///         // This value will never be received. The send may or may not return
90
///         // `Err` depending on if the remote end closed first or not.
91
///         let _ = tx1.send(2).await;
92
///     });
93
///
94
///     tokio::spawn(async move {
95
///         tx2.send(3).await.unwrap();
96
///         let _ = tx2.send(4).await;
97
///     });
98
///
99
///     let mut map = StreamMap::new();
100
///
101
///     // Insert both streams
102
///     map.insert("one", rx1);
103
///     map.insert("two", rx2);
104
///
105
///     // Read twice
106
///     for _ in 0..2 {
107
///         let (key, val) = map.next().await.unwrap();
108
///
109
///         if key == "one" {
110
///             assert_eq!(val, 1);
111
///         } else {
112
///             assert_eq!(val, 3);
113
///         }
114
///
115
///         // Remove the stream to prevent reading the next value
116
///         map.remove(key);
117
///     }
118
/// }
119
/// ```
120
///
121
/// This example models a read-only client to a chat system with channels. The
122
/// client sends commands to join and leave channels. `StreamMap` is used to
123
/// manage active channel subscriptions.
124
///
125
/// For simplicity, messages are displayed with `println!`, but they could be
126
/// sent to the client over a socket.
127
///
128
/// ```no_run
129
/// use tokio_stream::{Stream, StreamExt, StreamMap};
130
///
131
/// enum Command {
132
///     Join(String),
133
///     Leave(String),
134
/// }
135
///
136
/// fn commands() -> impl Stream<Item = Command> {
137
///     // Streams in user commands by parsing `stdin`.
138
/// # tokio_stream::pending()
139
/// }
140
///
141
/// // Join a channel, returns a stream of messages received on the channel.
142
/// fn join(channel: &str) -> impl Stream<Item = String> + Unpin {
143
///     // left as an exercise to the reader
144
/// # tokio_stream::pending()
145
/// }
146
///
147
/// #[tokio::main]
148
/// async fn main() {
149
///     let mut channels = StreamMap::new();
150
///
151
///     // Input commands (join / leave channels).
152
///     let cmds = commands();
153
///     tokio::pin!(cmds);
154
///
155
///     loop {
156
///         tokio::select! {
157
///             Some(cmd) = cmds.next() => {
158
///                 match cmd {
159
///                     Command::Join(chan) => {
160
///                         // Join the channel and add it to the `channels`
161
///                         // stream map
162
///                         let msgs = join(&chan);
163
///                         channels.insert(chan, msgs);
164
///                     }
165
///                     Command::Leave(chan) => {
166
///                         channels.remove(&chan);
167
///                     }
168
///                 }
169
///             }
170
///             Some((chan, msg)) = channels.next() => {
171
///                 // Received a message, display it on stdout with the channel
172
///                 // it originated from.
173
///                 println!("{}: {}", chan, msg);
174
///             }
175
///             // Both the `commands` stream and the `channels` stream are
176
///             // complete. There is no more work to do, so leave the loop.
177
///             else => break,
178
///         }
179
///     }
180
/// }
181
/// ```
182
///
183
/// Using `StreamNotifyClose` to handle closed streams with `StreamMap`.
184
///
185
/// ```
186
/// use tokio_stream::{StreamExt, StreamMap, StreamNotifyClose};
187
///
188
/// #[tokio::main]
189
/// async fn main() {
190
///     let mut map = StreamMap::new();
191
///     let stream = StreamNotifyClose::new(tokio_stream::iter(vec![0, 1]));
192
///     let stream2 = StreamNotifyClose::new(tokio_stream::iter(vec![0, 1]));
193
///     map.insert(0, stream);
194
///     map.insert(1, stream2);
195
///     while let Some((key, val)) = map.next().await {
196
///         match val {
197
///             Some(val) => println!("got {val:?} from stream {key:?}"),
198
///             None => println!("stream {key:?} closed"),
199
///         }
200
///     }
201
/// }
202
/// ```
203
204
#[derive(Debug)]
205
pub struct StreamMap<K, V> {
206
    /// Streams stored in the map
207
    entries: Vec<(K, V)>,
208
}
209
210
impl<K, V> StreamMap<K, V> {
211
    /// An iterator visiting all key-value pairs in arbitrary order.
212
    ///
213
    /// The iterator element type is `&'a (K, V)`.
214
    ///
215
    /// # Examples
216
    ///
217
    /// ```
218
    /// use tokio_stream::{StreamMap, pending};
219
    ///
220
    /// let mut map = StreamMap::new();
221
    ///
222
    /// map.insert("a", pending::<i32>());
223
    /// map.insert("b", pending());
224
    /// map.insert("c", pending());
225
    ///
226
    /// for (key, stream) in map.iter() {
227
    ///     println!("({}, {:?})", key, stream);
228
    /// }
229
    /// ```
230
0
    pub fn iter(&self) -> impl Iterator<Item = &(K, V)> {
231
0
        self.entries.iter()
232
0
    }
233
234
    /// An iterator visiting all key-value pairs mutably in arbitrary order.
235
    ///
236
    /// The iterator element type is `&'a mut (K, V)`.
237
    ///
238
    /// # Examples
239
    ///
240
    /// ```
241
    /// use tokio_stream::{StreamMap, pending};
242
    ///
243
    /// let mut map = StreamMap::new();
244
    ///
245
    /// map.insert("a", pending::<i32>());
246
    /// map.insert("b", pending());
247
    /// map.insert("c", pending());
248
    ///
249
    /// for (key, stream) in map.iter_mut() {
250
    ///     println!("({}, {:?})", key, stream);
251
    /// }
252
    /// ```
253
0
    pub fn iter_mut(&mut self) -> impl Iterator<Item = &mut (K, V)> {
254
0
        self.entries.iter_mut()
255
0
    }
256
257
    /// Creates an empty `StreamMap`.
258
    ///
259
    /// The stream map is initially created with a capacity of `0`, so it will
260
    /// not allocate until it is first inserted into.
261
    ///
262
    /// # Examples
263
    ///
264
    /// ```
265
    /// use tokio_stream::{StreamMap, Pending};
266
    ///
267
    /// let map: StreamMap<&str, Pending<()>> = StreamMap::new();
268
    /// ```
269
0
    pub fn new() -> StreamMap<K, V> {
270
0
        StreamMap { entries: vec![] }
271
0
    }
272
273
    /// Creates an empty `StreamMap` with the specified capacity.
274
    ///
275
    /// The stream map will be able to hold at least `capacity` elements without
276
    /// reallocating. If `capacity` is 0, the stream map will not allocate.
277
    ///
278
    /// # Examples
279
    ///
280
    /// ```
281
    /// use tokio_stream::{StreamMap, Pending};
282
    ///
283
    /// let map: StreamMap<&str, Pending<()>> = StreamMap::with_capacity(10);
284
    /// ```
285
0
    pub fn with_capacity(capacity: usize) -> StreamMap<K, V> {
286
0
        StreamMap {
287
0
            entries: Vec::with_capacity(capacity),
288
0
        }
289
0
    }
290
291
    /// Returns an iterator visiting all keys in arbitrary order.
292
    ///
293
    /// The iterator element type is `&'a K`.
294
    ///
295
    /// # Examples
296
    ///
297
    /// ```
298
    /// use tokio_stream::{StreamMap, pending};
299
    ///
300
    /// let mut map = StreamMap::new();
301
    ///
302
    /// map.insert("a", pending::<i32>());
303
    /// map.insert("b", pending());
304
    /// map.insert("c", pending());
305
    ///
306
    /// for key in map.keys() {
307
    ///     println!("{}", key);
308
    /// }
309
    /// ```
310
0
    pub fn keys(&self) -> impl Iterator<Item = &K> {
311
0
        self.iter().map(|(k, _)| k)
312
0
    }
313
314
    /// An iterator visiting all values in arbitrary order.
315
    ///
316
    /// The iterator element type is `&'a V`.
317
    ///
318
    /// # Examples
319
    ///
320
    /// ```
321
    /// use tokio_stream::{StreamMap, pending};
322
    ///
323
    /// let mut map = StreamMap::new();
324
    ///
325
    /// map.insert("a", pending::<i32>());
326
    /// map.insert("b", pending());
327
    /// map.insert("c", pending());
328
    ///
329
    /// for stream in map.values() {
330
    ///     println!("{:?}", stream);
331
    /// }
332
    /// ```
333
0
    pub fn values(&self) -> impl Iterator<Item = &V> {
334
0
        self.iter().map(|(_, v)| v)
335
0
    }
336
337
    /// An iterator visiting all values mutably in arbitrary order.
338
    ///
339
    /// The iterator element type is `&'a mut V`.
340
    ///
341
    /// # Examples
342
    ///
343
    /// ```
344
    /// use tokio_stream::{StreamMap, pending};
345
    ///
346
    /// let mut map = StreamMap::new();
347
    ///
348
    /// map.insert("a", pending::<i32>());
349
    /// map.insert("b", pending());
350
    /// map.insert("c", pending());
351
    ///
352
    /// for stream in map.values_mut() {
353
    ///     println!("{:?}", stream);
354
    /// }
355
    /// ```
356
0
    pub fn values_mut(&mut self) -> impl Iterator<Item = &mut V> {
357
0
        self.iter_mut().map(|(_, v)| v)
358
0
    }
359
360
    /// Returns the number of streams the map can hold without reallocating.
361
    ///
362
    /// This number is a lower bound; the `StreamMap` might be able to hold
363
    /// more, but is guaranteed to be able to hold at least this many.
364
    ///
365
    /// # Examples
366
    ///
367
    /// ```
368
    /// use tokio_stream::{StreamMap, Pending};
369
    ///
370
    /// let map: StreamMap<i32, Pending<()>> = StreamMap::with_capacity(100);
371
    /// assert!(map.capacity() >= 100);
372
    /// ```
373
0
    pub fn capacity(&self) -> usize {
374
0
        self.entries.capacity()
375
0
    }
376
377
    /// Returns the number of streams in the map.
378
    ///
379
    /// # Examples
380
    ///
381
    /// ```
382
    /// use tokio_stream::{StreamMap, pending};
383
    ///
384
    /// let mut a = StreamMap::new();
385
    /// assert_eq!(a.len(), 0);
386
    /// a.insert(1, pending::<i32>());
387
    /// assert_eq!(a.len(), 1);
388
    /// ```
389
0
    pub fn len(&self) -> usize {
390
0
        self.entries.len()
391
0
    }
392
393
    /// Returns `true` if the map contains no elements.
394
    ///
395
    /// # Examples
396
    ///
397
    /// ```
398
    /// use tokio_stream::{StreamMap, pending};
399
    ///
400
    /// let mut a = StreamMap::new();
401
    /// assert!(a.is_empty());
402
    /// a.insert(1, pending::<i32>());
403
    /// assert!(!a.is_empty());
404
    /// ```
405
0
    pub fn is_empty(&self) -> bool {
406
0
        self.entries.is_empty()
407
0
    }
408
409
    /// Clears the map, removing all key-stream pairs. Keeps the allocated
410
    /// memory for reuse.
411
    ///
412
    /// # Examples
413
    ///
414
    /// ```
415
    /// use tokio_stream::{StreamMap, pending};
416
    ///
417
    /// let mut a = StreamMap::new();
418
    /// a.insert(1, pending::<i32>());
419
    /// a.clear();
420
    /// assert!(a.is_empty());
421
    /// ```
422
0
    pub fn clear(&mut self) {
423
0
        self.entries.clear();
424
0
    }
425
426
    /// Insert a key-stream pair into the map.
427
    ///
428
    /// If the map did not have this key present, `None` is returned.
429
    ///
430
    /// If the map did have this key present, the new `stream` replaces the old
431
    /// one and the old stream is returned.
432
    ///
433
    /// # Examples
434
    ///
435
    /// ```
436
    /// use tokio_stream::{StreamMap, pending};
437
    ///
438
    /// let mut map = StreamMap::new();
439
    ///
440
    /// assert!(map.insert(37, pending::<i32>()).is_none());
441
    /// assert!(!map.is_empty());
442
    ///
443
    /// map.insert(37, pending());
444
    /// assert!(map.insert(37, pending()).is_some());
445
    /// ```
446
0
    pub fn insert(&mut self, k: K, stream: V) -> Option<V>
447
0
    where
448
0
        K: Hash + Eq,
449
    {
450
0
        let ret = self.remove(&k);
451
0
        self.entries.push((k, stream));
452
453
0
        ret
454
0
    }
455
456
    /// Removes a key from the map, returning the stream at the key if the key was previously in the map.
457
    ///
458
    /// The key may be any borrowed form of the map's key type, but `Hash` and
459
    /// `Eq` on the borrowed form must match those for the key type.
460
    ///
461
    /// # Examples
462
    ///
463
    /// ```
464
    /// use tokio_stream::{StreamMap, pending};
465
    ///
466
    /// let mut map = StreamMap::new();
467
    /// map.insert(1, pending::<i32>());
468
    /// assert!(map.remove(&1).is_some());
469
    /// assert!(map.remove(&1).is_none());
470
    /// ```
471
0
    pub fn remove<Q>(&mut self, k: &Q) -> Option<V>
472
0
    where
473
0
        K: Borrow<Q>,
474
0
        Q: Hash + Eq + ?Sized,
475
    {
476
0
        for i in 0..self.entries.len() {
477
0
            if self.entries[i].0.borrow() == k {
478
0
                return Some(self.entries.swap_remove(i).1);
479
0
            }
480
        }
481
482
0
        None
483
0
    }
484
485
    /// Returns `true` if the map contains a stream for the specified key.
486
    ///
487
    /// The key may be any borrowed form of the map's key type, but `Hash` and
488
    /// `Eq` on the borrowed form must match those for the key type.
489
    ///
490
    /// # Examples
491
    ///
492
    /// ```
493
    /// use tokio_stream::{StreamMap, pending};
494
    ///
495
    /// let mut map = StreamMap::new();
496
    /// map.insert(1, pending::<i32>());
497
    /// assert_eq!(map.contains_key(&1), true);
498
    /// assert_eq!(map.contains_key(&2), false);
499
    /// ```
500
0
    pub fn contains_key<Q>(&self, k: &Q) -> bool
501
0
    where
502
0
        K: Borrow<Q>,
503
0
        Q: Hash + Eq + ?Sized,
504
    {
505
0
        for i in 0..self.entries.len() {
506
0
            if self.entries[i].0.borrow() == k {
507
0
                return true;
508
0
            }
509
        }
510
511
0
        false
512
0
    }
513
}
514
515
impl<K, V> StreamMap<K, V>
516
where
517
    K: Unpin,
518
    V: Stream + Unpin,
519
{
520
    /// Polls the next value, includes the vec entry index
521
0
    fn poll_next_entry(&mut self, cx: &mut Context<'_>) -> Poll<Option<(usize, V::Item)>> {
522
0
        let start = self::rand::thread_rng_n(self.entries.len() as u32) as usize;
523
0
        let mut idx = start;
524
525
0
        for _ in 0..self.entries.len() {
526
0
            let (_, stream) = &mut self.entries[idx];
527
528
0
            match Pin::new(stream).poll_next(cx) {
529
0
                Poll::Ready(Some(val)) => return Poll::Ready(Some((idx, val))),
530
                Poll::Ready(None) => {
531
                    // Remove the entry
532
0
                    self.entries.swap_remove(idx);
533
534
                    // Check if this was the last entry, if so the cursor needs
535
                    // to wrap
536
0
                    if idx == self.entries.len() {
537
0
                        idx = 0;
538
0
                    } else if idx < start && start <= self.entries.len() {
539
0
                        // The stream being swapped into the current index has
540
0
                        // already been polled, so skip it.
541
0
                        idx = idx.wrapping_add(1) % self.entries.len();
542
0
                    }
543
                }
544
0
                Poll::Pending => {
545
0
                    idx = idx.wrapping_add(1) % self.entries.len();
546
0
                }
547
            }
548
        }
549
550
        // If the map is empty, then the stream is complete.
551
0
        if self.entries.is_empty() {
552
0
            Poll::Ready(None)
553
        } else {
554
0
            Poll::Pending
555
        }
556
0
    }
557
}
558
559
impl<K, V> Default for StreamMap<K, V> {
560
0
    fn default() -> Self {
561
0
        Self::new()
562
0
    }
563
}
564
565
impl<K, V> StreamMap<K, V>
566
where
567
    K: Clone + Unpin,
568
    V: Stream + Unpin,
569
{
570
    /// Receives multiple items on this [`StreamMap`], extending the provided `buffer`.
571
    ///
572
    /// This method returns the number of items that is appended to the `buffer`.
573
    ///
574
    /// Note that this method does not guarantee that exactly `limit` items
575
    /// are received. Rather, if at least one item is available, it returns
576
    /// as many items as it can up to the given limit. This method returns
577
    /// zero only if the `StreamMap` is empty (or if `limit` is zero).
578
    ///
579
    /// # Cancel safety
580
    ///
581
    /// This method is cancel safe. If `next_many` is used as the event in a
582
    /// [`tokio::select!`](tokio::select) statement and some other branch
583
    /// completes first, it is guaranteed that no items were received on any of
584
    /// the underlying streams.
585
0
    pub async fn next_many(&mut self, buffer: &mut Vec<(K, V::Item)>, limit: usize) -> usize {
586
0
        poll_fn(|cx| self.poll_next_many(cx, buffer, limit)).await
587
0
    }
588
589
    /// Polls to receive multiple items on this `StreamMap`, extending the provided `buffer`.
590
    ///
591
    /// This method returns:
592
    /// * `Poll::Pending` if no items are available but the `StreamMap` is not empty.
593
    /// * `Poll::Ready(count)` where `count` is the number of items successfully received and
594
    ///   stored in `buffer`. This can be less than, or equal to, `limit`.
595
    /// * `Poll::Ready(0)` if `limit` is set to zero or when the `StreamMap` is empty.
596
    ///
597
    /// Note that this method does not guarantee that exactly `limit` items
598
    /// are received. Rather, if at least one item is available, it returns
599
    /// as many items as it can up to the given limit. This method returns
600
    /// zero only if the `StreamMap` is empty (or if `limit` is zero).
601
0
    pub fn poll_next_many(
602
0
        &mut self,
603
0
        cx: &mut Context<'_>,
604
0
        buffer: &mut Vec<(K, V::Item)>,
605
0
        limit: usize,
606
0
    ) -> Poll<usize> {
607
0
        if limit == 0 || self.entries.is_empty() {
608
0
            return Poll::Ready(0);
609
0
        }
610
611
0
        let mut added = 0;
612
613
0
        let start = self::rand::thread_rng_n(self.entries.len() as u32) as usize;
614
0
        let mut idx = start;
615
616
0
        while added < limit {
617
            // Indicates whether at least one stream returned a value when polled or not
618
0
            let mut should_loop = false;
619
620
0
            for _ in 0..self.entries.len() {
621
0
                let (_, stream) = &mut self.entries[idx];
622
623
0
                match Pin::new(stream).poll_next(cx) {
624
0
                    Poll::Ready(Some(val)) => {
625
0
                        added += 1;
626
0
627
0
                        let key = self.entries[idx].0.clone();
628
0
                        buffer.push((key, val));
629
0
630
0
                        should_loop = true;
631
0
632
0
                        idx = idx.wrapping_add(1) % self.entries.len();
633
0
                    }
634
                    Poll::Ready(None) => {
635
                        // Remove the entry
636
0
                        self.entries.swap_remove(idx);
637
638
                        // Check if this was the last entry, if so the cursor needs
639
                        // to wrap
640
0
                        if idx == self.entries.len() {
641
0
                            idx = 0;
642
0
                        } else if idx < start && start <= self.entries.len() {
643
0
                            // The stream being swapped into the current index has
644
0
                            // already been polled, so skip it.
645
0
                            idx = idx.wrapping_add(1) % self.entries.len();
646
0
                        }
647
                    }
648
0
                    Poll::Pending => {
649
0
                        idx = idx.wrapping_add(1) % self.entries.len();
650
0
                    }
651
                }
652
            }
653
654
0
            if !should_loop {
655
0
                break;
656
0
            }
657
        }
658
659
0
        if added > 0 {
660
0
            Poll::Ready(added)
661
0
        } else if self.entries.is_empty() {
662
0
            Poll::Ready(0)
663
        } else {
664
0
            Poll::Pending
665
        }
666
0
    }
667
}
668
669
impl<K, V> Stream for StreamMap<K, V>
670
where
671
    K: Clone + Unpin,
672
    V: Stream + Unpin,
673
{
674
    type Item = (K, V::Item);
675
676
0
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
677
0
        if let Some((idx, val)) = ready!(self.poll_next_entry(cx)) {
678
0
            let key = self.entries[idx].0.clone();
679
0
            Poll::Ready(Some((key, val)))
680
        } else {
681
0
            Poll::Ready(None)
682
        }
683
0
    }
684
685
0
    fn size_hint(&self) -> (usize, Option<usize>) {
686
0
        let mut ret = (0, Some(0));
687
688
0
        for (_, stream) in &self.entries {
689
0
            let hint = stream.size_hint();
690
691
0
            ret.0 += hint.0;
692
693
0
            match (ret.1, hint.1) {
694
0
                (Some(a), Some(b)) => ret.1 = Some(a + b),
695
0
                (Some(_), None) => ret.1 = None,
696
0
                _ => {}
697
            }
698
        }
699
700
0
        ret
701
0
    }
702
}
703
704
impl<K, V> FromIterator<(K, V)> for StreamMap<K, V>
705
where
706
    K: Hash + Eq,
707
{
708
0
    fn from_iter<T: IntoIterator<Item = (K, V)>>(iter: T) -> Self {
709
0
        let iterator = iter.into_iter();
710
0
        let (lower_bound, _) = iterator.size_hint();
711
0
        let mut stream_map = Self::with_capacity(lower_bound);
712
713
0
        for (key, value) in iterator {
714
0
            stream_map.insert(key, value);
715
0
        }
716
717
0
        stream_map
718
0
    }
719
}
720
721
impl<K, V> Extend<(K, V)> for StreamMap<K, V> {
722
0
    fn extend<T>(&mut self, iter: T)
723
0
    where
724
0
        T: IntoIterator<Item = (K, V)>,
725
    {
726
0
        self.entries.extend(iter);
727
0
    }
728
}
729
730
mod rand {
731
    use std::cell::Cell;
732
733
    mod loom {
734
        #[cfg(not(loom))]
735
        pub(crate) mod rand {
736
            use std::collections::hash_map::RandomState;
737
            use std::hash::{BuildHasher, Hash, Hasher};
738
            use std::sync::atomic::AtomicU32;
739
            use std::sync::atomic::Ordering::Relaxed;
740
741
            static COUNTER: AtomicU32 = AtomicU32::new(1);
742
743
0
            pub(crate) fn seed() -> u64 {
744
0
                let rand_state = RandomState::new();
745
746
0
                let mut hasher = rand_state.build_hasher();
747
748
                // Hash some unique-ish data to generate some new state
749
0
                COUNTER.fetch_add(1, Relaxed).hash(&mut hasher);
750
751
                // Get the seed
752
0
                hasher.finish()
753
0
            }
754
        }
755
756
        #[cfg(loom)]
757
        pub(crate) mod rand {
758
            pub(crate) fn seed() -> u64 {
759
                1
760
            }
761
        }
762
    }
763
764
    /// Fast random number generate
765
    ///
766
    /// Implement `xorshift64+`: 2 32-bit `xorshift` sequences added together.
767
    /// Shift triplet `[17,7,16]` was calculated as indicated in Marsaglia's
768
    /// `Xorshift` paper: <https://www.jstatsoft.org/article/view/v008i14/xorshift.pdf>
769
    /// This generator passes the SmallCrush suite, part of TestU01 framework:
770
    /// <http://simul.iro.umontreal.ca/testu01/tu01.html>
771
    #[derive(Debug)]
772
    pub(crate) struct FastRand {
773
        one: Cell<u32>,
774
        two: Cell<u32>,
775
    }
776
777
    impl FastRand {
778
        /// Initialize a new, thread-local, fast random number generator.
779
0
        pub(crate) fn new(seed: u64) -> FastRand {
780
0
            let one = (seed >> 32) as u32;
781
0
            let mut two = seed as u32;
782
783
0
            if two == 0 {
784
0
                // This value cannot be zero
785
0
                two = 1;
786
0
            }
787
788
0
            FastRand {
789
0
                one: Cell::new(one),
790
0
                two: Cell::new(two),
791
0
            }
792
0
        }
793
794
0
        pub(crate) fn fastrand_n(&self, n: u32) -> u32 {
795
            // This is similar to fastrand() % n, but faster.
796
            // See https://lemire.me/blog/2016/06/27/a-fast-alternative-to-the-modulo-reduction/
797
0
            let mul = (self.fastrand() as u64).wrapping_mul(n as u64);
798
0
            (mul >> 32) as u32
799
0
        }
800
801
0
        fn fastrand(&self) -> u32 {
802
0
            let mut s1 = self.one.get();
803
0
            let s0 = self.two.get();
804
805
0
            s1 ^= s1 << 17;
806
0
            s1 = s1 ^ s0 ^ s1 >> 7 ^ s0 >> 16;
807
808
0
            self.one.set(s0);
809
0
            self.two.set(s1);
810
811
0
            s0.wrapping_add(s1)
812
0
        }
813
    }
814
815
    // Used by `StreamMap`
816
0
    pub(crate) fn thread_rng_n(n: u32) -> u32 {
817
        thread_local! {
818
            static THREAD_RNG: FastRand = FastRand::new(loom::rand::seed());
819
        }
820
821
0
        THREAD_RNG.with(|rng| rng.fastrand_n(n))
822
0
    }
823
}