/rust/registry/src/index.crates.io-1949cf8c6b5b557f/hyper-1.6.0/src/client/dispatch.rs
Line | Count | Source |
1 | | use std::task::{Context, Poll}; |
2 | | #[cfg(feature = "http2")] |
3 | | use std::{future::Future, pin::Pin}; |
4 | | |
5 | | #[cfg(feature = "http2")] |
6 | | use http::{Request, Response}; |
7 | | #[cfg(feature = "http2")] |
8 | | use http_body::Body; |
9 | | #[cfg(feature = "http2")] |
10 | | use pin_project_lite::pin_project; |
11 | | use tokio::sync::{mpsc, oneshot}; |
12 | | |
13 | | #[cfg(feature = "http2")] |
14 | | use crate::{body::Incoming, proto::h2::client::ResponseFutMap}; |
15 | | |
16 | | pub(crate) type RetryPromise<T, U> = oneshot::Receiver<Result<U, TrySendError<T>>>; |
17 | | pub(crate) type Promise<T> = oneshot::Receiver<Result<T, crate::Error>>; |
18 | | |
19 | | /// An error when calling `try_send_request`. |
20 | | /// |
21 | | /// There is a possibility of an error occurring on a connection in-between the |
22 | | /// time that a request is queued and when it is actually written to the IO |
23 | | /// transport. If that happens, it is safe to return the request back to the |
24 | | /// caller, as it was never fully sent. |
25 | | #[derive(Debug)] |
26 | | pub struct TrySendError<T> { |
27 | | pub(crate) error: crate::Error, |
28 | | pub(crate) message: Option<T>, |
29 | | } |
30 | | |
31 | 0 | pub(crate) fn channel<T, U>() -> (Sender<T, U>, Receiver<T, U>) { |
32 | 0 | let (tx, rx) = mpsc::unbounded_channel(); |
33 | 0 | let (giver, taker) = want::new(); |
34 | 0 | let tx = Sender { |
35 | 0 | #[cfg(feature = "http1")] |
36 | 0 | buffered_once: false, |
37 | 0 | giver, |
38 | 0 | inner: tx, |
39 | 0 | }; |
40 | 0 | let rx = Receiver { inner: rx, taker }; |
41 | 0 | (tx, rx) |
42 | 0 | } Unexecuted instantiation: hyper::client::dispatch::channel::<http::request::Request<tonic::body::Body>, http::response::Response<hyper::body::incoming::Incoming>> Unexecuted instantiation: hyper::client::dispatch::channel::<_, _> |
43 | | |
44 | | /// A bounded sender of requests and callbacks for when responses are ready. |
45 | | /// |
46 | | /// While the inner sender is unbounded, the Giver is used to determine |
47 | | /// if the Receiver is ready for another request. |
48 | | pub(crate) struct Sender<T, U> { |
49 | | /// One message is always allowed, even if the Receiver hasn't asked |
50 | | /// for it yet. This boolean keeps track of whether we've sent one |
51 | | /// without notice. |
52 | | #[cfg(feature = "http1")] |
53 | | buffered_once: bool, |
54 | | /// The Giver helps watch that the Receiver side has been polled |
55 | | /// when the queue is empty. This helps us know when a request and |
56 | | /// response have been fully processed, and a connection is ready |
57 | | /// for more. |
58 | | giver: want::Giver, |
59 | | /// Actually bounded by the Giver, plus `buffered_once`. |
60 | | inner: mpsc::UnboundedSender<Envelope<T, U>>, |
61 | | } |
62 | | |
/// An unbounded version.
///
/// This variant cannot poll the Giver, but it can still use it to find out
/// whether the Receiver has been dropped. Unlike the bounded sender, it can
/// be cloned.
#[cfg(feature = "http2")]
pub(crate) struct UnboundedSender<T, U> {
    /// Only used for `is_closed`, since mpsc::UnboundedSender cannot be checked.
    giver: want::SharedGiver,
    /// The queue that carries the envelopes to the Receiver.
    inner: mpsc::UnboundedSender<Envelope<T, U>>,
}
73 | | |
74 | | impl<T, U> Sender<T, U> { |
75 | | #[cfg(feature = "http1")] |
76 | 0 | pub(crate) fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>> { |
77 | 0 | self.giver |
78 | 0 | .poll_want(cx) |
79 | 0 | .map_err(|_| crate::Error::new_closed()) Unexecuted instantiation: <hyper::client::dispatch::Sender<http::request::Request<tonic::body::Body>, http::response::Response<hyper::body::incoming::Incoming>>>::poll_ready::{closure#0}Unexecuted instantiation: <hyper::client::dispatch::Sender<_, _>>::poll_ready::{closure#0} |
80 | 0 | } Unexecuted instantiation: <hyper::client::dispatch::Sender<http::request::Request<tonic::body::Body>, http::response::Response<hyper::body::incoming::Incoming>>>::poll_ready Unexecuted instantiation: <hyper::client::dispatch::Sender<_, _>>::poll_ready |
81 | | |
82 | | #[cfg(feature = "http1")] |
83 | 0 | pub(crate) fn is_ready(&self) -> bool { |
84 | 0 | self.giver.is_wanting() |
85 | 0 | } Unexecuted instantiation: <hyper::client::dispatch::Sender<http::request::Request<tonic::body::Body>, http::response::Response<hyper::body::incoming::Incoming>>>::is_ready Unexecuted instantiation: <hyper::client::dispatch::Sender<_, _>>::is_ready |
86 | | |
87 | | #[cfg(feature = "http1")] |
88 | 0 | pub(crate) fn is_closed(&self) -> bool { |
89 | 0 | self.giver.is_canceled() |
90 | 0 | } Unexecuted instantiation: <hyper::client::dispatch::Sender<http::request::Request<tonic::body::Body>, http::response::Response<hyper::body::incoming::Incoming>>>::is_closed Unexecuted instantiation: <hyper::client::dispatch::Sender<_, _>>::is_closed |
91 | | |
92 | | #[cfg(feature = "http1")] |
93 | 0 | fn can_send(&mut self) -> bool { |
94 | 0 | if self.giver.give() || !self.buffered_once { |
95 | | // If the receiver is ready *now*, then of course we can send. |
96 | | // |
97 | | // If the receiver isn't ready yet, but we don't have anything |
98 | | // in the channel yet, then allow one message. |
99 | 0 | self.buffered_once = true; |
100 | 0 | true |
101 | | } else { |
102 | 0 | false |
103 | | } |
104 | 0 | } Unexecuted instantiation: <hyper::client::dispatch::Sender<http::request::Request<tonic::body::Body>, http::response::Response<hyper::body::incoming::Incoming>>>::can_send Unexecuted instantiation: <hyper::client::dispatch::Sender<_, _>>::can_send |
105 | | |
106 | | #[cfg(feature = "http1")] |
107 | 0 | pub(crate) fn try_send(&mut self, val: T) -> Result<RetryPromise<T, U>, T> { |
108 | 0 | if !self.can_send() { |
109 | 0 | return Err(val); |
110 | 0 | } |
111 | 0 | let (tx, rx) = oneshot::channel(); |
112 | 0 | self.inner |
113 | 0 | .send(Envelope(Some((val, Callback::Retry(Some(tx)))))) |
114 | 0 | .map(move |_| rx) |
115 | 0 | .map_err(|mut e| (e.0).0.take().expect("envelope not dropped").0)Unexecuted instantiation: <hyper::client::dispatch::Sender<http::request::Request<tonic::body::Body>, http::response::Response<hyper::body::incoming::Incoming>>>::try_send::{closure#1}Unexecuted instantiation: <hyper::client::dispatch::Sender<_, _>>::try_send::{closure#1} |
116 | 0 | } Unexecuted instantiation: <hyper::client::dispatch::Sender<http::request::Request<tonic::body::Body>, http::response::Response<hyper::body::incoming::Incoming>>>::try_send Unexecuted instantiation: <hyper::client::dispatch::Sender<_, _>>::try_send |
117 | | |
118 | | #[cfg(feature = "http1")] |
119 | 0 | pub(crate) fn send(&mut self, val: T) -> Result<Promise<U>, T> { |
120 | 0 | if !self.can_send() { |
121 | 0 | return Err(val); |
122 | 0 | } |
123 | 0 | let (tx, rx) = oneshot::channel(); |
124 | 0 | self.inner |
125 | 0 | .send(Envelope(Some((val, Callback::NoRetry(Some(tx)))))) |
126 | 0 | .map(move |_| rx) |
127 | 0 | .map_err(|mut e| (e.0).0.take().expect("envelope not dropped").0) |
128 | 0 | } |
129 | | |
130 | | #[cfg(feature = "http2")] |
131 | 0 | pub(crate) fn unbound(self) -> UnboundedSender<T, U> { |
132 | 0 | UnboundedSender { |
133 | 0 | giver: self.giver.shared(), |
134 | 0 | inner: self.inner, |
135 | 0 | } |
136 | 0 | } Unexecuted instantiation: <hyper::client::dispatch::Sender<http::request::Request<tonic::body::Body>, http::response::Response<hyper::body::incoming::Incoming>>>::unbound Unexecuted instantiation: <hyper::client::dispatch::Sender<_, _>>::unbound |
137 | | } |
138 | | |
#[cfg(feature = "http2")]
impl<T, U> UnboundedSender<T, U> {
    /// Ready means only that the shared giver has not been canceled.
    pub(crate) fn is_ready(&self) -> bool {
        let canceled = self.giver.is_canceled();
        !canceled
    }

    /// Reports whether the shared giver has been canceled.
    pub(crate) fn is_closed(&self) -> bool {
        let canceled = self.giver.is_canceled();
        canceled
    }

    /// Queues `val` with a retry-capable callback, with no readiness check.
    ///
    /// Returns the promise for the reply, or gives `val` back when the
    /// queue rejects the envelope.
    pub(crate) fn try_send(&mut self, val: T) -> Result<RetryPromise<T, U>, T> {
        let (reply_tx, reply_rx) = oneshot::channel();
        let envelope = Envelope(Some((val, Callback::Retry(Some(reply_tx)))));
        match self.inner.send(envelope) {
            Ok(()) => Ok(reply_rx),
            Err(mut rejected) => {
                // Drop the promise before the callback is dropped.
                drop(reply_rx);
                let (unsent, _callback) = (rejected.0).0.take().expect("envelope not dropped");
                Err(unsent)
            }
        }
    }

    /// Queues `val` with a callback that does not hand the message back on
    /// failure, with no readiness check.
    ///
    /// Returns the promise for the reply, or gives `val` back when the
    /// queue rejects the envelope.
    pub(crate) fn send(&mut self, val: T) -> Result<Promise<U>, T> {
        let (reply_tx, reply_rx) = oneshot::channel();
        let envelope = Envelope(Some((val, Callback::NoRetry(Some(reply_tx)))));
        match self.inner.send(envelope) {
            Ok(()) => Ok(reply_rx),
            Err(mut rejected) => {
                // Drop the promise before the callback is dropped.
                drop(reply_rx);
                let (unsent, _callback) = (rejected.0).0.take().expect("envelope not dropped");
                Err(unsent)
            }
        }
    }
}
165 | | |
#[cfg(feature = "http2")]
impl<T, U> Clone for UnboundedSender<T, U> {
    /// Clones both handles; implemented by hand so that no `Clone` bound is
    /// placed on `T` or `U`.
    fn clone(&self) -> Self {
        let giver = self.giver.clone();
        let inner = self.inner.clone();
        Self { giver, inner }
    }
}
175 | | |
176 | | pub(crate) struct Receiver<T, U> { |
177 | | inner: mpsc::UnboundedReceiver<Envelope<T, U>>, |
178 | | taker: want::Taker, |
179 | | } |
180 | | |
181 | | impl<T, U> Receiver<T, U> { |
182 | 0 | pub(crate) fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<(T, Callback<T, U>)>> { |
183 | 0 | match self.inner.poll_recv(cx) { |
184 | 0 | Poll::Ready(item) => { |
185 | 0 | Poll::Ready(item.map(|mut env| env.0.take().expect("envelope not dropped")))Unexecuted instantiation: <hyper::client::dispatch::Receiver<http::request::Request<tonic::body::Body>, http::response::Response<hyper::body::incoming::Incoming>>>::poll_recv::{closure#0}Unexecuted instantiation: <hyper::client::dispatch::Receiver<_, _>>::poll_recv::{closure#0} |
186 | | } |
187 | | Poll::Pending => { |
188 | 0 | self.taker.want(); |
189 | 0 | Poll::Pending |
190 | | } |
191 | | } |
192 | 0 | } Unexecuted instantiation: <hyper::client::dispatch::Receiver<http::request::Request<tonic::body::Body>, http::response::Response<hyper::body::incoming::Incoming>>>::poll_recv Unexecuted instantiation: <hyper::client::dispatch::Receiver<_, _>>::poll_recv |
193 | | |
194 | | #[cfg(feature = "http1")] |
195 | 0 | pub(crate) fn close(&mut self) { |
196 | 0 | self.taker.cancel(); |
197 | 0 | self.inner.close(); |
198 | 0 | } Unexecuted instantiation: <hyper::client::dispatch::Receiver<http::request::Request<tonic::body::Body>, http::response::Response<hyper::body::incoming::Incoming>>>::close Unexecuted instantiation: <hyper::client::dispatch::Receiver<_, _>>::close |
199 | | |
200 | | #[cfg(feature = "http1")] |
201 | 0 | pub(crate) fn try_recv(&mut self) -> Option<(T, Callback<T, U>)> { |
202 | | use futures_util::FutureExt; |
203 | 0 | match self.inner.recv().now_or_never() { |
204 | 0 | Some(Some(mut env)) => env.0.take(), |
205 | 0 | _ => None, |
206 | | } |
207 | 0 | } Unexecuted instantiation: <hyper::client::dispatch::Receiver<http::request::Request<tonic::body::Body>, http::response::Response<hyper::body::incoming::Incoming>>>::try_recv Unexecuted instantiation: <hyper::client::dispatch::Receiver<_, _>>::try_recv |
208 | | } |
209 | | |
210 | | impl<T, U> Drop for Receiver<T, U> { |
211 | 0 | fn drop(&mut self) { |
212 | | // Notify the giver about the closure first, before dropping |
213 | | // the mpsc::Receiver. |
214 | 0 | self.taker.cancel(); |
215 | 0 | } Unexecuted instantiation: <hyper::client::dispatch::Receiver<http::request::Request<tonic::body::Body>, http::response::Response<hyper::body::incoming::Incoming>> as core::ops::drop::Drop>::drop Unexecuted instantiation: <hyper::client::dispatch::Receiver<_, _> as core::ops::drop::Drop>::drop |
216 | | } |
217 | | |
218 | | struct Envelope<T, U>(Option<(T, Callback<T, U>)>); |
219 | | |
220 | | impl<T, U> Drop for Envelope<T, U> { |
221 | 0 | fn drop(&mut self) { |
222 | 0 | if let Some((val, cb)) = self.0.take() { |
223 | 0 | cb.send(Err(TrySendError { |
224 | 0 | error: crate::Error::new_canceled().with("connection closed"), |
225 | 0 | message: Some(val), |
226 | 0 | })); |
227 | 0 | } |
228 | 0 | } Unexecuted instantiation: <hyper::client::dispatch::Envelope<http::request::Request<tonic::body::Body>, http::response::Response<hyper::body::incoming::Incoming>> as core::ops::drop::Drop>::drop Unexecuted instantiation: <hyper::client::dispatch::Envelope<_, _> as core::ops::drop::Drop>::drop |
229 | | } |
230 | | |
231 | | pub(crate) enum Callback<T, U> { |
232 | | #[allow(unused)] |
233 | | Retry(Option<oneshot::Sender<Result<U, TrySendError<T>>>>), |
234 | | NoRetry(Option<oneshot::Sender<Result<U, crate::Error>>>), |
235 | | } |
236 | | |
237 | | impl<T, U> Drop for Callback<T, U> { |
238 | 0 | fn drop(&mut self) { |
239 | 0 | match self { |
240 | 0 | Callback::Retry(tx) => { |
241 | 0 | if let Some(tx) = tx.take() { |
242 | 0 | let _ = tx.send(Err(TrySendError { |
243 | 0 | error: dispatch_gone(), |
244 | 0 | message: None, |
245 | 0 | })); |
246 | 0 | } |
247 | | } |
248 | 0 | Callback::NoRetry(tx) => { |
249 | 0 | if let Some(tx) = tx.take() { |
250 | 0 | let _ = tx.send(Err(dispatch_gone())); |
251 | 0 | } |
252 | | } |
253 | | } |
254 | 0 | } Unexecuted instantiation: <hyper::client::dispatch::Callback<http::request::Request<tonic::body::Body>, http::response::Response<hyper::body::incoming::Incoming>> as core::ops::drop::Drop>::drop Unexecuted instantiation: <hyper::client::dispatch::Callback<_, _> as core::ops::drop::Drop>::drop |
255 | | } |
256 | | |
257 | | #[cold] |
258 | 0 | fn dispatch_gone() -> crate::Error { |
259 | | // FIXME(nox): What errors do we want here? |
260 | 0 | crate::Error::new_user_dispatch_gone().with(if std::thread::panicking() { |
261 | 0 | "user code panicked" |
262 | | } else { |
263 | 0 | "runtime dropped the dispatch task" |
264 | | }) |
265 | 0 | } |
266 | | |
267 | | impl<T, U> Callback<T, U> { |
268 | | #[cfg(feature = "http2")] |
269 | 0 | pub(crate) fn is_canceled(&self) -> bool { |
270 | 0 | match *self { |
271 | 0 | Callback::Retry(Some(ref tx)) => tx.is_closed(), |
272 | 0 | Callback::NoRetry(Some(ref tx)) => tx.is_closed(), |
273 | 0 | _ => unreachable!(), |
274 | | } |
275 | 0 | } Unexecuted instantiation: <hyper::client::dispatch::Callback<http::request::Request<tonic::body::Body>, http::response::Response<hyper::body::incoming::Incoming>>>::is_canceled Unexecuted instantiation: <hyper::client::dispatch::Callback<_, _>>::is_canceled |
276 | | |
277 | 0 | pub(crate) fn poll_canceled(&mut self, cx: &mut Context<'_>) -> Poll<()> { |
278 | 0 | match *self { |
279 | 0 | Callback::Retry(Some(ref mut tx)) => tx.poll_closed(cx), |
280 | 0 | Callback::NoRetry(Some(ref mut tx)) => tx.poll_closed(cx), |
281 | 0 | _ => unreachable!(), |
282 | | } |
283 | 0 | } Unexecuted instantiation: <hyper::client::dispatch::Callback<http::request::Request<tonic::body::Body>, http::response::Response<hyper::body::incoming::Incoming>>>::poll_canceled Unexecuted instantiation: <hyper::client::dispatch::Callback<_, _>>::poll_canceled |
284 | | |
285 | 0 | pub(crate) fn send(mut self, val: Result<U, TrySendError<T>>) { |
286 | 0 | match self { |
287 | 0 | Callback::Retry(ref mut tx) => { |
288 | 0 | let _ = tx.take().unwrap().send(val); |
289 | 0 | } |
290 | 0 | Callback::NoRetry(ref mut tx) => { |
291 | 0 | let _ = tx.take().unwrap().send(val.map_err(|e| e.error)); |
292 | | } |
293 | | } |
294 | 0 | } Unexecuted instantiation: <hyper::client::dispatch::Callback<http::request::Request<tonic::body::Body>, http::response::Response<hyper::body::incoming::Incoming>>>::send Unexecuted instantiation: <hyper::client::dispatch::Callback<_, _>>::send |
295 | | } |
296 | | |
297 | | impl<T> TrySendError<T> { |
298 | | /// Take the message from this error. |
299 | | /// |
300 | | /// The message will not always have been recovered. If an error occurs |
301 | | /// after the message has been serialized onto the connection, it will not |
302 | | /// be available here. |
303 | 0 | pub fn take_message(&mut self) -> Option<T> { |
304 | 0 | self.message.take() |
305 | 0 | } Unexecuted instantiation: <hyper::client::dispatch::TrySendError<http::request::Request<tonic::body::Body>>>::take_message Unexecuted instantiation: <hyper::client::dispatch::TrySendError<_>>::take_message |
306 | | |
307 | | /// Consumes this to return the inner error. |
308 | 0 | pub fn into_error(self) -> crate::Error { |
309 | 0 | self.error |
310 | 0 | } Unexecuted instantiation: <hyper::client::dispatch::TrySendError<http::request::Request<tonic::body::Body>>>::into_error Unexecuted instantiation: <hyper::client::dispatch::TrySendError<_>>::into_error |
311 | | } |
312 | | |
#[cfg(feature = "http2")]
pin_project! {
    // Future that pairs a pending response (`when`) with the callback
    // (`call_back`) that should receive its outcome.
    pub struct SendWhen<B>
    where
        B: Body,
        B: 'static,
    {
        // The response future being awaited.
        #[pin]
        pub(crate) when: ResponseFutMap<B>,
        // The callback to notify; stored as an `Option` so `poll` can take
        // it out and put it back.
        #[pin]
        pub(crate) call_back: Option<Callback<Request<B>, Response<Incoming>>>,
    }
}
326 | | |
#[cfg(feature = "http2")]
impl<B> Future for SendWhen<B>
where
    B: Body + 'static,
{
    type Output = ();

    /// Drives the response future and forwards its outcome to the callback.
    ///
    /// While the response is pending, the callback is polled for
    /// cancellation; if it is canceled, this future completes without
    /// sending anything.
    ///
    /// # Panics
    ///
    /// Panics with "polled after complete" if the callback slot is empty.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let mut this = self.project();

        let mut call_back = this.call_back.take().expect("polled after complete");

        let outcome = match this.when.as_mut().poll(cx) {
            Poll::Ready(outcome) => outcome,
            Poll::Pending => {
                // check if the callback is canceled
                if call_back.poll_canceled(cx).is_pending() {
                    // Move call_back back to struct before return
                    this.call_back.set(Some(call_back));
                    return Poll::Pending;
                }
                trace!("send_when canceled");
                return Poll::Ready(());
            }
        };

        let reply = match outcome {
            Ok(res) => Ok(res),
            Err((error, message)) => Err(TrySendError { error, message }),
        };
        call_back.send(reply);
        Poll::Ready(())
    }
}
364 | | |
#[cfg(test)]
mod tests {
    #[cfg(feature = "nightly")]
    extern crate test;

    use std::future::Future;
    use std::pin::Pin;
    use std::task::{Context, Poll};

    use super::{channel, Callback, Receiver};

    /// Opaque payload type used as the message in these tests.
    #[derive(Debug)]
    struct Custom(#[allow(dead_code)] i32);

    // Lets the tests poll a `Receiver` as a future via `poll_recv`.
    impl<T, U> Future for Receiver<T, U> {
        type Output = Option<(T, Callback<T, U>)>;

        fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
            self.get_mut().poll_recv(cx)
        }
    }

    /// Helper to check if the future is ready after polling once.
    struct PollOnce<'a, F>(&'a mut F);

    impl<F, T> Future for PollOnce<'_, F>
    where
        F: Future<Output = T> + Unpin,
    {
        type Output = Option<()>;

        fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
            // Always completes: `Some(())` if the inner future was ready,
            // `None` if it was still pending.
            let status = Pin::new(&mut *self.0).poll(cx);
            Poll::Ready(if status.is_ready() { Some(()) } else { None })
        }
    }

    #[cfg(not(miri))]
    #[tokio::test]
    async fn drop_receiver_sends_cancel_errors() {
        let _ = pretty_env_logger::try_init();

        let (mut tx, mut rx) = channel::<Custom, ()>();

        // must poll once for try_send to succeed
        let first_poll = PollOnce(&mut rx).await;
        assert!(first_poll.is_none(), "rx empty");

        let promise = tx.try_send(Custom(43)).unwrap();
        drop(rx);

        let err = promise
            .await
            .expect("fulfilled")
            .expect_err("promise should error");
        let observed = (err.error.is_canceled(), err.message);
        match observed {
            (true, Some(_)) => (),
            e => panic!("expected Error::Cancel(_), found {:?}", e),
        }
    }

    #[cfg(not(miri))]
    #[tokio::test]
    async fn sender_checks_for_want_on_send() {
        let (mut tx, mut rx) = channel::<Custom, ()>();

        // one is allowed to buffer, second is rejected
        let _ = tx.try_send(Custom(1)).expect("1 buffered");
        tx.try_send(Custom(2)).expect_err("2 not ready");

        let popped = PollOnce(&mut rx).await;
        assert!(popped.is_some(), "rx once");

        // Even though 1 has been popped, only 1 could be buffered for the
        // lifetime of the channel.
        tx.try_send(Custom(2)).expect_err("2 still not ready");

        let drained = PollOnce(&mut rx).await;
        assert!(drained.is_none(), "rx empty");

        let _ = tx.try_send(Custom(2)).expect("2 ready");
    }

    #[cfg(feature = "http2")]
    #[test]
    fn unbounded_sender_doesnt_bound_on_want() {
        let (tx, rx) = channel::<Custom, ()>();
        let mut tx = tx.unbound();

        // Every send succeeds while the receiver is alive.
        let _ = tx.try_send(Custom(1)).unwrap();
        let _ = tx.try_send(Custom(2)).unwrap();
        let _ = tx.try_send(Custom(3)).unwrap();

        drop(rx);

        // With the receiver gone, the message is handed back.
        let _ = tx.try_send(Custom(4)).unwrap_err();
    }

    #[cfg(feature = "nightly")]
    #[bench]
    fn giver_queue_throughput(b: &mut test::Bencher) {
        use crate::{body::Incoming, Request, Response};

        let rt = tokio::runtime::Builder::new_current_thread()
            .build()
            .unwrap();
        let (mut tx, mut rx) = channel::<Request<Incoming>, Response<Incoming>>();

        b.iter(move || {
            let _ = tx.send(Request::new(Incoming::empty())).unwrap();
            rt.block_on(async {
                // Keep polling until the receiver reports nothing ready.
                while PollOnce(&mut rx).await.is_some() {}
            });
        })
    }

    #[cfg(feature = "nightly")]
    #[bench]
    fn giver_queue_not_ready(b: &mut test::Bencher) {
        let rt = tokio::runtime::Builder::new_current_thread()
            .build()
            .unwrap();
        let (_tx, mut rx) = channel::<i32, ()>();
        b.iter(move || {
            rt.block_on(async {
                let polled = PollOnce(&mut rx).await;
                assert!(polled.is_none());
            });
        })
    }

    #[cfg(feature = "nightly")]
    #[bench]
    fn giver_queue_cancel(b: &mut test::Bencher) {
        let (_tx, mut rx) = channel::<i32, ()>();

        b.iter(move || {
            rx.taker.cancel();
        })
    }
}