/rust/registry/src/index.crates.io-1949cf8c6b5b557f/hyper-1.8.1/src/client/dispatch.rs
Line | Count | Source |
1 | | use std::task::{Context, Poll}; |
2 | | #[cfg(feature = "http2")] |
3 | | use std::{future::Future, pin::Pin}; |
4 | | |
5 | | #[cfg(feature = "http2")] |
6 | | use http::{Request, Response}; |
7 | | #[cfg(feature = "http2")] |
8 | | use http_body::Body; |
9 | | #[cfg(feature = "http2")] |
10 | | use pin_project_lite::pin_project; |
11 | | use tokio::sync::{mpsc, oneshot}; |
12 | | |
13 | | #[cfg(feature = "http2")] |
14 | | use crate::{body::Incoming, proto::h2::client::ResponseFutMap}; |
15 | | |
16 | | pub(crate) type RetryPromise<T, U> = oneshot::Receiver<Result<U, TrySendError<T>>>; |
17 | | pub(crate) type Promise<T> = oneshot::Receiver<Result<T, crate::Error>>; |
18 | | |
19 | | /// An error when calling `try_send_request`. |
20 | | /// |
21 | | /// There is a possibility of an error occurring on a connection in-between the |
22 | | /// time that a request is queued and when it is actually written to the IO |
23 | | /// transport. If that happens, it is safe to return the request back to the |
24 | | /// caller, as it was never fully sent. |
25 | | #[derive(Debug)] |
26 | | pub struct TrySendError<T> { |
27 | | pub(crate) error: crate::Error, |
28 | | pub(crate) message: Option<T>, |
29 | | } |
30 | | |
31 | 0 | pub(crate) fn channel<T, U>() -> (Sender<T, U>, Receiver<T, U>) { |
32 | 0 | let (tx, rx) = mpsc::unbounded_channel(); |
33 | 0 | let (giver, taker) = want::new(); |
34 | 0 | let tx = Sender { |
35 | 0 | #[cfg(feature = "http1")] |
36 | 0 | buffered_once: false, |
37 | 0 | giver, |
38 | 0 | inner: tx, |
39 | 0 | }; |
40 | 0 | let rx = Receiver { inner: rx, taker }; |
41 | 0 | (tx, rx) |
42 | 0 | } Unexecuted instantiation: hyper::client::dispatch::channel::<http::request::Request<reqwest::async_impl::body::Body>, http::response::Response<hyper::body::incoming::Incoming>> Unexecuted instantiation: hyper::client::dispatch::channel::<_, _> |
43 | | |
44 | | /// A bounded sender of requests and callbacks for when responses are ready. |
45 | | /// |
46 | | /// While the inner sender is unbounded, the Giver is used to determine |
47 | | /// if the Receiver is ready for another request. |
48 | | pub(crate) struct Sender<T, U> { |
49 | | /// One message is always allowed, even if the Receiver hasn't asked |
50 | | /// for it yet. This boolean keeps track of whether we've sent one |
51 | | /// without notice. |
52 | | #[cfg(feature = "http1")] |
53 | | buffered_once: bool, |
54 | | /// The Giver helps watch that the Receiver side has been polled |
55 | | /// when the queue is empty. This helps us know when a request and |
56 | | /// response have been fully processed, and a connection is ready |
57 | | /// for more. |
58 | | giver: want::Giver, |
59 | | /// Actually bounded by the Giver, plus `buffered_once`. |
60 | | inner: mpsc::UnboundedSender<Envelope<T, U>>, |
61 | | } |
62 | | |
/// The unbounded, cloneable sending half.
///
/// It cannot poll the Giver, but it still uses the shared giver to find out
/// whether the Receiver has been dropped.
#[cfg(feature = "http2")]
pub(crate) struct UnboundedSender<T, U> {
    /// Used solely by `is_closed`/`is_ready`, because the
    /// `mpsc::UnboundedSender` cannot be checked for closure.
    giver: want::SharedGiver,
    inner: mpsc::UnboundedSender<Envelope<T, U>>,
}
73 | | |
74 | | impl<T, U> Sender<T, U> { |
75 | | #[cfg(feature = "http1")] |
76 | 0 | pub(crate) fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>> { |
77 | 0 | self.giver |
78 | 0 | .poll_want(cx) |
79 | 0 | .map_err(|_| crate::Error::new_closed()) Unexecuted instantiation: <hyper::client::dispatch::Sender<http::request::Request<reqwest::async_impl::body::Body>, http::response::Response<hyper::body::incoming::Incoming>>>::poll_ready::{closure#0}Unexecuted instantiation: <hyper::client::dispatch::Sender<_, _>>::poll_ready::{closure#0} |
80 | 0 | } Unexecuted instantiation: <hyper::client::dispatch::Sender<http::request::Request<reqwest::async_impl::body::Body>, http::response::Response<hyper::body::incoming::Incoming>>>::poll_ready Unexecuted instantiation: <hyper::client::dispatch::Sender<_, _>>::poll_ready |
81 | | |
82 | | #[cfg(feature = "http1")] |
83 | 0 | pub(crate) fn is_ready(&self) -> bool { |
84 | 0 | self.giver.is_wanting() |
85 | 0 | } Unexecuted instantiation: <hyper::client::dispatch::Sender<http::request::Request<reqwest::async_impl::body::Body>, http::response::Response<hyper::body::incoming::Incoming>>>::is_ready Unexecuted instantiation: <hyper::client::dispatch::Sender<_, _>>::is_ready |
86 | | |
87 | | #[cfg(feature = "http1")] |
88 | 0 | pub(crate) fn is_closed(&self) -> bool { |
89 | 0 | self.giver.is_canceled() |
90 | 0 | } |
91 | | |
92 | | #[cfg(feature = "http1")] |
93 | 0 | fn can_send(&mut self) -> bool { |
94 | 0 | if self.giver.give() || !self.buffered_once { |
95 | | // If the receiver is ready *now*, then of course we can send. |
96 | | // |
97 | | // If the receiver isn't ready yet, but we don't have anything |
98 | | // in the channel yet, then allow one message. |
99 | 0 | self.buffered_once = true; |
100 | 0 | true |
101 | | } else { |
102 | 0 | false |
103 | | } |
104 | 0 | } Unexecuted instantiation: <hyper::client::dispatch::Sender<http::request::Request<reqwest::async_impl::body::Body>, http::response::Response<hyper::body::incoming::Incoming>>>::can_send Unexecuted instantiation: <hyper::client::dispatch::Sender<_, _>>::can_send |
105 | | |
106 | | #[cfg(feature = "http1")] |
107 | 0 | pub(crate) fn try_send(&mut self, val: T) -> Result<RetryPromise<T, U>, T> { |
108 | 0 | if !self.can_send() { |
109 | 0 | return Err(val); |
110 | 0 | } |
111 | 0 | let (tx, rx) = oneshot::channel(); |
112 | 0 | self.inner |
113 | 0 | .send(Envelope(Some((val, Callback::Retry(Some(tx)))))) |
114 | 0 | .map(move |_| rx) |
115 | 0 | .map_err(|mut e| (e.0).0.take().expect("envelope not dropped").0)Unexecuted instantiation: <hyper::client::dispatch::Sender<http::request::Request<reqwest::async_impl::body::Body>, http::response::Response<hyper::body::incoming::Incoming>>>::try_send::{closure#1}Unexecuted instantiation: <hyper::client::dispatch::Sender<_, _>>::try_send::{closure#1} |
116 | 0 | } Unexecuted instantiation: <hyper::client::dispatch::Sender<http::request::Request<reqwest::async_impl::body::Body>, http::response::Response<hyper::body::incoming::Incoming>>>::try_send Unexecuted instantiation: <hyper::client::dispatch::Sender<_, _>>::try_send |
117 | | |
118 | | #[cfg(feature = "http1")] |
119 | 0 | pub(crate) fn send(&mut self, val: T) -> Result<Promise<U>, T> { |
120 | 0 | if !self.can_send() { |
121 | 0 | return Err(val); |
122 | 0 | } |
123 | 0 | let (tx, rx) = oneshot::channel(); |
124 | 0 | self.inner |
125 | 0 | .send(Envelope(Some((val, Callback::NoRetry(Some(tx)))))) |
126 | 0 | .map(move |_| rx) |
127 | 0 | .map_err(|mut e| (e.0).0.take().expect("envelope not dropped").0) |
128 | 0 | } |
129 | | |
130 | | #[cfg(feature = "http2")] |
131 | | pub(crate) fn unbound(self) -> UnboundedSender<T, U> { |
132 | | UnboundedSender { |
133 | | giver: self.giver.shared(), |
134 | | inner: self.inner, |
135 | | } |
136 | | } |
137 | | } |
138 | | |
#[cfg(feature = "http2")]
impl<T, U> UnboundedSender<T, U> {
    /// Ready for as long as the shared giver has not been canceled.
    pub(crate) fn is_ready(&self) -> bool {
        !self.giver.is_canceled()
    }

    /// Closed once the shared giver reports cancellation.
    pub(crate) fn is_closed(&self) -> bool {
        self.giver.is_canceled()
    }

    /// Queues `val` with a retry-capable callback.
    ///
    /// Returns the promise for the response, or gives `val` back when the
    /// channel rejected the envelope.
    pub(crate) fn try_send(&mut self, val: T) -> Result<RetryPromise<T, U>, T> {
        let (tx, rx) = oneshot::channel();
        let envelope = Envelope(Some((val, Callback::Retry(Some(tx)))));
        match self.inner.send(envelope) {
            Ok(()) => Ok(rx),
            Err(mut rejected) => {
                // Pull the request back out of the rejected envelope.
                let (val, _callback) = (rejected.0).0.take().expect("envelope not dropped");
                Err(val)
            }
        }
    }

    /// Queues `val` with a callback that does not hand the request back.
    ///
    /// Returns the promise for the response, or gives `val` back when the
    /// channel rejected the envelope.
    pub(crate) fn send(&mut self, val: T) -> Result<Promise<U>, T> {
        let (tx, rx) = oneshot::channel();
        let envelope = Envelope(Some((val, Callback::NoRetry(Some(tx)))));
        match self.inner.send(envelope) {
            Ok(()) => Ok(rx),
            Err(mut rejected) => {
                // Pull the request back out of the rejected envelope.
                let (val, _callback) = (rejected.0).0.take().expect("envelope not dropped");
                Err(val)
            }
        }
    }
}
165 | | |
// Implemented by hand: a derive would add `T: Clone` and `U: Clone` bounds,
// which this impl does not have.
#[cfg(feature = "http2")]
impl<T, U> Clone for UnboundedSender<T, U> {
    fn clone(&self) -> Self {
        let giver = self.giver.clone();
        let inner = self.inner.clone();
        Self { giver, inner }
    }
}
175 | | |
176 | | pub(crate) struct Receiver<T, U> { |
177 | | inner: mpsc::UnboundedReceiver<Envelope<T, U>>, |
178 | | taker: want::Taker, |
179 | | } |
180 | | |
181 | | impl<T, U> Receiver<T, U> { |
182 | 0 | pub(crate) fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<(T, Callback<T, U>)>> { |
183 | 0 | match self.inner.poll_recv(cx) { |
184 | 0 | Poll::Ready(item) => { |
185 | 0 | Poll::Ready(item.map(|mut env| env.0.take().expect("envelope not dropped")))Unexecuted instantiation: <hyper::client::dispatch::Receiver<http::request::Request<reqwest::async_impl::body::Body>, http::response::Response<hyper::body::incoming::Incoming>>>::poll_recv::{closure#0}Unexecuted instantiation: <hyper::client::dispatch::Receiver<_, _>>::poll_recv::{closure#0} |
186 | | } |
187 | | Poll::Pending => { |
188 | 0 | self.taker.want(); |
189 | 0 | Poll::Pending |
190 | | } |
191 | | } |
192 | 0 | } Unexecuted instantiation: <hyper::client::dispatch::Receiver<http::request::Request<reqwest::async_impl::body::Body>, http::response::Response<hyper::body::incoming::Incoming>>>::poll_recv Unexecuted instantiation: <hyper::client::dispatch::Receiver<_, _>>::poll_recv |
193 | | |
194 | | #[cfg(feature = "http1")] |
195 | 0 | pub(crate) fn close(&mut self) { |
196 | 0 | self.taker.cancel(); |
197 | 0 | self.inner.close(); |
198 | 0 | } Unexecuted instantiation: <hyper::client::dispatch::Receiver<http::request::Request<reqwest::async_impl::body::Body>, http::response::Response<hyper::body::incoming::Incoming>>>::close Unexecuted instantiation: <hyper::client::dispatch::Receiver<_, _>>::close |
199 | | |
200 | | #[cfg(feature = "http1")] |
201 | 0 | pub(crate) fn try_recv(&mut self) -> Option<(T, Callback<T, U>)> { |
202 | 0 | match crate::common::task::now_or_never(self.inner.recv()) { |
203 | 0 | Some(Some(mut env)) => env.0.take(), |
204 | 0 | _ => None, |
205 | | } |
206 | 0 | } Unexecuted instantiation: <hyper::client::dispatch::Receiver<http::request::Request<reqwest::async_impl::body::Body>, http::response::Response<hyper::body::incoming::Incoming>>>::try_recv Unexecuted instantiation: <hyper::client::dispatch::Receiver<_, _>>::try_recv |
207 | | } |
208 | | |
209 | | impl<T, U> Drop for Receiver<T, U> { |
210 | 0 | fn drop(&mut self) { |
211 | | // Notify the giver about the closure first, before dropping |
212 | | // the mpsc::Receiver. |
213 | 0 | self.taker.cancel(); |
214 | 0 | } Unexecuted instantiation: <hyper::client::dispatch::Receiver<http::request::Request<reqwest::async_impl::body::Body>, http::response::Response<hyper::body::incoming::Incoming>> as core::ops::drop::Drop>::drop Unexecuted instantiation: <hyper::client::dispatch::Receiver<_, _> as core::ops::drop::Drop>::drop |
215 | | } |
216 | | |
217 | | struct Envelope<T, U>(Option<(T, Callback<T, U>)>); |
218 | | |
219 | | impl<T, U> Drop for Envelope<T, U> { |
220 | 0 | fn drop(&mut self) { |
221 | 0 | if let Some((val, cb)) = self.0.take() { |
222 | 0 | cb.send(Err(TrySendError { |
223 | 0 | error: crate::Error::new_canceled().with("connection closed"), |
224 | 0 | message: Some(val), |
225 | 0 | })); |
226 | 0 | } |
227 | 0 | } Unexecuted instantiation: <hyper::client::dispatch::Envelope<http::request::Request<reqwest::async_impl::body::Body>, http::response::Response<hyper::body::incoming::Incoming>> as core::ops::drop::Drop>::drop Unexecuted instantiation: <hyper::client::dispatch::Envelope<_, _> as core::ops::drop::Drop>::drop |
228 | | } |
229 | | |
230 | | pub(crate) enum Callback<T, U> { |
231 | | #[allow(unused)] |
232 | | Retry(Option<oneshot::Sender<Result<U, TrySendError<T>>>>), |
233 | | NoRetry(Option<oneshot::Sender<Result<U, crate::Error>>>), |
234 | | } |
235 | | |
236 | | impl<T, U> Drop for Callback<T, U> { |
237 | 0 | fn drop(&mut self) { |
238 | 0 | match self { |
239 | 0 | Callback::Retry(tx) => { |
240 | 0 | if let Some(tx) = tx.take() { |
241 | 0 | let _ = tx.send(Err(TrySendError { |
242 | 0 | error: dispatch_gone(), |
243 | 0 | message: None, |
244 | 0 | })); |
245 | 0 | } |
246 | | } |
247 | 0 | Callback::NoRetry(tx) => { |
248 | 0 | if let Some(tx) = tx.take() { |
249 | 0 | let _ = tx.send(Err(dispatch_gone())); |
250 | 0 | } |
251 | | } |
252 | | } |
253 | 0 | } Unexecuted instantiation: <hyper::client::dispatch::Callback<http::request::Request<reqwest::async_impl::body::Body>, http::response::Response<hyper::body::incoming::Incoming>> as core::ops::drop::Drop>::drop Unexecuted instantiation: <hyper::client::dispatch::Callback<_, _> as core::ops::drop::Drop>::drop |
254 | | } |
255 | | |
256 | | #[cold] |
257 | 0 | fn dispatch_gone() -> crate::Error { |
258 | | // FIXME(nox): What errors do we want here? |
259 | 0 | crate::Error::new_user_dispatch_gone().with(if std::thread::panicking() { |
260 | 0 | "user code panicked" |
261 | | } else { |
262 | 0 | "runtime dropped the dispatch task" |
263 | | }) |
264 | 0 | } |
265 | | |
266 | | impl<T, U> Callback<T, U> { |
267 | | #[cfg(feature = "http2")] |
268 | | pub(crate) fn is_canceled(&self) -> bool { |
269 | | match *self { |
270 | | Callback::Retry(Some(ref tx)) => tx.is_closed(), |
271 | | Callback::NoRetry(Some(ref tx)) => tx.is_closed(), |
272 | | _ => unreachable!(), |
273 | | } |
274 | | } |
275 | | |
276 | 0 | pub(crate) fn poll_canceled(&mut self, cx: &mut Context<'_>) -> Poll<()> { |
277 | 0 | match *self { |
278 | 0 | Callback::Retry(Some(ref mut tx)) => tx.poll_closed(cx), |
279 | 0 | Callback::NoRetry(Some(ref mut tx)) => tx.poll_closed(cx), |
280 | 0 | _ => unreachable!(), |
281 | | } |
282 | 0 | } Unexecuted instantiation: <hyper::client::dispatch::Callback<http::request::Request<reqwest::async_impl::body::Body>, http::response::Response<hyper::body::incoming::Incoming>>>::poll_canceled Unexecuted instantiation: <hyper::client::dispatch::Callback<_, _>>::poll_canceled |
283 | | |
284 | 0 | pub(crate) fn send(mut self, val: Result<U, TrySendError<T>>) { |
285 | 0 | match self { |
286 | 0 | Callback::Retry(ref mut tx) => { |
287 | 0 | let _ = tx.take().unwrap().send(val); |
288 | 0 | } |
289 | 0 | Callback::NoRetry(ref mut tx) => { |
290 | 0 | let _ = tx.take().unwrap().send(val.map_err(|e| e.error)); |
291 | | } |
292 | | } |
293 | 0 | } Unexecuted instantiation: <hyper::client::dispatch::Callback<http::request::Request<reqwest::async_impl::body::Body>, http::response::Response<hyper::body::incoming::Incoming>>>::send Unexecuted instantiation: <hyper::client::dispatch::Callback<_, _>>::send |
294 | | } |
295 | | |
296 | | impl<T> TrySendError<T> { |
297 | | /// Take the message from this error. |
298 | | /// |
299 | | /// The message will not always have been recovered. If an error occurs |
300 | | /// after the message has been serialized onto the connection, it will not |
301 | | /// be available here. |
302 | 0 | pub fn take_message(&mut self) -> Option<T> { |
303 | 0 | self.message.take() |
304 | 0 | } Unexecuted instantiation: <hyper::client::dispatch::TrySendError<http::request::Request<reqwest::async_impl::body::Body>>>::take_message Unexecuted instantiation: <hyper::client::dispatch::TrySendError<_>>::take_message |
305 | | |
306 | | /// Returns a reference to the recovered message. |
307 | | /// |
308 | | /// The message will not always have been recovered. If an error occurs |
309 | | /// after the message has been serialized onto the connection, it will not |
310 | | /// be available here. |
311 | 0 | pub fn message(&self) -> Option<&T> { |
312 | 0 | self.message.as_ref() |
313 | 0 | } |
314 | | |
315 | | /// Consumes this to return the inner error. |
316 | 0 | pub fn into_error(self) -> crate::Error { |
317 | 0 | self.error |
318 | 0 | } Unexecuted instantiation: <hyper::client::dispatch::TrySendError<http::request::Request<reqwest::async_impl::body::Body>>>::into_error Unexecuted instantiation: <hyper::client::dispatch::TrySendError<_>>::into_error |
319 | | |
320 | | /// Returns a reference to the inner error. |
321 | 0 | pub fn error(&self) -> &crate::Error { |
322 | 0 | &self.error |
323 | 0 | } |
324 | | } |
325 | | |
#[cfg(feature = "http2")]
pin_project! {
    /// Future pairing a pending response (`when`) with the callback that
    /// should receive its outcome.
    pub struct SendWhen<B, E>
    where
        B: Body,
        B: 'static,
    {
        // The response future being awaited.
        #[pin]
        pub(crate) when: ResponseFutMap<B, E>,
        // Taken out on every poll and put back only while still pending.
        #[pin]
        pub(crate) call_back: Option<Callback<Request<B>, Response<Incoming>>>,
    }
}
339 | | |
#[cfg(feature = "http2")]
impl<B, E> Future for SendWhen<B, E>
where
    B: Body + 'static,
    E: crate::rt::bounds::Http2UpgradedExec<B::Data>,
{
    type Output = ();

    /// Forwards the outcome of `when` to the callback once it is ready, and
    /// finishes early if the callback is found to be canceled.
    ///
    /// # Panics
    ///
    /// Panics with "polled after complete" if the callback is already gone.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let mut this = self.project();

        let mut call_back = this.call_back.take().expect("polled after complete");

        match Pin::new(&mut this.when).poll(cx) {
            Poll::Ready(outcome) => {
                // Success passes through; a failure is repackaged.
                let reply = outcome.map_err(|(error, message)| TrySendError { error, message });
                call_back.send(reply);
                Poll::Ready(())
            }
            Poll::Pending => {
                // check if the callback is canceled
                if call_back.poll_canceled(cx).is_pending() {
                    // Move call_back back to struct before return
                    this.call_back.set(Some(call_back));
                    return Poll::Pending;
                }
                trace!("send_when canceled");
                Poll::Ready(())
            }
        }
    }
}
378 | | |
#[cfg(test)]
mod tests {
    #[cfg(feature = "nightly")]
    extern crate test;

    use std::future::Future;
    use std::pin::Pin;
    use std::task::{Context, Poll};

    use super::{channel, Callback, Receiver};

    /// Stand-in request type for the channel tests.
    #[derive(Debug)]
    struct Custom(#[allow(dead_code)] i32);

    // Test-only convenience: awaiting a `Receiver` is the same as `poll_recv`.
    impl<T, U> Future for Receiver<T, U> {
        type Output = Option<(T, Callback<T, U>)>;

        fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
            Pin::into_inner(self).poll_recv(cx)
        }
    }

    /// Polls the wrapped future exactly once and reports whether it was ready.
    struct PollOnce<'a, F>(&'a mut F);

    impl<F, T> Future for PollOnce<'_, F>
    where
        F: Future<Output = T> + Unpin,
    {
        type Output = Option<()>;

        fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
            // Always resolves: `Some(())` if the inner future was ready,
            // `None` if it was still pending.
            let was_ready = Pin::new(&mut self.0).poll(cx).is_ready();
            Poll::Ready(if was_ready { Some(()) } else { None })
        }
    }

    #[cfg(not(miri))]
    #[tokio::test]
    async fn drop_receiver_sends_cancel_errors() {
        let _ = pretty_env_logger::try_init();

        let (mut tx, mut rx) = channel::<Custom, ()>();

        // must poll once for try_send to succeed
        assert!(PollOnce(&mut rx).await.is_none(), "rx empty");

        let promise = tx.try_send(Custom(43)).unwrap();
        drop(rx);

        let err = promise
            .await
            .expect("fulfilled")
            .expect_err("promise should error");
        let observed = (err.error.is_canceled(), err.message);
        if !matches!(observed, (true, Some(_))) {
            panic!("expected Error::Cancel(_), found {:?}", observed);
        }
    }

    #[cfg(not(miri))]
    #[tokio::test]
    async fn sender_checks_for_want_on_send() {
        let (mut tx, mut rx) = channel::<Custom, ()>();

        // one is allowed to buffer, second is rejected
        let _first = tx.try_send(Custom(1)).expect("1 buffered");
        tx.try_send(Custom(2)).expect_err("2 not ready");

        assert!(PollOnce(&mut rx).await.is_some(), "rx once");

        // Even though 1 has been popped, only 1 could be buffered for the
        // lifetime of the channel.
        tx.try_send(Custom(2)).expect_err("2 still not ready");

        assert!(PollOnce(&mut rx).await.is_none(), "rx empty");

        let _second = tx.try_send(Custom(2)).expect("2 ready");
    }

    #[cfg(feature = "http2")]
    #[test]
    fn unbounded_sender_doesnt_bound_on_want() {
        let (bounded, rx) = channel::<Custom, ()>();
        let mut tx = bounded.unbound();

        // No want signal is needed for any of these.
        let _p1 = tx.try_send(Custom(1)).unwrap();
        let _p2 = tx.try_send(Custom(2)).unwrap();
        let _p3 = tx.try_send(Custom(3)).unwrap();

        drop(rx);

        // With the receiver gone the message is handed back.
        let _returned = tx.try_send(Custom(4)).unwrap_err();
    }

    #[cfg(feature = "nightly")]
    #[bench]
    fn giver_queue_throughput(b: &mut test::Bencher) {
        use crate::{body::Incoming, Request, Response};

        let rt = tokio::runtime::Builder::new_current_thread()
            .build()
            .unwrap();
        let (mut tx, mut rx) = channel::<Request<Incoming>, Response<Incoming>>();

        b.iter(move || {
            let _ = tx.send(Request::new(Incoming::empty())).unwrap();
            rt.block_on(async {
                // Drain until the receiver reports pending.
                while PollOnce(&mut rx).await.is_some() {}
            });
        })
    }

    #[cfg(feature = "nightly")]
    #[bench]
    fn giver_queue_not_ready(b: &mut test::Bencher) {
        let rt = tokio::runtime::Builder::new_current_thread()
            .build()
            .unwrap();
        let (_tx, mut rx) = channel::<i32, ()>();
        b.iter(move || {
            rt.block_on(async {
                let outcome = PollOnce(&mut rx).await;
                assert!(outcome.is_none());
            });
        })
    }

    #[cfg(feature = "nightly")]
    #[bench]
    fn giver_queue_cancel(b: &mut test::Bencher) {
        let (_tx, mut rx) = channel::<i32, ()>();

        b.iter(move || {
            rx.taker.cancel();
        })
    }
}