Coverage Report

Created: 2026-03-31 07:09

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/rust/registry/src/index.crates.io-1949cf8c6b5b557f/async-stream-0.3.6/src/async_stream.rs
Line
Count
Source
1
use crate::yielder::Receiver;
2
3
use futures_core::{FusedStream, Stream};
4
use pin_project_lite::pin_project;
5
use std::future::Future;
6
use std::pin::Pin;
7
use std::task::{Context, Poll};
8
9
pin_project! {
10
    #[doc(hidden)]
11
    #[derive(Debug)]
12
    pub struct AsyncStream<T, U> {
13
        rx: Receiver<T>,
14
        done: bool,
15
        #[pin]
16
        generator: U,
17
    }
18
}
19
20
impl<T, U> AsyncStream<T, U> {
21
    #[doc(hidden)]
22
5
    pub fn new(rx: Receiver<T>, generator: U) -> AsyncStream<T, U> {
23
5
        AsyncStream {
24
5
            rx,
25
5
            done: false,
26
5
            generator,
27
5
        }
28
5
    }
Unexecuted instantiation: <async_stream::async_stream::AsyncStream<core::result::Result<surrealdb_core::exec::ValueBatch, surrealdb_core::expr::ControlFlow>, surrealdb_core::exec::operators::scan::pipeline::kv_scan_stream::{closure#0}>>::new
Unexecuted instantiation: <async_stream::async_stream::AsyncStream<core::result::Result<surrealdb_core::exec::ValueBatch, surrealdb_core::expr::ControlFlow>, <surrealdb_core::exec::operators::scan::union_index::UnionIndexScan as surrealdb_core::exec::ExecOperator>::execute::{closure#0}>>::new
Unexecuted instantiation: <async_stream::async_stream::AsyncStream<core::result::Result<surrealdb_core::exec::ValueBatch, surrealdb_core::expr::ControlFlow>, <surrealdb_core::exec::operators::scan::knn::KnnScan as surrealdb_core::exec::ExecOperator>::execute::{closure#0}>>::new
Unexecuted instantiation: <async_stream::async_stream::AsyncStream<core::result::Result<surrealdb_core::exec::ValueBatch, surrealdb_core::expr::ControlFlow>, <surrealdb_core::exec::operators::scan::index::IndexScan as surrealdb_core::exec::ExecOperator>::execute::{closure#0}>>::new
Unexecuted instantiation: <async_stream::async_stream::AsyncStream<core::result::Result<surrealdb_core::exec::ValueBatch, surrealdb_core::expr::ControlFlow>, <surrealdb_core::exec::operators::scan::fulltext::FullTextScan as surrealdb_core::exec::ExecOperator>::execute::{closure#0}>>::new
Unexecuted instantiation: <async_stream::async_stream::AsyncStream<core::result::Result<surrealdb_core::exec::ValueBatch, surrealdb_core::expr::ControlFlow>, <surrealdb_core::exec::operators::unwrap_exactly_one::UnwrapExactlyOne as surrealdb_core::exec::ExecOperator>::execute::{closure#0}>>::new
Unexecuted instantiation: <async_stream::async_stream::AsyncStream<core::result::Result<surrealdb_core::exec::ValueBatch, surrealdb_core::expr::ControlFlow>, <surrealdb_core::exec::operators::limit::Limit as surrealdb_core::exec::ExecOperator>::execute::{closure#0}>>::new
<async_stream::async_stream::AsyncStream<core::result::Result<surrealdb_core::exec::ValueBatch, surrealdb_core::expr::ControlFlow>, <surrealdb_core::exec::operators::explain::AnalyzePlan as surrealdb_core::exec::ExecOperator>::execute::{closure#0}>>::new
Line
Count
Source
22
4
    pub fn new(rx: Receiver<T>, generator: U) -> AsyncStream<T, U> {
23
4
        AsyncStream {
24
4
            rx,
25
4
            done: false,
26
4
            generator,
27
4
        }
28
4
    }
Unexecuted instantiation: <async_stream::async_stream::AsyncStream<core::result::Result<surrealdb_core::exec::ValueBatch, surrealdb_core::expr::ControlFlow>, <surrealdb_core::exec::operators::timeout::Timeout as surrealdb_core::exec::ExecOperator>::execute::{closure#0}>>::new
Unexecuted instantiation: <async_stream::async_stream::AsyncStream<core::result::Result<surrealdb_core::exec::ValueBatch, surrealdb_core::expr::ControlFlow>, <surrealdb_core::exec::operators::scan::index_count::IndexCountScan as surrealdb_core::exec::ExecOperator>::execute::{closure#0}>>::new
Unexecuted instantiation: <async_stream::async_stream::AsyncStream<core::result::Result<surrealdb_core::exec::ValueBatch, surrealdb_core::expr::ControlFlow>, <surrealdb_core::exec::operators::scan::count::CountScan as surrealdb_core::exec::ExecOperator>::execute::{closure#0}>>::new
Unexecuted instantiation: <async_stream::async_stream::AsyncStream<core::result::Result<surrealdb_core::exec::ValueBatch, surrealdb_core::expr::ControlFlow>, <surrealdb_core::exec::operators::scan::table::TableScan as surrealdb_core::exec::ExecOperator>::execute::{closure#0}>>::new
Unexecuted instantiation: <async_stream::async_stream::AsyncStream<core::result::Result<surrealdb_core::exec::ValueBatch, surrealdb_core::expr::ControlFlow>, <surrealdb_core::exec::operators::scan::dynamic::DynamicScan as surrealdb_core::exec::ExecOperator>::execute::{closure#0}>>::new
Unexecuted instantiation: <async_stream::async_stream::AsyncStream<core::result::Result<surrealdb_core::exec::ValueBatch, surrealdb_core::expr::ControlFlow>, <surrealdb_core::exec::operators::scan::record_id::RecordIdScan as surrealdb_core::exec::ExecOperator>::execute::{closure#0}>>::new
<async_stream::async_stream::AsyncStream<core::result::Result<surrealdb_core::exec::ValueBatch, surrealdb_core::expr::ControlFlow>, <surrealdb_core::exec::operators::source_expr::SourceExpr as surrealdb_core::exec::ExecOperator>::execute::{closure#0}>>::new
Line
Count
Source
22
1
    pub fn new(rx: Receiver<T>, generator: U) -> AsyncStream<T, U> {
23
1
        AsyncStream {
24
1
            rx,
25
1
            done: false,
26
1
            generator,
27
1
        }
28
1
    }
Unexecuted instantiation: <async_stream::async_stream::AsyncStream<core::result::Result<surrealdb_core::exec::ValueBatch, surrealdb_core::expr::ControlFlow>, <surrealdb_core::exec::operators::aggregate::Aggregate as surrealdb_core::exec::ExecOperator>::execute::{closure#0}>>::new
Unexecuted instantiation: <async_stream::async_stream::AsyncStream<core::result::Result<surrealdb_core::exec::ValueBatch, surrealdb_core::expr::ControlFlow>, <surrealdb_core::exec::operators::scan::reference::ReferenceScan as surrealdb_core::exec::ExecOperator>::execute::{closure#0}>>::new
Unexecuted instantiation: <async_stream::async_stream::AsyncStream<core::result::Result<surrealdb_core::exec::ValueBatch, surrealdb_core::expr::ControlFlow>, <surrealdb_core::exec::operators::scan::graph::GraphEdgeScan as surrealdb_core::exec::ExecOperator>::execute::{closure#0}>>::new
Unexecuted instantiation: <async_stream::async_stream::AsyncStream<_, _>>::new
29
}
30
31
impl<T, U> FusedStream for AsyncStream<T, U>
32
where
33
    U: Future<Output = ()>,
34
{
35
0
    fn is_terminated(&self) -> bool {
36
0
        self.done
37
0
    }
38
}
39
40
impl<T, U> Stream for AsyncStream<T, U>
41
where
42
    U: Future<Output = ()>,
43
{
44
    type Item = T;
45
46
9
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
47
9
        let me = self.project();
48
49
9
        if *me.done {
50
0
            return Poll::Ready(None);
51
9
        }
52
53
9
        let mut dst = None;
54
9
        let res = {
55
9
            let _enter = me.rx.enter(&mut dst);
56
9
            me.generator.poll(cx)
57
        };
58
59
9
        *me.done = res.is_ready();
60
61
9
        if dst.is_some() {
62
4
            return Poll::Ready(dst.take());
63
5
        }
64
65
5
        if *me.done {
66
5
            Poll::Ready(None)
67
        } else {
68
0
            Poll::Pending
69
        }
70
9
    }
Unexecuted instantiation: <async_stream::async_stream::AsyncStream<core::result::Result<surrealdb_core::exec::ValueBatch, surrealdb_core::expr::ControlFlow>, surrealdb_core::exec::operators::scan::pipeline::kv_scan_stream::{closure#0}> as futures_core::stream::Stream>::poll_next
Unexecuted instantiation: <async_stream::async_stream::AsyncStream<core::result::Result<surrealdb_core::exec::ValueBatch, surrealdb_core::expr::ControlFlow>, <surrealdb_core::exec::operators::scan::union_index::UnionIndexScan as surrealdb_core::exec::ExecOperator>::execute::{closure#0}> as futures_core::stream::Stream>::poll_next
Unexecuted instantiation: <async_stream::async_stream::AsyncStream<core::result::Result<surrealdb_core::exec::ValueBatch, surrealdb_core::expr::ControlFlow>, <surrealdb_core::exec::operators::scan::knn::KnnScan as surrealdb_core::exec::ExecOperator>::execute::{closure#0}> as futures_core::stream::Stream>::poll_next
Unexecuted instantiation: <async_stream::async_stream::AsyncStream<core::result::Result<surrealdb_core::exec::ValueBatch, surrealdb_core::expr::ControlFlow>, <surrealdb_core::exec::operators::scan::index::IndexScan as surrealdb_core::exec::ExecOperator>::execute::{closure#0}> as futures_core::stream::Stream>::poll_next
Unexecuted instantiation: <async_stream::async_stream::AsyncStream<core::result::Result<surrealdb_core::exec::ValueBatch, surrealdb_core::expr::ControlFlow>, <surrealdb_core::exec::operators::scan::fulltext::FullTextScan as surrealdb_core::exec::ExecOperator>::execute::{closure#0}> as futures_core::stream::Stream>::poll_next
Unexecuted instantiation: <async_stream::async_stream::AsyncStream<core::result::Result<surrealdb_core::exec::ValueBatch, surrealdb_core::expr::ControlFlow>, <surrealdb_core::exec::operators::unwrap_exactly_one::UnwrapExactlyOne as surrealdb_core::exec::ExecOperator>::execute::{closure#0}> as futures_core::stream::Stream>::poll_next
Unexecuted instantiation: <async_stream::async_stream::AsyncStream<core::result::Result<surrealdb_core::exec::ValueBatch, surrealdb_core::expr::ControlFlow>, <surrealdb_core::exec::operators::limit::Limit as surrealdb_core::exec::ExecOperator>::execute::{closure#0}> as futures_core::stream::Stream>::poll_next
<async_stream::async_stream::AsyncStream<core::result::Result<surrealdb_core::exec::ValueBatch, surrealdb_core::expr::ControlFlow>, <surrealdb_core::exec::operators::explain::AnalyzePlan as surrealdb_core::exec::ExecOperator>::execute::{closure#0}> as futures_core::stream::Stream>::poll_next
Line
Count
Source
46
8
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
47
8
        let me = self.project();
48
49
8
        if *me.done {
50
0
            return Poll::Ready(None);
51
8
        }
52
53
8
        let mut dst = None;
54
8
        let res = {
55
8
            let _enter = me.rx.enter(&mut dst);
56
8
            me.generator.poll(cx)
57
        };
58
59
8
        *me.done = res.is_ready();
60
61
8
        if dst.is_some() {
62
4
            return Poll::Ready(dst.take());
63
4
        }
64
65
4
        if *me.done {
66
4
            Poll::Ready(None)
67
        } else {
68
0
            Poll::Pending
69
        }
70
8
    }
Unexecuted instantiation: <async_stream::async_stream::AsyncStream<core::result::Result<surrealdb_core::exec::ValueBatch, surrealdb_core::expr::ControlFlow>, <surrealdb_core::exec::operators::timeout::Timeout as surrealdb_core::exec::ExecOperator>::execute::{closure#0}> as futures_core::stream::Stream>::poll_next
Unexecuted instantiation: <async_stream::async_stream::AsyncStream<core::result::Result<surrealdb_core::exec::ValueBatch, surrealdb_core::expr::ControlFlow>, <surrealdb_core::exec::operators::scan::index_count::IndexCountScan as surrealdb_core::exec::ExecOperator>::execute::{closure#0}> as futures_core::stream::Stream>::poll_next
Unexecuted instantiation: <async_stream::async_stream::AsyncStream<core::result::Result<surrealdb_core::exec::ValueBatch, surrealdb_core::expr::ControlFlow>, <surrealdb_core::exec::operators::scan::count::CountScan as surrealdb_core::exec::ExecOperator>::execute::{closure#0}> as futures_core::stream::Stream>::poll_next
Unexecuted instantiation: <async_stream::async_stream::AsyncStream<core::result::Result<surrealdb_core::exec::ValueBatch, surrealdb_core::expr::ControlFlow>, <surrealdb_core::exec::operators::scan::table::TableScan as surrealdb_core::exec::ExecOperator>::execute::{closure#0}> as futures_core::stream::Stream>::poll_next
Unexecuted instantiation: <async_stream::async_stream::AsyncStream<core::result::Result<surrealdb_core::exec::ValueBatch, surrealdb_core::expr::ControlFlow>, <surrealdb_core::exec::operators::scan::dynamic::DynamicScan as surrealdb_core::exec::ExecOperator>::execute::{closure#0}> as futures_core::stream::Stream>::poll_next
Unexecuted instantiation: <async_stream::async_stream::AsyncStream<core::result::Result<surrealdb_core::exec::ValueBatch, surrealdb_core::expr::ControlFlow>, <surrealdb_core::exec::operators::scan::record_id::RecordIdScan as surrealdb_core::exec::ExecOperator>::execute::{closure#0}> as futures_core::stream::Stream>::poll_next
<async_stream::async_stream::AsyncStream<core::result::Result<surrealdb_core::exec::ValueBatch, surrealdb_core::expr::ControlFlow>, <surrealdb_core::exec::operators::source_expr::SourceExpr as surrealdb_core::exec::ExecOperator>::execute::{closure#0}> as futures_core::stream::Stream>::poll_next
Line
Count
Source
46
1
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
47
1
        let me = self.project();
48
49
1
        if *me.done {
50
0
            return Poll::Ready(None);
51
1
        }
52
53
1
        let mut dst = None;
54
1
        let res = {
55
1
            let _enter = me.rx.enter(&mut dst);
56
1
            me.generator.poll(cx)
57
        };
58
59
1
        *me.done = res.is_ready();
60
61
1
        if dst.is_some() {
62
0
            return Poll::Ready(dst.take());
63
1
        }
64
65
1
        if *me.done {
66
1
            Poll::Ready(None)
67
        } else {
68
0
            Poll::Pending
69
        }
70
1
    }
Unexecuted instantiation: <async_stream::async_stream::AsyncStream<core::result::Result<surrealdb_core::exec::ValueBatch, surrealdb_core::expr::ControlFlow>, <surrealdb_core::exec::operators::aggregate::Aggregate as surrealdb_core::exec::ExecOperator>::execute::{closure#0}> as futures_core::stream::Stream>::poll_next
Unexecuted instantiation: <async_stream::async_stream::AsyncStream<core::result::Result<surrealdb_core::exec::ValueBatch, surrealdb_core::expr::ControlFlow>, <surrealdb_core::exec::operators::scan::reference::ReferenceScan as surrealdb_core::exec::ExecOperator>::execute::{closure#0}> as futures_core::stream::Stream>::poll_next
Unexecuted instantiation: <async_stream::async_stream::AsyncStream<core::result::Result<surrealdb_core::exec::ValueBatch, surrealdb_core::expr::ControlFlow>, <surrealdb_core::exec::operators::scan::graph::GraphEdgeScan as surrealdb_core::exec::ExecOperator>::execute::{closure#0}> as futures_core::stream::Stream>::poll_next
Unexecuted instantiation: <async_stream::async_stream::AsyncStream<_, _> as futures_core::stream::Stream>::poll_next
71
72
0
    fn size_hint(&self) -> (usize, Option<usize>) {
73
0
        if self.done {
74
0
            (0, Some(0))
75
        } else {
76
0
            (0, None)
77
        }
78
0
    }
Unexecuted instantiation: <async_stream::async_stream::AsyncStream<core::result::Result<surrealdb_core::exec::ValueBatch, surrealdb_core::expr::ControlFlow>, surrealdb_core::exec::operators::scan::pipeline::kv_scan_stream::{closure#0}> as futures_core::stream::Stream>::size_hint
Unexecuted instantiation: <async_stream::async_stream::AsyncStream<core::result::Result<surrealdb_core::exec::ValueBatch, surrealdb_core::expr::ControlFlow>, <surrealdb_core::exec::operators::scan::union_index::UnionIndexScan as surrealdb_core::exec::ExecOperator>::execute::{closure#0}> as futures_core::stream::Stream>::size_hint
Unexecuted instantiation: <async_stream::async_stream::AsyncStream<core::result::Result<surrealdb_core::exec::ValueBatch, surrealdb_core::expr::ControlFlow>, <surrealdb_core::exec::operators::scan::knn::KnnScan as surrealdb_core::exec::ExecOperator>::execute::{closure#0}> as futures_core::stream::Stream>::size_hint
Unexecuted instantiation: <async_stream::async_stream::AsyncStream<core::result::Result<surrealdb_core::exec::ValueBatch, surrealdb_core::expr::ControlFlow>, <surrealdb_core::exec::operators::scan::index::IndexScan as surrealdb_core::exec::ExecOperator>::execute::{closure#0}> as futures_core::stream::Stream>::size_hint
Unexecuted instantiation: <async_stream::async_stream::AsyncStream<core::result::Result<surrealdb_core::exec::ValueBatch, surrealdb_core::expr::ControlFlow>, <surrealdb_core::exec::operators::scan::fulltext::FullTextScan as surrealdb_core::exec::ExecOperator>::execute::{closure#0}> as futures_core::stream::Stream>::size_hint
Unexecuted instantiation: <async_stream::async_stream::AsyncStream<core::result::Result<surrealdb_core::exec::ValueBatch, surrealdb_core::expr::ControlFlow>, <surrealdb_core::exec::operators::unwrap_exactly_one::UnwrapExactlyOne as surrealdb_core::exec::ExecOperator>::execute::{closure#0}> as futures_core::stream::Stream>::size_hint
Unexecuted instantiation: <async_stream::async_stream::AsyncStream<core::result::Result<surrealdb_core::exec::ValueBatch, surrealdb_core::expr::ControlFlow>, <surrealdb_core::exec::operators::limit::Limit as surrealdb_core::exec::ExecOperator>::execute::{closure#0}> as futures_core::stream::Stream>::size_hint
Unexecuted instantiation: <async_stream::async_stream::AsyncStream<core::result::Result<surrealdb_core::exec::ValueBatch, surrealdb_core::expr::ControlFlow>, <surrealdb_core::exec::operators::explain::AnalyzePlan as surrealdb_core::exec::ExecOperator>::execute::{closure#0}> as futures_core::stream::Stream>::size_hint
Unexecuted instantiation: <async_stream::async_stream::AsyncStream<core::result::Result<surrealdb_core::exec::ValueBatch, surrealdb_core::expr::ControlFlow>, <surrealdb_core::exec::operators::timeout::Timeout as surrealdb_core::exec::ExecOperator>::execute::{closure#0}> as futures_core::stream::Stream>::size_hint
Unexecuted instantiation: <async_stream::async_stream::AsyncStream<core::result::Result<surrealdb_core::exec::ValueBatch, surrealdb_core::expr::ControlFlow>, <surrealdb_core::exec::operators::scan::index_count::IndexCountScan as surrealdb_core::exec::ExecOperator>::execute::{closure#0}> as futures_core::stream::Stream>::size_hint
Unexecuted instantiation: <async_stream::async_stream::AsyncStream<core::result::Result<surrealdb_core::exec::ValueBatch, surrealdb_core::expr::ControlFlow>, <surrealdb_core::exec::operators::scan::count::CountScan as surrealdb_core::exec::ExecOperator>::execute::{closure#0}> as futures_core::stream::Stream>::size_hint
Unexecuted instantiation: <async_stream::async_stream::AsyncStream<core::result::Result<surrealdb_core::exec::ValueBatch, surrealdb_core::expr::ControlFlow>, <surrealdb_core::exec::operators::scan::table::TableScan as surrealdb_core::exec::ExecOperator>::execute::{closure#0}> as futures_core::stream::Stream>::size_hint
Unexecuted instantiation: <async_stream::async_stream::AsyncStream<core::result::Result<surrealdb_core::exec::ValueBatch, surrealdb_core::expr::ControlFlow>, <surrealdb_core::exec::operators::scan::dynamic::DynamicScan as surrealdb_core::exec::ExecOperator>::execute::{closure#0}> as futures_core::stream::Stream>::size_hint
Unexecuted instantiation: <async_stream::async_stream::AsyncStream<core::result::Result<surrealdb_core::exec::ValueBatch, surrealdb_core::expr::ControlFlow>, <surrealdb_core::exec::operators::scan::record_id::RecordIdScan as surrealdb_core::exec::ExecOperator>::execute::{closure#0}> as futures_core::stream::Stream>::size_hint
Unexecuted instantiation: <async_stream::async_stream::AsyncStream<core::result::Result<surrealdb_core::exec::ValueBatch, surrealdb_core::expr::ControlFlow>, <surrealdb_core::exec::operators::source_expr::SourceExpr as surrealdb_core::exec::ExecOperator>::execute::{closure#0}> as futures_core::stream::Stream>::size_hint
Unexecuted instantiation: <async_stream::async_stream::AsyncStream<core::result::Result<surrealdb_core::exec::ValueBatch, surrealdb_core::expr::ControlFlow>, <surrealdb_core::exec::operators::aggregate::Aggregate as surrealdb_core::exec::ExecOperator>::execute::{closure#0}> as futures_core::stream::Stream>::size_hint
Unexecuted instantiation: <async_stream::async_stream::AsyncStream<core::result::Result<surrealdb_core::exec::ValueBatch, surrealdb_core::expr::ControlFlow>, <surrealdb_core::exec::operators::scan::reference::ReferenceScan as surrealdb_core::exec::ExecOperator>::execute::{closure#0}> as futures_core::stream::Stream>::size_hint
Unexecuted instantiation: <async_stream::async_stream::AsyncStream<core::result::Result<surrealdb_core::exec::ValueBatch, surrealdb_core::expr::ControlFlow>, <surrealdb_core::exec::operators::scan::graph::GraphEdgeScan as surrealdb_core::exec::ExecOperator>::execute::{closure#0}> as futures_core::stream::Stream>::size_hint
Unexecuted instantiation: <async_stream::async_stream::AsyncStream<_, _> as futures_core::stream::Stream>::size_hint
79
}