/rust/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.43.0/src/macros/join.rs
Line | Count | Source (jump to first uncovered line) |
1 | | macro_rules! doc { |
2 | | ($join:item) => { |
3 | | /// Waits on multiple concurrent branches, returning when **all** branches |
4 | | /// complete. |
5 | | /// |
6 | | /// The `join!` macro must be used inside of async functions, closures, and |
7 | | /// blocks. |
8 | | /// |
9 | | /// The `join!` macro takes a list of async expressions and evaluates them |
10 | | /// concurrently on the same task. Each async expression evaluates to a future |
11 | | /// and the futures from each expression are multiplexed on the current task. |
12 | | /// |
13 | | /// When working with async expressions returning `Result`, `join!` will wait |
14 | | /// for **all** branches complete regardless if any complete with `Err`. Use |
15 | | /// [`try_join!`] to return early when `Err` is encountered. |
16 | | /// |
17 | | /// [`try_join!`]: crate::try_join |
18 | | /// |
19 | | /// # Notes |
20 | | /// |
21 | | /// The supplied futures are stored inline and do not require allocating a |
22 | | /// `Vec`. |
23 | | /// |
24 | | /// ### Runtime characteristics |
25 | | /// |
26 | | /// By running all async expressions on the current task, the expressions are |
27 | | /// able to run **concurrently** but not in **parallel**. This means all |
28 | | /// expressions are run on the same thread and if one branch blocks the thread, |
29 | | /// all other expressions will be unable to continue. If parallelism is |
30 | | /// required, spawn each async expression using [`tokio::spawn`] and pass the |
31 | | /// join handle to `join!`. |
32 | | /// |
33 | | /// [`tokio::spawn`]: crate::spawn |
34 | | /// |
35 | | /// # Examples |
36 | | /// |
37 | | /// Basic join with two branches |
38 | | /// |
39 | | /// ``` |
40 | | /// async fn do_stuff_async() { |
41 | | /// // async work |
42 | | /// } |
43 | | /// |
44 | | /// async fn more_async_work() { |
45 | | /// // more here |
46 | | /// } |
47 | | /// |
48 | | /// #[tokio::main] |
49 | | /// async fn main() { |
50 | | /// let (first, second) = tokio::join!( |
51 | | /// do_stuff_async(), |
52 | | /// more_async_work()); |
53 | | /// |
54 | | /// // do something with the values |
55 | | /// } |
56 | | /// ``` |
57 | | #[macro_export] |
58 | | #[cfg_attr(docsrs, doc(cfg(feature = "macros")))] |
59 | | $join |
60 | | }; |
61 | | } |
62 | | |
63 | | #[cfg(doc)] |
64 | | doc! {macro_rules! join { |
65 | | ($($future:expr),*) => { unimplemented!() } |
66 | | }} |
67 | | |
68 | | #[cfg(not(doc))] |
69 | | doc! {macro_rules! join { |
70 | | (@ { |
71 | | // One `_` for each branch in the `join!` macro. This is not used once |
72 | | // normalization is complete. |
73 | | ( $($count:tt)* ) |
74 | | |
75 | | // The expression `0+1+1+ ... +1` equal to the number of branches. |
76 | | ( $($total:tt)* ) |
77 | | |
78 | | // Normalized join! branches |
79 | | $( ( $($skip:tt)* ) $e:expr, )* |
80 | | |
81 | | }) => {{ |
82 | | use $crate::macros::support::{maybe_done, poll_fn, Future, Pin}; |
83 | | use $crate::macros::support::Poll::{Ready, Pending}; |
84 | | |
85 | | // Safety: nothing must be moved out of `futures`. This is to satisfy |
86 | | // the requirement of `Pin::new_unchecked` called below. |
87 | | // |
88 | | // We can't use the `pin!` macro for this because `futures` is a tuple |
89 | | // and the standard library provides no way to pin-project to the fields |
90 | | // of a tuple. |
91 | | let mut futures = ( $( maybe_done($e), )* ); |
92 | | |
93 | | // This assignment makes sure that the `poll_fn` closure only has a |
94 | | // reference to the futures, instead of taking ownership of them. This |
95 | | // mitigates the issue described in |
96 | | // <https://internals.rust-lang.org/t/surprising-soundness-trouble-around-pollfn/17484> |
97 | | let mut futures = &mut futures; |
98 | | |
99 | | // Each time the future created by poll_fn is polled, a different future will be polled first |
100 | | // to ensure every future passed to join! gets a chance to make progress even if |
101 | | // one of the futures consumes the whole budget. |
102 | | // |
103 | | // This is number of futures that will be skipped in the first loop |
104 | | // iteration the next time. |
105 | | let mut skip_next_time: u32 = 0; |
106 | | |
107 | 0 | poll_fn(move |cx| { |
108 | 0 | const COUNT: u32 = $($total)*; |
109 | 0 |
|
110 | 0 | let mut is_pending = false; |
111 | 0 |
|
112 | 0 | let mut to_run = COUNT; |
113 | 0 |
|
114 | 0 | // The number of futures that will be skipped in the first loop iteration. |
115 | 0 | let mut skip = skip_next_time; |
116 | 0 |
|
117 | 0 | skip_next_time = if skip + 1 == COUNT { 0 } else { skip + 1 }; |
118 | | |
119 | | // This loop runs twice and the first `skip` futures |
120 | | // are not polled in the first iteration. |
121 | | loop { |
122 | | $( |
123 | 0 | if skip == 0 { |
124 | 0 | if to_run == 0 { |
125 | | // Every future has been polled |
126 | 0 | break; |
127 | 0 | } |
128 | 0 | to_run -= 1; |
129 | 0 |
|
130 | 0 | // Extract the future for this branch from the tuple. |
131 | 0 | let ( $($skip,)* fut, .. ) = &mut *futures; |
132 | 0 |
|
133 | 0 | // Safety: future is stored on the stack above |
134 | 0 | // and never moved. |
135 | 0 | let mut fut = unsafe { Pin::new_unchecked(fut) }; |
136 | 0 |
|
137 | 0 | // Try polling |
138 | 0 | if fut.poll(cx).is_pending() { |
139 | 0 | is_pending = true; |
140 | 0 | } |
141 | 0 | } else { |
142 | 0 | // Future skipped, one less future to skip in the next iteration |
143 | 0 | skip -= 1; |
144 | 0 | } |
145 | | )* |
146 | | } |
147 | | |
148 | 0 | if is_pending { |
149 | 0 | Pending |
150 | | } else { |
151 | 0 | Ready(($({ |
152 | 0 | // Extract the future for this branch from the tuple. |
153 | 0 | let ( $($skip,)* fut, .. ) = &mut futures; |
154 | 0 |
|
155 | 0 | // Safety: future is stored on the stack above |
156 | 0 | // and never moved. |
157 | 0 | let mut fut = unsafe { Pin::new_unchecked(fut) }; |
158 | 0 |
|
159 | 0 | fut.take_output().expect("expected completed future") |
160 | 0 | },)*)) |
161 | | } |
162 | | }).await Unexecuted instantiation: ztunnel::copy::copy_bidirectional::<ztunnel::copy::TcpStreamSplitter, ztunnel::copy::TcpStreamSplitter>::{closure#0}::{closure#2} Unexecuted instantiation: ztunnel::copy::copy_bidirectional::<ztunnel::copy::TcpStreamSplitter, ztunnel::proxy::h2::H2Stream>::{closure#0}::{closure#2} Unexecuted instantiation: ztunnel::copy::copy_bidirectional::<ztunnel::proxy::h2::H2Stream, ztunnel::copy::TcpStreamSplitter>::{closure#0}::{closure#2} |
163 | | }}; |
164 | | |
165 | | // ===== Normalize ===== |
166 | | |
167 | | (@ { ( $($s:tt)* ) ( $($n:tt)* ) $($t:tt)* } $e:expr, $($r:tt)* ) => { |
168 | | $crate::join!(@{ ($($s)* _) ($($n)* + 1) $($t)* ($($s)*) $e, } $($r)*) |
169 | | }; |
170 | | |
171 | | // ===== Entry point ===== |
172 | | |
173 | | ( $($e:expr),+ $(,)?) => { |
174 | | $crate::join!(@{ () (0) } $($e,)*) |
175 | | }; |
176 | | |
177 | | () => { async {}.await } |
178 | | }} |