Coverage Report

Created: 2026-03-23 07:13

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/rust/registry/src/index.crates.io-1949cf8c6b5b557f/async-stream-0.3.6/src/yielder.rs
Line
Count
Source
1
use std::cell::Cell;
2
use std::future::Future;
3
use std::marker::PhantomData;
4
use std::pin::Pin;
5
use std::ptr;
6
use std::task::{Context, Poll};
7
8
#[derive(Debug)]
9
pub struct Sender<T> {
10
    _p: PhantomData<fn(T) -> T>,
11
}
12
13
#[derive(Debug)]
14
pub struct Receiver<T> {
15
    _p: PhantomData<T>,
16
}
17
18
pub(crate) struct Enter<'a, T> {
19
    _rx: &'a mut Receiver<T>,
20
    prev: *mut (),
21
}
22
23
// Note: It is considered unsound for anyone other than our macros to call
24
// this function. This is a private API intended only for calls from our
25
// macros, and users should never call it, but some people tend to
26
// misinterpret it as fine to call unless it is marked unsafe.
27
#[doc(hidden)]
28
5
pub unsafe fn pair<T>() -> (Sender<T>, Receiver<T>) {
29
5
    let tx = Sender { _p: PhantomData };
30
5
    let rx = Receiver { _p: PhantomData };
31
5
    (tx, rx)
32
5
}
async_stream::yielder::pair::<core::result::Result<surrealdb_core::exec::ValueBatch, surrealdb_core::expr::ControlFlow>>
Line
Count
Source
28
5
pub unsafe fn pair<T>() -> (Sender<T>, Receiver<T>) {
29
5
    let tx = Sender { _p: PhantomData };
30
5
    let rx = Receiver { _p: PhantomData };
31
5
    (tx, rx)
32
5
}
Unexecuted instantiation: async_stream::yielder::pair::<_>
33
34
// Tracks the pointer to `Option<T>`.
35
//
36
// TODO: Ensure wakers match?
37
thread_local!(static STORE: Cell<*mut ()> = const { Cell::new(ptr::null_mut()) });
38
39
// ===== impl Sender =====
40
41
impl<T> Sender<T> {
42
4
    pub fn send(&mut self, value: T) -> impl Future<Output = ()> {
43
4
        Send { value: Some(value) }
44
4
    }
<async_stream::yielder::Sender<core::result::Result<surrealdb_core::exec::ValueBatch, surrealdb_core::expr::ControlFlow>>>::send
Line
Count
Source
42
4
    pub fn send(&mut self, value: T) -> impl Future<Output = ()> {
43
4
        Send { value: Some(value) }
44
4
    }
Unexecuted instantiation: <async_stream::yielder::Sender<_>>::send
45
}
46
47
struct Send<T> {
48
    value: Option<T>,
49
}
50
51
impl<T> Unpin for Send<T> {}
52
53
impl<T> Future for Send<T> {
54
    type Output = ();
55
56
8
    fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
57
8
        if self.value.is_none() {
58
4
            return Poll::Ready(());
59
4
        }
60
61
4
        STORE.with(|cell| {
62
4
            let ptr = cell.get() as *mut Option<T>;
63
4
            let option_ref = unsafe { ptr.as_mut() }.expect("invalid usage");
64
65
4
            if option_ref.is_none() {
66
4
                *option_ref = self.value.take();
67
4
            }
68
69
4
            Poll::Pending
70
4
        })
<async_stream::yielder::Send<core::result::Result<surrealdb_core::exec::ValueBatch, surrealdb_core::expr::ControlFlow>> as core::future::future::Future>::poll::{closure#0}
Line
Count
Source
61
4
        STORE.with(|cell| {
62
4
            let ptr = cell.get() as *mut Option<T>;
63
4
            let option_ref = unsafe { ptr.as_mut() }.expect("invalid usage");
64
65
4
            if option_ref.is_none() {
66
4
                *option_ref = self.value.take();
67
4
            }
68
69
4
            Poll::Pending
70
4
        })
Unexecuted instantiation: <async_stream::yielder::Send<_> as core::future::future::Future>::poll::{closure#0}
71
8
    }
<async_stream::yielder::Send<core::result::Result<surrealdb_core::exec::ValueBatch, surrealdb_core::expr::ControlFlow>> as core::future::future::Future>::poll
Line
Count
Source
56
8
    fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
57
8
        if self.value.is_none() {
58
4
            return Poll::Ready(());
59
4
        }
60
61
4
        STORE.with(|cell| {
62
            let ptr = cell.get() as *mut Option<T>;
63
            let option_ref = unsafe { ptr.as_mut() }.expect("invalid usage");
64
65
            if option_ref.is_none() {
66
                *option_ref = self.value.take();
67
            }
68
69
            Poll::Pending
70
        })
71
8
    }
Unexecuted instantiation: <async_stream::yielder::Send<_> as core::future::future::Future>::poll
72
}
73
74
// ===== impl Receiver =====
75
76
impl<T> Receiver<T> {
77
9
    pub(crate) fn enter<'a>(&'a mut self, dst: &'a mut Option<T>) -> Enter<'a, T> {
78
9
        let prev = STORE.with(|cell| {
79
9
            let prev = cell.get();
80
9
            cell.set(dst as *mut _ as *mut ());
81
9
            prev
82
9
        });
<async_stream::yielder::Receiver<core::result::Result<surrealdb_core::exec::ValueBatch, surrealdb_core::expr::ControlFlow>>>::enter::{closure#0}
Line
Count
Source
78
9
        let prev = STORE.with(|cell| {
79
9
            let prev = cell.get();
80
9
            cell.set(dst as *mut _ as *mut ());
81
9
            prev
82
9
        });
Unexecuted instantiation: <async_stream::yielder::Receiver<_>>::enter::{closure#0}
83
84
9
        Enter { _rx: self, prev }
85
9
    }
<async_stream::yielder::Receiver<core::result::Result<surrealdb_core::exec::ValueBatch, surrealdb_core::expr::ControlFlow>>>::enter
Line
Count
Source
77
9
    pub(crate) fn enter<'a>(&'a mut self, dst: &'a mut Option<T>) -> Enter<'a, T> {
78
9
        let prev = STORE.with(|cell| {
79
            let prev = cell.get();
80
            cell.set(dst as *mut _ as *mut ());
81
            prev
82
        });
83
84
9
        Enter { _rx: self, prev }
85
9
    }
Unexecuted instantiation: <async_stream::yielder::Receiver<_>>::enter
86
}
87
88
// ===== impl Enter =====
89
90
impl<'a, T> Drop for Enter<'a, T> {
91
9
    fn drop(&mut self) {
92
9
        STORE.with(|cell| cell.set(self.prev));
<async_stream::yielder::Enter<core::result::Result<surrealdb_core::exec::ValueBatch, surrealdb_core::expr::ControlFlow>> as core::ops::drop::Drop>::drop::{closure#0}
Line
Count
Source
92
9
        STORE.with(|cell| cell.set(self.prev));
Unexecuted instantiation: <async_stream::yielder::Enter<_> as core::ops::drop::Drop>::drop::{closure#0}
93
9
    }
<async_stream::yielder::Enter<core::result::Result<surrealdb_core::exec::ValueBatch, surrealdb_core::expr::ControlFlow>> as core::ops::drop::Drop>::drop
Line
Count
Source
91
9
    fn drop(&mut self) {
92
9
        STORE.with(|cell| cell.set(self.prev));
93
9
    }
Unexecuted instantiation: <async_stream::yielder::Enter<_> as core::ops::drop::Drop>::drop
94
}