/rust/registry/src/index.crates.io-1949cf8c6b5b557f/async-stream-0.3.6/src/async_stream.rs
Line | Count | Source |
1 | | use crate::yielder::Receiver; |
2 | | |
3 | | use futures_core::{FusedStream, Stream}; |
4 | | use pin_project_lite::pin_project; |
5 | | use std::future::Future; |
6 | | use std::pin::Pin; |
7 | | use std::task::{Context, Poll}; |
8 | | |
9 | | pin_project! { |
10 | | #[doc(hidden)] |
11 | | #[derive(Debug)] |
12 | | pub struct AsyncStream<T, U> { |
13 | | rx: Receiver<T>, |
14 | | done: bool, |
15 | | #[pin] |
16 | | generator: U, |
17 | | } |
18 | | } |
19 | | |
20 | | impl<T, U> AsyncStream<T, U> { |
21 | | #[doc(hidden)] |
22 | 5 | pub fn new(rx: Receiver<T>, generator: U) -> AsyncStream<T, U> { |
23 | 5 | AsyncStream { |
24 | 5 | rx, |
25 | 5 | done: false, |
26 | 5 | generator, |
27 | 5 | } |
28 | 5 | } Unexecuted instantiation: <async_stream::async_stream::AsyncStream<core::result::Result<surrealdb_core::exec::ValueBatch, surrealdb_core::expr::ControlFlow>, surrealdb_core::exec::operators::scan::pipeline::kv_scan_stream::{closure#0}>>::newUnexecuted instantiation: <async_stream::async_stream::AsyncStream<core::result::Result<surrealdb_core::exec::ValueBatch, surrealdb_core::expr::ControlFlow>, <surrealdb_core::exec::operators::scan::union_index::UnionIndexScan as surrealdb_core::exec::ExecOperator>::execute::{closure#0}>>::newUnexecuted instantiation: <async_stream::async_stream::AsyncStream<core::result::Result<surrealdb_core::exec::ValueBatch, surrealdb_core::expr::ControlFlow>, <surrealdb_core::exec::operators::scan::knn::KnnScan as surrealdb_core::exec::ExecOperator>::execute::{closure#0}>>::newUnexecuted instantiation: <async_stream::async_stream::AsyncStream<core::result::Result<surrealdb_core::exec::ValueBatch, surrealdb_core::expr::ControlFlow>, <surrealdb_core::exec::operators::scan::index::IndexScan as surrealdb_core::exec::ExecOperator>::execute::{closure#0}>>::newUnexecuted instantiation: <async_stream::async_stream::AsyncStream<core::result::Result<surrealdb_core::exec::ValueBatch, surrealdb_core::expr::ControlFlow>, <surrealdb_core::exec::operators::scan::fulltext::FullTextScan as surrealdb_core::exec::ExecOperator>::execute::{closure#0}>>::newUnexecuted instantiation: <async_stream::async_stream::AsyncStream<core::result::Result<surrealdb_core::exec::ValueBatch, surrealdb_core::expr::ControlFlow>, <surrealdb_core::exec::operators::unwrap_exactly_one::UnwrapExactlyOne as surrealdb_core::exec::ExecOperator>::execute::{closure#0}>>::newUnexecuted instantiation: <async_stream::async_stream::AsyncStream<core::result::Result<surrealdb_core::exec::ValueBatch, surrealdb_core::expr::ControlFlow>, <surrealdb_core::exec::operators::limit::Limit as surrealdb_core::exec::ExecOperator>::execute::{closure#0}>>::new<async_stream::async_stream::AsyncStream<core::result::Result<surrealdb_core::exec::ValueBatch, surrealdb_core::expr::ControlFlow>, <surrealdb_core::exec::operators::explain::AnalyzePlan as surrealdb_core::exec::ExecOperator>::execute::{closure#0}>>::newLine | Count | Source | 22 | 4 | pub fn new(rx: Receiver<T>, generator: U) -> AsyncStream<T, U> { | 23 | 4 | AsyncStream { | 24 | 4 | rx, | 25 | 4 | done: false, | 26 | 4 | generator, | 27 | 4 | } | 28 | 4 | } |
Unexecuted instantiation: <async_stream::async_stream::AsyncStream<core::result::Result<surrealdb_core::exec::ValueBatch, surrealdb_core::expr::ControlFlow>, <surrealdb_core::exec::operators::timeout::Timeout as surrealdb_core::exec::ExecOperator>::execute::{closure#0}>>::newUnexecuted instantiation: <async_stream::async_stream::AsyncStream<core::result::Result<surrealdb_core::exec::ValueBatch, surrealdb_core::expr::ControlFlow>, <surrealdb_core::exec::operators::scan::index_count::IndexCountScan as surrealdb_core::exec::ExecOperator>::execute::{closure#0}>>::newUnexecuted instantiation: <async_stream::async_stream::AsyncStream<core::result::Result<surrealdb_core::exec::ValueBatch, surrealdb_core::expr::ControlFlow>, <surrealdb_core::exec::operators::scan::count::CountScan as surrealdb_core::exec::ExecOperator>::execute::{closure#0}>>::newUnexecuted instantiation: <async_stream::async_stream::AsyncStream<core::result::Result<surrealdb_core::exec::ValueBatch, surrealdb_core::expr::ControlFlow>, <surrealdb_core::exec::operators::scan::table::TableScan as surrealdb_core::exec::ExecOperator>::execute::{closure#0}>>::newUnexecuted instantiation: <async_stream::async_stream::AsyncStream<core::result::Result<surrealdb_core::exec::ValueBatch, surrealdb_core::expr::ControlFlow>, <surrealdb_core::exec::operators::scan::dynamic::DynamicScan as surrealdb_core::exec::ExecOperator>::execute::{closure#0}>>::newUnexecuted instantiation: <async_stream::async_stream::AsyncStream<core::result::Result<surrealdb_core::exec::ValueBatch, surrealdb_core::expr::ControlFlow>, <surrealdb_core::exec::operators::scan::record_id::RecordIdScan as surrealdb_core::exec::ExecOperator>::execute::{closure#0}>>::new<async_stream::async_stream::AsyncStream<core::result::Result<surrealdb_core::exec::ValueBatch, surrealdb_core::expr::ControlFlow>, <surrealdb_core::exec::operators::source_expr::SourceExpr as surrealdb_core::exec::ExecOperator>::execute::{closure#0}>>::newLine | Count | Source | 22 | 1 | pub fn new(rx: Receiver<T>, generator: U) -> AsyncStream<T, U> { | 23 | 1 | AsyncStream { | 24 | 1 | rx, | 25 | 1 | done: false, | 26 | 1 | generator, | 27 | 1 | } | 28 | 1 | } |
Unexecuted instantiation: <async_stream::async_stream::AsyncStream<core::result::Result<surrealdb_core::exec::ValueBatch, surrealdb_core::expr::ControlFlow>, <surrealdb_core::exec::operators::aggregate::Aggregate as surrealdb_core::exec::ExecOperator>::execute::{closure#0}>>::newUnexecuted instantiation: <async_stream::async_stream::AsyncStream<core::result::Result<surrealdb_core::exec::ValueBatch, surrealdb_core::expr::ControlFlow>, <surrealdb_core::exec::operators::scan::reference::ReferenceScan as surrealdb_core::exec::ExecOperator>::execute::{closure#0}>>::newUnexecuted instantiation: <async_stream::async_stream::AsyncStream<core::result::Result<surrealdb_core::exec::ValueBatch, surrealdb_core::expr::ControlFlow>, <surrealdb_core::exec::operators::scan::graph::GraphEdgeScan as surrealdb_core::exec::ExecOperator>::execute::{closure#0}>>::newUnexecuted instantiation: <async_stream::async_stream::AsyncStream<_, _>>::new |
29 | | } |
30 | | |
31 | | impl<T, U> FusedStream for AsyncStream<T, U> |
32 | | where |
33 | | U: Future<Output = ()>, |
34 | | { |
35 | 0 | fn is_terminated(&self) -> bool { |
36 | 0 | self.done |
37 | 0 | } |
38 | | } |
39 | | |
40 | | impl<T, U> Stream for AsyncStream<T, U> |
41 | | where |
42 | | U: Future<Output = ()>, |
43 | | { |
44 | | type Item = T; |
45 | | |
46 | 9 | fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { |
47 | 9 | let me = self.project(); |
48 | | |
49 | 9 | if *me.done { |
50 | 0 | return Poll::Ready(None); |
51 | 9 | } |
52 | | |
53 | 9 | let mut dst = None; |
54 | 9 | let res = { |
55 | 9 | let _enter = me.rx.enter(&mut dst); |
56 | 9 | me.generator.poll(cx) |
57 | | }; |
58 | | |
59 | 9 | *me.done = res.is_ready(); |
60 | | |
61 | 9 | if dst.is_some() { |
62 | 4 | return Poll::Ready(dst.take()); |
63 | 5 | } |
64 | | |
65 | 5 | if *me.done { |
66 | 5 | Poll::Ready(None) |
67 | | } else { |
68 | 0 | Poll::Pending |
69 | | } |
70 | 9 | } Unexecuted instantiation: <async_stream::async_stream::AsyncStream<core::result::Result<surrealdb_core::exec::ValueBatch, surrealdb_core::expr::ControlFlow>, surrealdb_core::exec::operators::scan::pipeline::kv_scan_stream::{closure#0}> as futures_core::stream::Stream>::poll_nextUnexecuted instantiation: <async_stream::async_stream::AsyncStream<core::result::Result<surrealdb_core::exec::ValueBatch, surrealdb_core::expr::ControlFlow>, <surrealdb_core::exec::operators::scan::union_index::UnionIndexScan as surrealdb_core::exec::ExecOperator>::execute::{closure#0}> as futures_core::stream::Stream>::poll_nextUnexecuted instantiation: <async_stream::async_stream::AsyncStream<core::result::Result<surrealdb_core::exec::ValueBatch, surrealdb_core::expr::ControlFlow>, <surrealdb_core::exec::operators::scan::knn::KnnScan as surrealdb_core::exec::ExecOperator>::execute::{closure#0}> as futures_core::stream::Stream>::poll_nextUnexecuted instantiation: <async_stream::async_stream::AsyncStream<core::result::Result<surrealdb_core::exec::ValueBatch, surrealdb_core::expr::ControlFlow>, <surrealdb_core::exec::operators::scan::index::IndexScan as surrealdb_core::exec::ExecOperator>::execute::{closure#0}> as futures_core::stream::Stream>::poll_nextUnexecuted instantiation: <async_stream::async_stream::AsyncStream<core::result::Result<surrealdb_core::exec::ValueBatch, surrealdb_core::expr::ControlFlow>, <surrealdb_core::exec::operators::scan::fulltext::FullTextScan as surrealdb_core::exec::ExecOperator>::execute::{closure#0}> as futures_core::stream::Stream>::poll_nextUnexecuted instantiation: <async_stream::async_stream::AsyncStream<core::result::Result<surrealdb_core::exec::ValueBatch, surrealdb_core::expr::ControlFlow>, <surrealdb_core::exec::operators::unwrap_exactly_one::UnwrapExactlyOne as surrealdb_core::exec::ExecOperator>::execute::{closure#0}> as futures_core::stream::Stream>::poll_nextUnexecuted instantiation: <async_stream::async_stream::AsyncStream<core::result::Result<surrealdb_core::exec::ValueBatch, surrealdb_core::expr::ControlFlow>, <surrealdb_core::exec::operators::limit::Limit as surrealdb_core::exec::ExecOperator>::execute::{closure#0}> as futures_core::stream::Stream>::poll_next<async_stream::async_stream::AsyncStream<core::result::Result<surrealdb_core::exec::ValueBatch, surrealdb_core::expr::ControlFlow>, <surrealdb_core::exec::operators::explain::AnalyzePlan as surrealdb_core::exec::ExecOperator>::execute::{closure#0}> as futures_core::stream::Stream>::poll_nextLine | Count | Source | 46 | 8 | fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { | 47 | 8 | let me = self.project(); | 48 | | | 49 | 8 | if *me.done { | 50 | 0 | return Poll::Ready(None); | 51 | 8 | } | 52 | | | 53 | 8 | let mut dst = None; | 54 | 8 | let res = { | 55 | 8 | let _enter = me.rx.enter(&mut dst); | 56 | 8 | me.generator.poll(cx) | 57 | | }; | 58 | | | 59 | 8 | *me.done = res.is_ready(); | 60 | | | 61 | 8 | if dst.is_some() { | 62 | 4 | return Poll::Ready(dst.take()); | 63 | 4 | } | 64 | | | 65 | 4 | if *me.done { | 66 | 4 | Poll::Ready(None) | 67 | | } else { | 68 | 0 | Poll::Pending | 69 | | } | 70 | 8 | } |
Unexecuted instantiation: <async_stream::async_stream::AsyncStream<core::result::Result<surrealdb_core::exec::ValueBatch, surrealdb_core::expr::ControlFlow>, <surrealdb_core::exec::operators::timeout::Timeout as surrealdb_core::exec::ExecOperator>::execute::{closure#0}> as futures_core::stream::Stream>::poll_nextUnexecuted instantiation: <async_stream::async_stream::AsyncStream<core::result::Result<surrealdb_core::exec::ValueBatch, surrealdb_core::expr::ControlFlow>, <surrealdb_core::exec::operators::scan::index_count::IndexCountScan as surrealdb_core::exec::ExecOperator>::execute::{closure#0}> as futures_core::stream::Stream>::poll_nextUnexecuted instantiation: <async_stream::async_stream::AsyncStream<core::result::Result<surrealdb_core::exec::ValueBatch, surrealdb_core::expr::ControlFlow>, <surrealdb_core::exec::operators::scan::count::CountScan as surrealdb_core::exec::ExecOperator>::execute::{closure#0}> as futures_core::stream::Stream>::poll_nextUnexecuted instantiation: <async_stream::async_stream::AsyncStream<core::result::Result<surrealdb_core::exec::ValueBatch, surrealdb_core::expr::ControlFlow>, <surrealdb_core::exec::operators::scan::table::TableScan as surrealdb_core::exec::ExecOperator>::execute::{closure#0}> as futures_core::stream::Stream>::poll_nextUnexecuted instantiation: <async_stream::async_stream::AsyncStream<core::result::Result<surrealdb_core::exec::ValueBatch, surrealdb_core::expr::ControlFlow>, <surrealdb_core::exec::operators::scan::dynamic::DynamicScan as surrealdb_core::exec::ExecOperator>::execute::{closure#0}> as futures_core::stream::Stream>::poll_nextUnexecuted instantiation: <async_stream::async_stream::AsyncStream<core::result::Result<surrealdb_core::exec::ValueBatch, surrealdb_core::expr::ControlFlow>, <surrealdb_core::exec::operators::scan::record_id::RecordIdScan as surrealdb_core::exec::ExecOperator>::execute::{closure#0}> as futures_core::stream::Stream>::poll_next<async_stream::async_stream::AsyncStream<core::result::Result<surrealdb_core::exec::ValueBatch, surrealdb_core::expr::ControlFlow>, <surrealdb_core::exec::operators::source_expr::SourceExpr as surrealdb_core::exec::ExecOperator>::execute::{closure#0}> as futures_core::stream::Stream>::poll_nextLine | Count | Source | 46 | 1 | fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { | 47 | 1 | let me = self.project(); | 48 | | | 49 | 1 | if *me.done { | 50 | 0 | return Poll::Ready(None); | 51 | 1 | } | 52 | | | 53 | 1 | let mut dst = None; | 54 | 1 | let res = { | 55 | 1 | let _enter = me.rx.enter(&mut dst); | 56 | 1 | me.generator.poll(cx) | 57 | | }; | 58 | | | 59 | 1 | *me.done = res.is_ready(); | 60 | | | 61 | 1 | if dst.is_some() { | 62 | 0 | return Poll::Ready(dst.take()); | 63 | 1 | } | 64 | | | 65 | 1 | if *me.done { | 66 | 1 | Poll::Ready(None) | 67 | | } else { | 68 | 0 | Poll::Pending | 69 | | } | 70 | 1 | } |
Unexecuted instantiation: <async_stream::async_stream::AsyncStream<core::result::Result<surrealdb_core::exec::ValueBatch, surrealdb_core::expr::ControlFlow>, <surrealdb_core::exec::operators::aggregate::Aggregate as surrealdb_core::exec::ExecOperator>::execute::{closure#0}> as futures_core::stream::Stream>::poll_nextUnexecuted instantiation: <async_stream::async_stream::AsyncStream<core::result::Result<surrealdb_core::exec::ValueBatch, surrealdb_core::expr::ControlFlow>, <surrealdb_core::exec::operators::scan::reference::ReferenceScan as surrealdb_core::exec::ExecOperator>::execute::{closure#0}> as futures_core::stream::Stream>::poll_nextUnexecuted instantiation: <async_stream::async_stream::AsyncStream<core::result::Result<surrealdb_core::exec::ValueBatch, surrealdb_core::expr::ControlFlow>, <surrealdb_core::exec::operators::scan::graph::GraphEdgeScan as surrealdb_core::exec::ExecOperator>::execute::{closure#0}> as futures_core::stream::Stream>::poll_nextUnexecuted instantiation: <async_stream::async_stream::AsyncStream<_, _> as futures_core::stream::Stream>::poll_next |
71 | | |
72 | 0 | fn size_hint(&self) -> (usize, Option<usize>) { |
73 | 0 | if self.done { |
74 | 0 | (0, Some(0)) |
75 | | } else { |
76 | 0 | (0, None) |
77 | | } |
78 | 0 | } Unexecuted instantiation: <async_stream::async_stream::AsyncStream<core::result::Result<surrealdb_core::exec::ValueBatch, surrealdb_core::expr::ControlFlow>, surrealdb_core::exec::operators::scan::pipeline::kv_scan_stream::{closure#0}> as futures_core::stream::Stream>::size_hintUnexecuted instantiation: <async_stream::async_stream::AsyncStream<core::result::Result<surrealdb_core::exec::ValueBatch, surrealdb_core::expr::ControlFlow>, <surrealdb_core::exec::operators::scan::union_index::UnionIndexScan as surrealdb_core::exec::ExecOperator>::execute::{closure#0}> as futures_core::stream::Stream>::size_hintUnexecuted instantiation: <async_stream::async_stream::AsyncStream<core::result::Result<surrealdb_core::exec::ValueBatch, surrealdb_core::expr::ControlFlow>, <surrealdb_core::exec::operators::scan::knn::KnnScan as surrealdb_core::exec::ExecOperator>::execute::{closure#0}> as futures_core::stream::Stream>::size_hintUnexecuted instantiation: <async_stream::async_stream::AsyncStream<core::result::Result<surrealdb_core::exec::ValueBatch, surrealdb_core::expr::ControlFlow>, <surrealdb_core::exec::operators::scan::index::IndexScan as surrealdb_core::exec::ExecOperator>::execute::{closure#0}> as futures_core::stream::Stream>::size_hintUnexecuted instantiation: <async_stream::async_stream::AsyncStream<core::result::Result<surrealdb_core::exec::ValueBatch, surrealdb_core::expr::ControlFlow>, <surrealdb_core::exec::operators::scan::fulltext::FullTextScan as surrealdb_core::exec::ExecOperator>::execute::{closure#0}> as futures_core::stream::Stream>::size_hintUnexecuted instantiation: <async_stream::async_stream::AsyncStream<core::result::Result<surrealdb_core::exec::ValueBatch, surrealdb_core::expr::ControlFlow>, <surrealdb_core::exec::operators::unwrap_exactly_one::UnwrapExactlyOne as surrealdb_core::exec::ExecOperator>::execute::{closure#0}> as futures_core::stream::Stream>::size_hintUnexecuted instantiation: <async_stream::async_stream::AsyncStream<core::result::Result<surrealdb_core::exec::ValueBatch, surrealdb_core::expr::ControlFlow>, <surrealdb_core::exec::operators::limit::Limit as surrealdb_core::exec::ExecOperator>::execute::{closure#0}> as futures_core::stream::Stream>::size_hintUnexecuted instantiation: <async_stream::async_stream::AsyncStream<core::result::Result<surrealdb_core::exec::ValueBatch, surrealdb_core::expr::ControlFlow>, <surrealdb_core::exec::operators::explain::AnalyzePlan as surrealdb_core::exec::ExecOperator>::execute::{closure#0}> as futures_core::stream::Stream>::size_hintUnexecuted instantiation: <async_stream::async_stream::AsyncStream<core::result::Result<surrealdb_core::exec::ValueBatch, surrealdb_core::expr::ControlFlow>, <surrealdb_core::exec::operators::timeout::Timeout as surrealdb_core::exec::ExecOperator>::execute::{closure#0}> as futures_core::stream::Stream>::size_hintUnexecuted instantiation: <async_stream::async_stream::AsyncStream<core::result::Result<surrealdb_core::exec::ValueBatch, surrealdb_core::expr::ControlFlow>, <surrealdb_core::exec::operators::scan::index_count::IndexCountScan as surrealdb_core::exec::ExecOperator>::execute::{closure#0}> as futures_core::stream::Stream>::size_hintUnexecuted instantiation: <async_stream::async_stream::AsyncStream<core::result::Result<surrealdb_core::exec::ValueBatch, surrealdb_core::expr::ControlFlow>, <surrealdb_core::exec::operators::scan::count::CountScan as surrealdb_core::exec::ExecOperator>::execute::{closure#0}> as futures_core::stream::Stream>::size_hintUnexecuted instantiation: <async_stream::async_stream::AsyncStream<core::result::Result<surrealdb_core::exec::ValueBatch, surrealdb_core::expr::ControlFlow>, <surrealdb_core::exec::operators::scan::table::TableScan as surrealdb_core::exec::ExecOperator>::execute::{closure#0}> as futures_core::stream::Stream>::size_hintUnexecuted instantiation: <async_stream::async_stream::AsyncStream<core::result::Result<surrealdb_core::exec::ValueBatch, surrealdb_core::expr::ControlFlow>, <surrealdb_core::exec::operators::scan::dynamic::DynamicScan as surrealdb_core::exec::ExecOperator>::execute::{closure#0}> as futures_core::stream::Stream>::size_hintUnexecuted instantiation: <async_stream::async_stream::AsyncStream<core::result::Result<surrealdb_core::exec::ValueBatch, surrealdb_core::expr::ControlFlow>, <surrealdb_core::exec::operators::scan::record_id::RecordIdScan as surrealdb_core::exec::ExecOperator>::execute::{closure#0}> as futures_core::stream::Stream>::size_hintUnexecuted instantiation: <async_stream::async_stream::AsyncStream<core::result::Result<surrealdb_core::exec::ValueBatch, surrealdb_core::expr::ControlFlow>, <surrealdb_core::exec::operators::source_expr::SourceExpr as surrealdb_core::exec::ExecOperator>::execute::{closure#0}> as futures_core::stream::Stream>::size_hintUnexecuted instantiation: <async_stream::async_stream::AsyncStream<core::result::Result<surrealdb_core::exec::ValueBatch, surrealdb_core::expr::ControlFlow>, <surrealdb_core::exec::operators::aggregate::Aggregate as surrealdb_core::exec::ExecOperator>::execute::{closure#0}> as futures_core::stream::Stream>::size_hintUnexecuted instantiation: <async_stream::async_stream::AsyncStream<core::result::Result<surrealdb_core::exec::ValueBatch, surrealdb_core::expr::ControlFlow>, <surrealdb_core::exec::operators::scan::reference::ReferenceScan as surrealdb_core::exec::ExecOperator>::execute::{closure#0}> as futures_core::stream::Stream>::size_hintUnexecuted instantiation: <async_stream::async_stream::AsyncStream<core::result::Result<surrealdb_core::exec::ValueBatch, surrealdb_core::expr::ControlFlow>, <surrealdb_core::exec::operators::scan::graph::GraphEdgeScan as surrealdb_core::exec::ExecOperator>::execute::{closure#0}> as futures_core::stream::Stream>::size_hintUnexecuted instantiation: <async_stream::async_stream::AsyncStream<_, _> as futures_core::stream::Stream>::size_hint |
79 | | } |