/rust/registry/src/index.crates.io-1949cf8c6b5b557f/async-stream-0.3.6/src/yielder.rs
Line | Count | Source |
1 | | use std::cell::Cell; |
2 | | use std::future::Future; |
3 | | use std::marker::PhantomData; |
4 | | use std::pin::Pin; |
5 | | use std::ptr; |
6 | | use std::task::{Context, Poll}; |
7 | | |
8 | | #[derive(Debug)] |
9 | | pub struct Sender<T> { |
10 | | _p: PhantomData<fn(T) -> T>, |
11 | | } |
12 | | |
13 | | #[derive(Debug)] |
14 | | pub struct Receiver<T> { |
15 | | _p: PhantomData<T>, |
16 | | } |
17 | | |
18 | | pub(crate) struct Enter<'a, T> { |
19 | | _rx: &'a mut Receiver<T>, |
20 | | prev: *mut (), |
21 | | } |
22 | | |
23 | | // Note: It is considered unsound for anyone other than our macros to call |
24 | | // this function. This is a private API intended only for calls from our |
25 | | // macros, and users should never call it, but some people tend to |
26 | | // misinterpret it as fine to call unless it is marked unsafe. |
27 | | #[doc(hidden)] |
28 | 5 | pub unsafe fn pair<T>() -> (Sender<T>, Receiver<T>) { |
29 | 5 | let tx = Sender { _p: PhantomData }; |
30 | 5 | let rx = Receiver { _p: PhantomData }; |
31 | 5 | (tx, rx) |
32 | 5 | } async_stream::yielder::pair::<core::result::Result<surrealdb_core::exec::ValueBatch, surrealdb_core::expr::ControlFlow>> Line | Count | Source | 28 | 5 | pub unsafe fn pair<T>() -> (Sender<T>, Receiver<T>) { | 29 | 5 | let tx = Sender { _p: PhantomData }; | 30 | 5 | let rx = Receiver { _p: PhantomData }; | 31 | 5 | (tx, rx) | 32 | 5 | } |
Unexecuted instantiation: async_stream::yielder::pair::<_> |
33 | | |
34 | | // Tracks the pointer to `Option<T>`. |
35 | | // |
36 | | // TODO: Ensure wakers match? |
37 | | thread_local!(static STORE: Cell<*mut ()> = const { Cell::new(ptr::null_mut()) }); |
38 | | |
39 | | // ===== impl Sender ===== |
40 | | |
41 | | impl<T> Sender<T> { |
42 | 4 | pub fn send(&mut self, value: T) -> impl Future<Output = ()> { |
43 | 4 | Send { value: Some(value) } |
44 | 4 | } <async_stream::yielder::Sender<core::result::Result<surrealdb_core::exec::ValueBatch, surrealdb_core::expr::ControlFlow>>>::send Line | Count | Source | 42 | 4 | pub fn send(&mut self, value: T) -> impl Future<Output = ()> { | 43 | 4 | Send { value: Some(value) } | 44 | 4 | } |
Unexecuted instantiation: <async_stream::yielder::Sender<_>>::send |
45 | | } |
46 | | |
47 | | struct Send<T> { |
48 | | value: Option<T>, |
49 | | } |
50 | | |
51 | | impl<T> Unpin for Send<T> {} |
52 | | |
53 | | impl<T> Future for Send<T> { |
54 | | type Output = (); |
55 | | |
56 | 8 | fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> { |
57 | 8 | if self.value.is_none() { |
58 | 4 | return Poll::Ready(()); |
59 | 4 | } |
60 | | |
61 | 4 | STORE.with(|cell| { |
62 | 4 | let ptr = cell.get() as *mut Option<T>; |
63 | 4 | let option_ref = unsafe { ptr.as_mut() }.expect("invalid usage"); |
64 | | |
65 | 4 | if option_ref.is_none() { |
66 | 4 | *option_ref = self.value.take(); |
67 | 4 | } |
68 | | |
69 | 4 | Poll::Pending |
70 | 4 | }) <async_stream::yielder::Send<core::result::Result<surrealdb_core::exec::ValueBatch, surrealdb_core::expr::ControlFlow>> as core::future::future::Future>::poll::{closure#0}Line | Count | Source | 61 | 4 | STORE.with(|cell| { | 62 | 4 | let ptr = cell.get() as *mut Option<T>; | 63 | 4 | let option_ref = unsafe { ptr.as_mut() }.expect("invalid usage"); | 64 | | | 65 | 4 | if option_ref.is_none() { | 66 | 4 | *option_ref = self.value.take(); | 67 | 4 | } | 68 | | | 69 | 4 | Poll::Pending | 70 | 4 | }) |
Unexecuted instantiation: <async_stream::yielder::Send<_> as core::future::future::Future>::poll::{closure#0} |
71 | 8 | } <async_stream::yielder::Send<core::result::Result<surrealdb_core::exec::ValueBatch, surrealdb_core::expr::ControlFlow>> as core::future::future::Future>::poll Line | Count | Source | 56 | 8 | fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> { | 57 | 8 | if self.value.is_none() { | 58 | 4 | return Poll::Ready(()); | 59 | 4 | } | 60 | | | 61 | 4 | STORE.with(|cell| { | 62 | | let ptr = cell.get() as *mut Option<T>; | 63 | | let option_ref = unsafe { ptr.as_mut() }.expect("invalid usage"); | 64 | | | 65 | | if option_ref.is_none() { | 66 | | *option_ref = self.value.take(); | 67 | | } | 68 | | | 69 | | Poll::Pending | 70 | | }) | 71 | 8 | } |
Unexecuted instantiation: <async_stream::yielder::Send<_> as core::future::future::Future>::poll |
72 | | } |
73 | | |
74 | | // ===== impl Receiver ===== |
75 | | |
76 | | impl<T> Receiver<T> { |
77 | 9 | pub(crate) fn enter<'a>(&'a mut self, dst: &'a mut Option<T>) -> Enter<'a, T> { |
78 | 9 | let prev = STORE.with(|cell| { |
79 | 9 | let prev = cell.get(); |
80 | 9 | cell.set(dst as *mut _ as *mut ()); |
81 | 9 | prev |
82 | 9 | }); <async_stream::yielder::Receiver<core::result::Result<surrealdb_core::exec::ValueBatch, surrealdb_core::expr::ControlFlow>>>::enter::{closure#0}Line | Count | Source | 78 | 9 | let prev = STORE.with(|cell| { | 79 | 9 | let prev = cell.get(); | 80 | 9 | cell.set(dst as *mut _ as *mut ()); | 81 | 9 | prev | 82 | 9 | }); |
Unexecuted instantiation: <async_stream::yielder::Receiver<_>>::enter::{closure#0} |
83 | | |
84 | 9 | Enter { _rx: self, prev } |
85 | 9 | } <async_stream::yielder::Receiver<core::result::Result<surrealdb_core::exec::ValueBatch, surrealdb_core::expr::ControlFlow>>>::enter Line | Count | Source | 77 | 9 | pub(crate) fn enter<'a>(&'a mut self, dst: &'a mut Option<T>) -> Enter<'a, T> { | 78 | 9 | let prev = STORE.with(|cell| { | 79 | | let prev = cell.get(); | 80 | | cell.set(dst as *mut _ as *mut ()); | 81 | | prev | 82 | | }); | 83 | | | 84 | 9 | Enter { _rx: self, prev } | 85 | 9 | } |
Unexecuted instantiation: <async_stream::yielder::Receiver<_>>::enter |
86 | | } |
87 | | |
88 | | // ===== impl Enter ===== |
89 | | |
90 | | impl<'a, T> Drop for Enter<'a, T> { |
91 | 9 | fn drop(&mut self) { |
92 | 9 | STORE.with(|cell| cell.set(self.prev)); <async_stream::yielder::Enter<core::result::Result<surrealdb_core::exec::ValueBatch, surrealdb_core::expr::ControlFlow>> as core::ops::drop::Drop>::drop::{closure#0}Line | Count | Source | 92 | 9 | STORE.with(|cell| cell.set(self.prev)); |
Unexecuted instantiation: <async_stream::yielder::Enter<_> as core::ops::drop::Drop>::drop::{closure#0} |
93 | 9 | } <async_stream::yielder::Enter<core::result::Result<surrealdb_core::exec::ValueBatch, surrealdb_core::expr::ControlFlow>> as core::ops::drop::Drop>::drop Line | Count | Source | 91 | 9 | fn drop(&mut self) { | 92 | 9 | STORE.with(|cell| cell.set(self.prev)); | 93 | 9 | } |
Unexecuted instantiation: <async_stream::yielder::Enter<_> as core::ops::drop::Drop>::drop |
94 | | } |