/rust/registry/src/index.crates.io-1949cf8c6b5b557f/tokio-stream-0.1.17/src/stream_map.rs
Line | Count | Source |
1 | | use crate::Stream; |
2 | | |
3 | | use std::borrow::Borrow; |
4 | | use std::future::poll_fn; |
5 | | use std::hash::Hash; |
6 | | use std::pin::Pin; |
7 | | use std::task::{ready, Context, Poll}; |
8 | | |
9 | | /// Combine many streams into one, indexing each source stream with a unique |
10 | | /// key. |
11 | | /// |
12 | | /// `StreamMap` is similar to [`StreamExt::merge`] in that it combines source |
13 | | /// streams into a single merged stream that yields values in the order that |
14 | | /// they arrive from the source streams. However, `StreamMap` has a lot more |
15 | | /// flexibility in usage patterns. |
16 | | /// |
17 | | /// `StreamMap` can: |
18 | | /// |
19 | | /// * Merge an arbitrary number of streams. |
20 | | /// * Track which source stream the value was received from. |
21 | | /// * Handle inserting and removing streams from the set of managed streams at |
22 | | /// any point during iteration. |
23 | | /// |
24 | | /// All source streams held by `StreamMap` are indexed using a key. This key is |
25 | | /// included with the value when a source stream yields a value. The key is also |
26 | | /// used to remove the stream from the `StreamMap` before the stream has |
27 | | /// completed streaming. |
28 | | /// |
29 | | /// # `Unpin` |
30 | | /// |
31 | | /// Because the `StreamMap` API moves streams during runtime, both streams and |
32 | | /// keys must be `Unpin`. In order to insert a `!Unpin` stream into a |
33 | | /// `StreamMap`, use [`pin!`] to pin the stream to the stack or [`Box::pin`] to |
34 | | /// pin the stream in the heap. |
35 | | /// |
36 | | /// # Implementation |
37 | | /// |
38 | | /// `StreamMap` is backed by a `Vec<(K, V)>`. There is no guarantee that this |
39 | | /// internal implementation detail will persist in future versions, but it is |
40 | | /// important to know the runtime implications. In general, `StreamMap` works |
41 | | /// best with a "smallish" number of streams as all entries are scanned on |
42 | | /// insert, remove, and polling. In cases where a large number of streams need |
43 | | /// to be merged, it may be advisable to use tasks sending values on a shared |
44 | | /// [`mpsc`] channel. |
45 | | /// |
46 | | /// # Notes |
47 | | /// |
48 | | /// `StreamMap` removes finished streams automatically, without alerting the user. |
49 | | /// In some scenarios, the caller would want to know on closed streams. |
50 | | /// To do this, use [`StreamNotifyClose`] as a wrapper to your stream. |
51 | | /// It will return None when the stream is closed. |
52 | | /// |
53 | | /// [`StreamExt::merge`]: crate::StreamExt::merge |
54 | | /// [`mpsc`]: https://docs.rs/tokio/1.0/tokio/sync/mpsc/index.html |
55 | | /// [`pin!`]: https://docs.rs/tokio/1.0/tokio/macro.pin.html |
56 | | /// [`Box::pin`]: std::boxed::Box::pin |
57 | | /// [`StreamNotifyClose`]: crate::StreamNotifyClose |
58 | | /// |
59 | | /// # Examples |
60 | | /// |
61 | | /// Merging two streams, then remove them after receiving the first value |
62 | | /// |
63 | | /// ``` |
64 | | /// use tokio_stream::{StreamExt, StreamMap, Stream}; |
65 | | /// use tokio::sync::mpsc; |
66 | | /// use std::pin::Pin; |
67 | | /// |
68 | | /// #[tokio::main] |
69 | | /// async fn main() { |
70 | | /// let (tx1, mut rx1) = mpsc::channel::<usize>(10); |
71 | | /// let (tx2, mut rx2) = mpsc::channel::<usize>(10); |
72 | | /// |
73 | | /// // Convert the channels to a `Stream`. |
74 | | /// let rx1 = Box::pin(async_stream::stream! { |
75 | | /// while let Some(item) = rx1.recv().await { |
76 | | /// yield item; |
77 | | /// } |
78 | | /// }) as Pin<Box<dyn Stream<Item = usize> + Send>>; |
79 | | /// |
80 | | /// let rx2 = Box::pin(async_stream::stream! { |
81 | | /// while let Some(item) = rx2.recv().await { |
82 | | /// yield item; |
83 | | /// } |
84 | | /// }) as Pin<Box<dyn Stream<Item = usize> + Send>>; |
85 | | /// |
86 | | /// tokio::spawn(async move { |
87 | | /// tx1.send(1).await.unwrap(); |
88 | | /// |
89 | | /// // This value will never be received. The send may or may not return |
90 | | /// // `Err` depending on if the remote end closed first or not. |
91 | | /// let _ = tx1.send(2).await; |
92 | | /// }); |
93 | | /// |
94 | | /// tokio::spawn(async move { |
95 | | /// tx2.send(3).await.unwrap(); |
96 | | /// let _ = tx2.send(4).await; |
97 | | /// }); |
98 | | /// |
99 | | /// let mut map = StreamMap::new(); |
100 | | /// |
101 | | /// // Insert both streams |
102 | | /// map.insert("one", rx1); |
103 | | /// map.insert("two", rx2); |
104 | | /// |
105 | | /// // Read twice |
106 | | /// for _ in 0..2 { |
107 | | /// let (key, val) = map.next().await.unwrap(); |
108 | | /// |
109 | | /// if key == "one" { |
110 | | /// assert_eq!(val, 1); |
111 | | /// } else { |
112 | | /// assert_eq!(val, 3); |
113 | | /// } |
114 | | /// |
115 | | /// // Remove the stream to prevent reading the next value |
116 | | /// map.remove(key); |
117 | | /// } |
118 | | /// } |
119 | | /// ``` |
120 | | /// |
121 | | /// This example models a read-only client to a chat system with channels. The |
122 | | /// client sends commands to join and leave channels. `StreamMap` is used to |
123 | | /// manage active channel subscriptions. |
124 | | /// |
125 | | /// For simplicity, messages are displayed with `println!`, but they could be |
126 | | /// sent to the client over a socket. |
127 | | /// |
128 | | /// ```no_run |
129 | | /// use tokio_stream::{Stream, StreamExt, StreamMap}; |
130 | | /// |
131 | | /// enum Command { |
132 | | /// Join(String), |
133 | | /// Leave(String), |
134 | | /// } |
135 | | /// |
136 | | /// fn commands() -> impl Stream<Item = Command> { |
137 | | /// // Streams in user commands by parsing `stdin`. |
138 | | /// # tokio_stream::pending() |
139 | | /// } |
140 | | /// |
141 | | /// // Join a channel, returns a stream of messages received on the channel. |
142 | | /// fn join(channel: &str) -> impl Stream<Item = String> + Unpin { |
143 | | /// // left as an exercise to the reader |
144 | | /// # tokio_stream::pending() |
145 | | /// } |
146 | | /// |
147 | | /// #[tokio::main] |
148 | | /// async fn main() { |
149 | | /// let mut channels = StreamMap::new(); |
150 | | /// |
151 | | /// // Input commands (join / leave channels). |
152 | | /// let cmds = commands(); |
153 | | /// tokio::pin!(cmds); |
154 | | /// |
155 | | /// loop { |
156 | | /// tokio::select! { |
157 | | /// Some(cmd) = cmds.next() => { |
158 | | /// match cmd { |
159 | | /// Command::Join(chan) => { |
160 | | /// // Join the channel and add it to the `channels` |
161 | | /// // stream map |
162 | | /// let msgs = join(&chan); |
163 | | /// channels.insert(chan, msgs); |
164 | | /// } |
165 | | /// Command::Leave(chan) => { |
166 | | /// channels.remove(&chan); |
167 | | /// } |
168 | | /// } |
169 | | /// } |
170 | | /// Some((chan, msg)) = channels.next() => { |
171 | | /// // Received a message, display it on stdout with the channel |
172 | | /// // it originated from. |
173 | | /// println!("{}: {}", chan, msg); |
174 | | /// } |
175 | | /// // Both the `commands` stream and the `channels` stream are |
176 | | /// // complete. There is no more work to do, so leave the loop. |
177 | | /// else => break, |
178 | | /// } |
179 | | /// } |
180 | | /// } |
181 | | /// ``` |
182 | | /// |
183 | | /// Using `StreamNotifyClose` to handle closed streams with `StreamMap`. |
184 | | /// |
185 | | /// ``` |
186 | | /// use tokio_stream::{StreamExt, StreamMap, StreamNotifyClose}; |
187 | | /// |
188 | | /// #[tokio::main] |
189 | | /// async fn main() { |
190 | | /// let mut map = StreamMap::new(); |
191 | | /// let stream = StreamNotifyClose::new(tokio_stream::iter(vec![0, 1])); |
192 | | /// let stream2 = StreamNotifyClose::new(tokio_stream::iter(vec![0, 1])); |
193 | | /// map.insert(0, stream); |
194 | | /// map.insert(1, stream2); |
195 | | /// while let Some((key, val)) = map.next().await { |
196 | | /// match val { |
197 | | /// Some(val) => println!("got {val:?} from stream {key:?}"), |
198 | | /// None => println!("stream {key:?} closed"), |
199 | | /// } |
200 | | /// } |
201 | | /// } |
202 | | /// ``` |
203 | | |
204 | | #[derive(Debug)] |
205 | | pub struct StreamMap<K, V> { |
206 | | /// Streams stored in the map |
207 | | entries: Vec<(K, V)>, |
208 | | } |
209 | | |
210 | | impl<K, V> StreamMap<K, V> { |
211 | | /// An iterator visiting all key-value pairs in arbitrary order. |
212 | | /// |
213 | | /// The iterator element type is `&'a (K, V)`. |
214 | | /// |
215 | | /// # Examples |
216 | | /// |
217 | | /// ``` |
218 | | /// use tokio_stream::{StreamMap, pending}; |
219 | | /// |
220 | | /// let mut map = StreamMap::new(); |
221 | | /// |
222 | | /// map.insert("a", pending::<i32>()); |
223 | | /// map.insert("b", pending()); |
224 | | /// map.insert("c", pending()); |
225 | | /// |
226 | | /// for (key, stream) in map.iter() { |
227 | | /// println!("({}, {:?})", key, stream); |
228 | | /// } |
229 | | /// ``` |
230 | 0 | pub fn iter(&self) -> impl Iterator<Item = &(K, V)> { |
231 | 0 | self.entries.iter() |
232 | 0 | } |
233 | | |
234 | | /// An iterator visiting all key-value pairs mutably in arbitrary order. |
235 | | /// |
236 | | /// The iterator element type is `&'a mut (K, V)`. |
237 | | /// |
238 | | /// # Examples |
239 | | /// |
240 | | /// ``` |
241 | | /// use tokio_stream::{StreamMap, pending}; |
242 | | /// |
243 | | /// let mut map = StreamMap::new(); |
244 | | /// |
245 | | /// map.insert("a", pending::<i32>()); |
246 | | /// map.insert("b", pending()); |
247 | | /// map.insert("c", pending()); |
248 | | /// |
249 | | /// for (key, stream) in map.iter_mut() { |
250 | | /// println!("({}, {:?})", key, stream); |
251 | | /// } |
252 | | /// ``` |
253 | 0 | pub fn iter_mut(&mut self) -> impl Iterator<Item = &mut (K, V)> { |
254 | 0 | self.entries.iter_mut() |
255 | 0 | } |
256 | | |
257 | | /// Creates an empty `StreamMap`. |
258 | | /// |
259 | | /// The stream map is initially created with a capacity of `0`, so it will |
260 | | /// not allocate until it is first inserted into. |
261 | | /// |
262 | | /// # Examples |
263 | | /// |
264 | | /// ``` |
265 | | /// use tokio_stream::{StreamMap, Pending}; |
266 | | /// |
267 | | /// let map: StreamMap<&str, Pending<()>> = StreamMap::new(); |
268 | | /// ``` |
269 | 0 | pub fn new() -> StreamMap<K, V> { |
270 | 0 | StreamMap { entries: vec![] } |
271 | 0 | } |
272 | | |
273 | | /// Creates an empty `StreamMap` with the specified capacity. |
274 | | /// |
275 | | /// The stream map will be able to hold at least `capacity` elements without |
276 | | /// reallocating. If `capacity` is 0, the stream map will not allocate. |
277 | | /// |
278 | | /// # Examples |
279 | | /// |
280 | | /// ``` |
281 | | /// use tokio_stream::{StreamMap, Pending}; |
282 | | /// |
283 | | /// let map: StreamMap<&str, Pending<()>> = StreamMap::with_capacity(10); |
284 | | /// ``` |
285 | 0 | pub fn with_capacity(capacity: usize) -> StreamMap<K, V> { |
286 | 0 | StreamMap { |
287 | 0 | entries: Vec::with_capacity(capacity), |
288 | 0 | } |
289 | 0 | } |
290 | | |
291 | | /// Returns an iterator visiting all keys in arbitrary order. |
292 | | /// |
293 | | /// The iterator element type is `&'a K`. |
294 | | /// |
295 | | /// # Examples |
296 | | /// |
297 | | /// ``` |
298 | | /// use tokio_stream::{StreamMap, pending}; |
299 | | /// |
300 | | /// let mut map = StreamMap::new(); |
301 | | /// |
302 | | /// map.insert("a", pending::<i32>()); |
303 | | /// map.insert("b", pending()); |
304 | | /// map.insert("c", pending()); |
305 | | /// |
306 | | /// for key in map.keys() { |
307 | | /// println!("{}", key); |
308 | | /// } |
309 | | /// ``` |
310 | 0 | pub fn keys(&self) -> impl Iterator<Item = &K> { |
311 | 0 | self.iter().map(|(k, _)| k) |
312 | 0 | } |
313 | | |
314 | | /// An iterator visiting all values in arbitrary order. |
315 | | /// |
316 | | /// The iterator element type is `&'a V`. |
317 | | /// |
318 | | /// # Examples |
319 | | /// |
320 | | /// ``` |
321 | | /// use tokio_stream::{StreamMap, pending}; |
322 | | /// |
323 | | /// let mut map = StreamMap::new(); |
324 | | /// |
325 | | /// map.insert("a", pending::<i32>()); |
326 | | /// map.insert("b", pending()); |
327 | | /// map.insert("c", pending()); |
328 | | /// |
329 | | /// for stream in map.values() { |
330 | | /// println!("{:?}", stream); |
331 | | /// } |
332 | | /// ``` |
333 | 0 | pub fn values(&self) -> impl Iterator<Item = &V> { |
334 | 0 | self.iter().map(|(_, v)| v) |
335 | 0 | } |
336 | | |
337 | | /// An iterator visiting all values mutably in arbitrary order. |
338 | | /// |
339 | | /// The iterator element type is `&'a mut V`. |
340 | | /// |
341 | | /// # Examples |
342 | | /// |
343 | | /// ``` |
344 | | /// use tokio_stream::{StreamMap, pending}; |
345 | | /// |
346 | | /// let mut map = StreamMap::new(); |
347 | | /// |
348 | | /// map.insert("a", pending::<i32>()); |
349 | | /// map.insert("b", pending()); |
350 | | /// map.insert("c", pending()); |
351 | | /// |
352 | | /// for stream in map.values_mut() { |
353 | | /// println!("{:?}", stream); |
354 | | /// } |
355 | | /// ``` |
356 | 0 | pub fn values_mut(&mut self) -> impl Iterator<Item = &mut V> { |
357 | 0 | self.iter_mut().map(|(_, v)| v) |
358 | 0 | } |
359 | | |
360 | | /// Returns the number of streams the map can hold without reallocating. |
361 | | /// |
362 | | /// This number is a lower bound; the `StreamMap` might be able to hold |
363 | | /// more, but is guaranteed to be able to hold at least this many. |
364 | | /// |
365 | | /// # Examples |
366 | | /// |
367 | | /// ``` |
368 | | /// use tokio_stream::{StreamMap, Pending}; |
369 | | /// |
370 | | /// let map: StreamMap<i32, Pending<()>> = StreamMap::with_capacity(100); |
371 | | /// assert!(map.capacity() >= 100); |
372 | | /// ``` |
373 | 0 | pub fn capacity(&self) -> usize { |
374 | 0 | self.entries.capacity() |
375 | 0 | } |
376 | | |
377 | | /// Returns the number of streams in the map. |
378 | | /// |
379 | | /// # Examples |
380 | | /// |
381 | | /// ``` |
382 | | /// use tokio_stream::{StreamMap, pending}; |
383 | | /// |
384 | | /// let mut a = StreamMap::new(); |
385 | | /// assert_eq!(a.len(), 0); |
386 | | /// a.insert(1, pending::<i32>()); |
387 | | /// assert_eq!(a.len(), 1); |
388 | | /// ``` |
389 | 0 | pub fn len(&self) -> usize { |
390 | 0 | self.entries.len() |
391 | 0 | } |
392 | | |
393 | | /// Returns `true` if the map contains no elements. |
394 | | /// |
395 | | /// # Examples |
396 | | /// |
397 | | /// ``` |
398 | | /// use tokio_stream::{StreamMap, pending}; |
399 | | /// |
400 | | /// let mut a = StreamMap::new(); |
401 | | /// assert!(a.is_empty()); |
402 | | /// a.insert(1, pending::<i32>()); |
403 | | /// assert!(!a.is_empty()); |
404 | | /// ``` |
405 | 0 | pub fn is_empty(&self) -> bool { |
406 | 0 | self.entries.is_empty() |
407 | 0 | } |
408 | | |
409 | | /// Clears the map, removing all key-stream pairs. Keeps the allocated |
410 | | /// memory for reuse. |
411 | | /// |
412 | | /// # Examples |
413 | | /// |
414 | | /// ``` |
415 | | /// use tokio_stream::{StreamMap, pending}; |
416 | | /// |
417 | | /// let mut a = StreamMap::new(); |
418 | | /// a.insert(1, pending::<i32>()); |
419 | | /// a.clear(); |
420 | | /// assert!(a.is_empty()); |
421 | | /// ``` |
422 | 0 | pub fn clear(&mut self) { |
423 | 0 | self.entries.clear(); |
424 | 0 | } |
425 | | |
426 | | /// Insert a key-stream pair into the map. |
427 | | /// |
428 | | /// If the map did not have this key present, `None` is returned. |
429 | | /// |
430 | | /// If the map did have this key present, the new `stream` replaces the old |
431 | | /// one and the old stream is returned. |
432 | | /// |
433 | | /// # Examples |
434 | | /// |
435 | | /// ``` |
436 | | /// use tokio_stream::{StreamMap, pending}; |
437 | | /// |
438 | | /// let mut map = StreamMap::new(); |
439 | | /// |
440 | | /// assert!(map.insert(37, pending::<i32>()).is_none()); |
441 | | /// assert!(!map.is_empty()); |
442 | | /// |
443 | | /// map.insert(37, pending()); |
444 | | /// assert!(map.insert(37, pending()).is_some()); |
445 | | /// ``` |
446 | 0 | pub fn insert(&mut self, k: K, stream: V) -> Option<V> |
447 | 0 | where |
448 | 0 | K: Hash + Eq, |
449 | | { |
450 | 0 | let ret = self.remove(&k); |
451 | 0 | self.entries.push((k, stream)); |
452 | | |
453 | 0 | ret |
454 | 0 | } |
455 | | |
456 | | /// Removes a key from the map, returning the stream at the key if the key was previously in the map. |
457 | | /// |
458 | | /// The key may be any borrowed form of the map's key type, but `Hash` and |
459 | | /// `Eq` on the borrowed form must match those for the key type. |
460 | | /// |
461 | | /// # Examples |
462 | | /// |
463 | | /// ``` |
464 | | /// use tokio_stream::{StreamMap, pending}; |
465 | | /// |
466 | | /// let mut map = StreamMap::new(); |
467 | | /// map.insert(1, pending::<i32>()); |
468 | | /// assert!(map.remove(&1).is_some()); |
469 | | /// assert!(map.remove(&1).is_none()); |
470 | | /// ``` |
471 | 0 | pub fn remove<Q>(&mut self, k: &Q) -> Option<V> |
472 | 0 | where |
473 | 0 | K: Borrow<Q>, |
474 | 0 | Q: Hash + Eq + ?Sized, |
475 | | { |
476 | 0 | for i in 0..self.entries.len() { |
477 | 0 | if self.entries[i].0.borrow() == k { |
478 | 0 | return Some(self.entries.swap_remove(i).1); |
479 | 0 | } |
480 | | } |
481 | | |
482 | 0 | None |
483 | 0 | } |
484 | | |
485 | | /// Returns `true` if the map contains a stream for the specified key. |
486 | | /// |
487 | | /// The key may be any borrowed form of the map's key type, but `Hash` and |
488 | | /// `Eq` on the borrowed form must match those for the key type. |
489 | | /// |
490 | | /// # Examples |
491 | | /// |
492 | | /// ``` |
493 | | /// use tokio_stream::{StreamMap, pending}; |
494 | | /// |
495 | | /// let mut map = StreamMap::new(); |
496 | | /// map.insert(1, pending::<i32>()); |
497 | | /// assert_eq!(map.contains_key(&1), true); |
498 | | /// assert_eq!(map.contains_key(&2), false); |
499 | | /// ``` |
500 | 0 | pub fn contains_key<Q>(&self, k: &Q) -> bool |
501 | 0 | where |
502 | 0 | K: Borrow<Q>, |
503 | 0 | Q: Hash + Eq + ?Sized, |
504 | | { |
505 | 0 | for i in 0..self.entries.len() { |
506 | 0 | if self.entries[i].0.borrow() == k { |
507 | 0 | return true; |
508 | 0 | } |
509 | | } |
510 | | |
511 | 0 | false |
512 | 0 | } |
513 | | } |
514 | | |
515 | | impl<K, V> StreamMap<K, V> |
516 | | where |
517 | | K: Unpin, |
518 | | V: Stream + Unpin, |
519 | | { |
520 | | /// Polls the next value, includes the vec entry index |
521 | 0 | fn poll_next_entry(&mut self, cx: &mut Context<'_>) -> Poll<Option<(usize, V::Item)>> { |
522 | 0 | let start = self::rand::thread_rng_n(self.entries.len() as u32) as usize; |
523 | 0 | let mut idx = start; |
524 | | |
525 | 0 | for _ in 0..self.entries.len() { |
526 | 0 | let (_, stream) = &mut self.entries[idx]; |
527 | | |
528 | 0 | match Pin::new(stream).poll_next(cx) { |
529 | 0 | Poll::Ready(Some(val)) => return Poll::Ready(Some((idx, val))), |
530 | | Poll::Ready(None) => { |
531 | | // Remove the entry |
532 | 0 | self.entries.swap_remove(idx); |
533 | | |
534 | | // Check if this was the last entry, if so the cursor needs |
535 | | // to wrap |
536 | 0 | if idx == self.entries.len() { |
537 | 0 | idx = 0; |
538 | 0 | } else if idx < start && start <= self.entries.len() { |
539 | 0 | // The stream being swapped into the current index has |
540 | 0 | // already been polled, so skip it. |
541 | 0 | idx = idx.wrapping_add(1) % self.entries.len(); |
542 | 0 | } |
543 | | } |
544 | 0 | Poll::Pending => { |
545 | 0 | idx = idx.wrapping_add(1) % self.entries.len(); |
546 | 0 | } |
547 | | } |
548 | | } |
549 | | |
550 | | // If the map is empty, then the stream is complete. |
551 | 0 | if self.entries.is_empty() { |
552 | 0 | Poll::Ready(None) |
553 | | } else { |
554 | 0 | Poll::Pending |
555 | | } |
556 | 0 | } |
557 | | } |
558 | | |
559 | | impl<K, V> Default for StreamMap<K, V> { |
560 | 0 | fn default() -> Self { |
561 | 0 | Self::new() |
562 | 0 | } |
563 | | } |
564 | | |
565 | | impl<K, V> StreamMap<K, V> |
566 | | where |
567 | | K: Clone + Unpin, |
568 | | V: Stream + Unpin, |
569 | | { |
570 | | /// Receives multiple items on this [`StreamMap`], extending the provided `buffer`. |
571 | | /// |
572 | | /// This method returns the number of items that is appended to the `buffer`. |
573 | | /// |
574 | | /// Note that this method does not guarantee that exactly `limit` items |
575 | | /// are received. Rather, if at least one item is available, it returns |
576 | | /// as many items as it can up to the given limit. This method returns |
577 | | /// zero only if the `StreamMap` is empty (or if `limit` is zero). |
578 | | /// |
579 | | /// # Cancel safety |
580 | | /// |
581 | | /// This method is cancel safe. If `next_many` is used as the event in a |
582 | | /// [`tokio::select!`](tokio::select) statement and some other branch |
583 | | /// completes first, it is guaranteed that no items were received on any of |
584 | | /// the underlying streams. |
585 | 0 | pub async fn next_many(&mut self, buffer: &mut Vec<(K, V::Item)>, limit: usize) -> usize { |
586 | 0 | poll_fn(|cx| self.poll_next_many(cx, buffer, limit)).await |
587 | 0 | } |
588 | | |
589 | | /// Polls to receive multiple items on this `StreamMap`, extending the provided `buffer`. |
590 | | /// |
591 | | /// This method returns: |
592 | | /// * `Poll::Pending` if no items are available but the `StreamMap` is not empty. |
593 | | /// * `Poll::Ready(count)` where `count` is the number of items successfully received and |
594 | | /// stored in `buffer`. This can be less than, or equal to, `limit`. |
595 | | /// * `Poll::Ready(0)` if `limit` is set to zero or when the `StreamMap` is empty. |
596 | | /// |
597 | | /// Note that this method does not guarantee that exactly `limit` items |
598 | | /// are received. Rather, if at least one item is available, it returns |
599 | | /// as many items as it can up to the given limit. This method returns |
600 | | /// zero only if the `StreamMap` is empty (or if `limit` is zero). |
601 | 0 | pub fn poll_next_many( |
602 | 0 | &mut self, |
603 | 0 | cx: &mut Context<'_>, |
604 | 0 | buffer: &mut Vec<(K, V::Item)>, |
605 | 0 | limit: usize, |
606 | 0 | ) -> Poll<usize> { |
607 | 0 | if limit == 0 || self.entries.is_empty() { |
608 | 0 | return Poll::Ready(0); |
609 | 0 | } |
610 | | |
611 | 0 | let mut added = 0; |
612 | | |
613 | 0 | let start = self::rand::thread_rng_n(self.entries.len() as u32) as usize; |
614 | 0 | let mut idx = start; |
615 | | |
616 | 0 | while added < limit { |
617 | | // Indicates whether at least one stream returned a value when polled or not |
618 | 0 | let mut should_loop = false; |
619 | | |
620 | 0 | for _ in 0..self.entries.len() { |
621 | 0 | let (_, stream) = &mut self.entries[idx]; |
622 | | |
623 | 0 | match Pin::new(stream).poll_next(cx) { |
624 | 0 | Poll::Ready(Some(val)) => { |
625 | 0 | added += 1; |
626 | 0 |
|
627 | 0 | let key = self.entries[idx].0.clone(); |
628 | 0 | buffer.push((key, val)); |
629 | 0 |
|
630 | 0 | should_loop = true; |
631 | 0 |
|
632 | 0 | idx = idx.wrapping_add(1) % self.entries.len(); |
633 | 0 | } |
634 | | Poll::Ready(None) => { |
635 | | // Remove the entry |
636 | 0 | self.entries.swap_remove(idx); |
637 | | |
638 | | // Check if this was the last entry, if so the cursor needs |
639 | | // to wrap |
640 | 0 | if idx == self.entries.len() { |
641 | 0 | idx = 0; |
642 | 0 | } else if idx < start && start <= self.entries.len() { |
643 | 0 | // The stream being swapped into the current index has |
644 | 0 | // already been polled, so skip it. |
645 | 0 | idx = idx.wrapping_add(1) % self.entries.len(); |
646 | 0 | } |
647 | | } |
648 | 0 | Poll::Pending => { |
649 | 0 | idx = idx.wrapping_add(1) % self.entries.len(); |
650 | 0 | } |
651 | | } |
652 | | } |
653 | | |
654 | 0 | if !should_loop { |
655 | 0 | break; |
656 | 0 | } |
657 | | } |
658 | | |
659 | 0 | if added > 0 { |
660 | 0 | Poll::Ready(added) |
661 | 0 | } else if self.entries.is_empty() { |
662 | 0 | Poll::Ready(0) |
663 | | } else { |
664 | 0 | Poll::Pending |
665 | | } |
666 | 0 | } |
667 | | } |
668 | | |
669 | | impl<K, V> Stream for StreamMap<K, V> |
670 | | where |
671 | | K: Clone + Unpin, |
672 | | V: Stream + Unpin, |
673 | | { |
674 | | type Item = (K, V::Item); |
675 | | |
676 | 0 | fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { |
677 | 0 | if let Some((idx, val)) = ready!(self.poll_next_entry(cx)) { |
678 | 0 | let key = self.entries[idx].0.clone(); |
679 | 0 | Poll::Ready(Some((key, val))) |
680 | | } else { |
681 | 0 | Poll::Ready(None) |
682 | | } |
683 | 0 | } |
684 | | |
685 | 0 | fn size_hint(&self) -> (usize, Option<usize>) { |
686 | 0 | let mut ret = (0, Some(0)); |
687 | | |
688 | 0 | for (_, stream) in &self.entries { |
689 | 0 | let hint = stream.size_hint(); |
690 | | |
691 | 0 | ret.0 += hint.0; |
692 | | |
693 | 0 | match (ret.1, hint.1) { |
694 | 0 | (Some(a), Some(b)) => ret.1 = Some(a + b), |
695 | 0 | (Some(_), None) => ret.1 = None, |
696 | 0 | _ => {} |
697 | | } |
698 | | } |
699 | | |
700 | 0 | ret |
701 | 0 | } |
702 | | } |
703 | | |
704 | | impl<K, V> FromIterator<(K, V)> for StreamMap<K, V> |
705 | | where |
706 | | K: Hash + Eq, |
707 | | { |
708 | 0 | fn from_iter<T: IntoIterator<Item = (K, V)>>(iter: T) -> Self { |
709 | 0 | let iterator = iter.into_iter(); |
710 | 0 | let (lower_bound, _) = iterator.size_hint(); |
711 | 0 | let mut stream_map = Self::with_capacity(lower_bound); |
712 | | |
713 | 0 | for (key, value) in iterator { |
714 | 0 | stream_map.insert(key, value); |
715 | 0 | } |
716 | | |
717 | 0 | stream_map |
718 | 0 | } |
719 | | } |
720 | | |
721 | | impl<K, V> Extend<(K, V)> for StreamMap<K, V> { |
722 | 0 | fn extend<T>(&mut self, iter: T) |
723 | 0 | where |
724 | 0 | T: IntoIterator<Item = (K, V)>, |
725 | | { |
726 | 0 | self.entries.extend(iter); |
727 | 0 | } |
728 | | } |
729 | | |
730 | | mod rand { |
731 | | use std::cell::Cell; |
732 | | |
733 | | mod loom { |
734 | | #[cfg(not(loom))] |
735 | | pub(crate) mod rand { |
736 | | use std::collections::hash_map::RandomState; |
737 | | use std::hash::{BuildHasher, Hash, Hasher}; |
738 | | use std::sync::atomic::AtomicU32; |
739 | | use std::sync::atomic::Ordering::Relaxed; |
740 | | |
741 | | static COUNTER: AtomicU32 = AtomicU32::new(1); |
742 | | |
743 | 0 | pub(crate) fn seed() -> u64 { |
744 | 0 | let rand_state = RandomState::new(); |
745 | | |
746 | 0 | let mut hasher = rand_state.build_hasher(); |
747 | | |
748 | | // Hash some unique-ish data to generate some new state |
749 | 0 | COUNTER.fetch_add(1, Relaxed).hash(&mut hasher); |
750 | | |
751 | | // Get the seed |
752 | 0 | hasher.finish() |
753 | 0 | } |
754 | | } |
755 | | |
756 | | #[cfg(loom)] |
757 | | pub(crate) mod rand { |
758 | | pub(crate) fn seed() -> u64 { |
759 | | 1 |
760 | | } |
761 | | } |
762 | | } |
763 | | |
764 | | /// Fast random number generate |
765 | | /// |
766 | | /// Implement `xorshift64+`: 2 32-bit `xorshift` sequences added together. |
767 | | /// Shift triplet `[17,7,16]` was calculated as indicated in Marsaglia's |
768 | | /// `Xorshift` paper: <https://www.jstatsoft.org/article/view/v008i14/xorshift.pdf> |
769 | | /// This generator passes the SmallCrush suite, part of TestU01 framework: |
770 | | /// <http://simul.iro.umontreal.ca/testu01/tu01.html> |
771 | | #[derive(Debug)] |
772 | | pub(crate) struct FastRand { |
773 | | one: Cell<u32>, |
774 | | two: Cell<u32>, |
775 | | } |
776 | | |
777 | | impl FastRand { |
778 | | /// Initialize a new, thread-local, fast random number generator. |
779 | 0 | pub(crate) fn new(seed: u64) -> FastRand { |
780 | 0 | let one = (seed >> 32) as u32; |
781 | 0 | let mut two = seed as u32; |
782 | | |
783 | 0 | if two == 0 { |
784 | 0 | // This value cannot be zero |
785 | 0 | two = 1; |
786 | 0 | } |
787 | | |
788 | 0 | FastRand { |
789 | 0 | one: Cell::new(one), |
790 | 0 | two: Cell::new(two), |
791 | 0 | } |
792 | 0 | } |
793 | | |
794 | 0 | pub(crate) fn fastrand_n(&self, n: u32) -> u32 { |
795 | | // This is similar to fastrand() % n, but faster. |
796 | | // See https://lemire.me/blog/2016/06/27/a-fast-alternative-to-the-modulo-reduction/ |
797 | 0 | let mul = (self.fastrand() as u64).wrapping_mul(n as u64); |
798 | 0 | (mul >> 32) as u32 |
799 | 0 | } |
800 | | |
801 | 0 | fn fastrand(&self) -> u32 { |
802 | 0 | let mut s1 = self.one.get(); |
803 | 0 | let s0 = self.two.get(); |
804 | | |
805 | 0 | s1 ^= s1 << 17; |
806 | 0 | s1 = s1 ^ s0 ^ s1 >> 7 ^ s0 >> 16; |
807 | | |
808 | 0 | self.one.set(s0); |
809 | 0 | self.two.set(s1); |
810 | | |
811 | 0 | s0.wrapping_add(s1) |
812 | 0 | } |
813 | | } |
814 | | |
815 | | // Used by `StreamMap` |
816 | 0 | pub(crate) fn thread_rng_n(n: u32) -> u32 { |
817 | | thread_local! { |
818 | | static THREAD_RNG: FastRand = FastRand::new(loom::rand::seed()); |
819 | | } |
820 | | |
821 | 0 | THREAD_RNG.with(|rng| rng.fastrand_n(n)) |
822 | 0 | } |
823 | | } |