Coverage Report

Created: 2025-11-16 06:37

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/rust/registry/src/index.crates.io-1949cf8c6b5b557f/hyper-1.6.0/src/client/dispatch.rs
Line
Count
Source
1
use std::task::{Context, Poll};
2
#[cfg(feature = "http2")]
3
use std::{future::Future, pin::Pin};
4
5
#[cfg(feature = "http2")]
6
use http::{Request, Response};
7
#[cfg(feature = "http2")]
8
use http_body::Body;
9
#[cfg(feature = "http2")]
10
use pin_project_lite::pin_project;
11
use tokio::sync::{mpsc, oneshot};
12
13
#[cfg(feature = "http2")]
14
use crate::{body::Incoming, proto::h2::client::ResponseFutMap};
15
16
pub(crate) type RetryPromise<T, U> = oneshot::Receiver<Result<U, TrySendError<T>>>;
17
pub(crate) type Promise<T> = oneshot::Receiver<Result<T, crate::Error>>;
18
19
/// An error when calling `try_send_request`.
20
///
21
/// There is a possibility of an error occurring on a connection in-between the
22
/// time that a request is queued and when it is actually written to the IO
23
/// transport. If that happens, it is safe to return the request back to the
24
/// caller, as it was never fully sent.
25
#[derive(Debug)]
26
pub struct TrySendError<T> {
27
    pub(crate) error: crate::Error,
28
    pub(crate) message: Option<T>,
29
}
30
31
0
pub(crate) fn channel<T, U>() -> (Sender<T, U>, Receiver<T, U>) {
32
0
    let (tx, rx) = mpsc::unbounded_channel();
33
0
    let (giver, taker) = want::new();
34
0
    let tx = Sender {
35
0
        #[cfg(feature = "http1")]
36
0
        buffered_once: false,
37
0
        giver,
38
0
        inner: tx,
39
0
    };
40
0
    let rx = Receiver { inner: rx, taker };
41
0
    (tx, rx)
42
0
}
Unexecuted instantiation: hyper::client::dispatch::channel::<http::request::Request<tonic::body::Body>, http::response::Response<hyper::body::incoming::Incoming>>
Unexecuted instantiation: hyper::client::dispatch::channel::<_, _>
43
44
/// A bounded sender of requests and callbacks for when responses are ready.
45
///
46
/// While the inner sender is unbounded, the Giver is used to determine
47
/// if the Receiver is ready for another request.
48
pub(crate) struct Sender<T, U> {
49
    /// One message is always allowed, even if the Receiver hasn't asked
50
    /// for it yet. This boolean keeps track of whether we've sent one
51
    /// without notice.
52
    #[cfg(feature = "http1")]
53
    buffered_once: bool,
54
    /// The Giver helps watch that the Receiver side has been polled
55
    /// when the queue is empty. This helps us know when a request and
56
    /// response have been fully processed, and a connection is ready
57
    /// for more.
58
    giver: want::Giver,
59
    /// Actually bounded by the Giver, plus `buffered_once`.
60
    inner: mpsc::UnboundedSender<Envelope<T, U>>,
61
}
62
63
/// An unbounded version.
64
///
65
/// Cannot poll the Giver, but can still use it to determine if the Receiver
66
/// has been dropped. However, this version can be cloned.
67
#[cfg(feature = "http2")]
68
pub(crate) struct UnboundedSender<T, U> {
69
    /// Only used for `is_closed`, since mpsc::UnboundedSender cannot be checked.
70
    giver: want::SharedGiver,
71
    inner: mpsc::UnboundedSender<Envelope<T, U>>,
72
}
73
74
impl<T, U> Sender<T, U> {
75
    #[cfg(feature = "http1")]
76
0
    pub(crate) fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>> {
77
0
        self.giver
78
0
            .poll_want(cx)
79
0
            .map_err(|_| crate::Error::new_closed())
Unexecuted instantiation: <hyper::client::dispatch::Sender<http::request::Request<tonic::body::Body>, http::response::Response<hyper::body::incoming::Incoming>>>::poll_ready::{closure#0}
Unexecuted instantiation: <hyper::client::dispatch::Sender<_, _>>::poll_ready::{closure#0}
80
0
    }
Unexecuted instantiation: <hyper::client::dispatch::Sender<http::request::Request<tonic::body::Body>, http::response::Response<hyper::body::incoming::Incoming>>>::poll_ready
Unexecuted instantiation: <hyper::client::dispatch::Sender<_, _>>::poll_ready
81
82
    #[cfg(feature = "http1")]
83
0
    pub(crate) fn is_ready(&self) -> bool {
84
0
        self.giver.is_wanting()
85
0
    }
Unexecuted instantiation: <hyper::client::dispatch::Sender<http::request::Request<tonic::body::Body>, http::response::Response<hyper::body::incoming::Incoming>>>::is_ready
Unexecuted instantiation: <hyper::client::dispatch::Sender<_, _>>::is_ready
86
87
    #[cfg(feature = "http1")]
88
0
    pub(crate) fn is_closed(&self) -> bool {
89
0
        self.giver.is_canceled()
90
0
    }
Unexecuted instantiation: <hyper::client::dispatch::Sender<http::request::Request<tonic::body::Body>, http::response::Response<hyper::body::incoming::Incoming>>>::is_closed
Unexecuted instantiation: <hyper::client::dispatch::Sender<_, _>>::is_closed
91
92
    #[cfg(feature = "http1")]
93
0
    fn can_send(&mut self) -> bool {
94
0
        if self.giver.give() || !self.buffered_once {
95
            // If the receiver is ready *now*, then of course we can send.
96
            //
97
            // If the receiver isn't ready yet, but we don't have anything
98
            // in the channel yet, then allow one message.
99
0
            self.buffered_once = true;
100
0
            true
101
        } else {
102
0
            false
103
        }
104
0
    }
Unexecuted instantiation: <hyper::client::dispatch::Sender<http::request::Request<tonic::body::Body>, http::response::Response<hyper::body::incoming::Incoming>>>::can_send
Unexecuted instantiation: <hyper::client::dispatch::Sender<_, _>>::can_send
105
106
    #[cfg(feature = "http1")]
107
0
    pub(crate) fn try_send(&mut self, val: T) -> Result<RetryPromise<T, U>, T> {
108
0
        if !self.can_send() {
109
0
            return Err(val);
110
0
        }
111
0
        let (tx, rx) = oneshot::channel();
112
0
        self.inner
113
0
            .send(Envelope(Some((val, Callback::Retry(Some(tx))))))
114
0
            .map(move |_| rx)
115
0
            .map_err(|mut e| (e.0).0.take().expect("envelope not dropped").0)
Unexecuted instantiation: <hyper::client::dispatch::Sender<http::request::Request<tonic::body::Body>, http::response::Response<hyper::body::incoming::Incoming>>>::try_send::{closure#1}
Unexecuted instantiation: <hyper::client::dispatch::Sender<_, _>>::try_send::{closure#1}
116
0
    }
Unexecuted instantiation: <hyper::client::dispatch::Sender<http::request::Request<tonic::body::Body>, http::response::Response<hyper::body::incoming::Incoming>>>::try_send
Unexecuted instantiation: <hyper::client::dispatch::Sender<_, _>>::try_send
117
118
    #[cfg(feature = "http1")]
119
0
    pub(crate) fn send(&mut self, val: T) -> Result<Promise<U>, T> {
120
0
        if !self.can_send() {
121
0
            return Err(val);
122
0
        }
123
0
        let (tx, rx) = oneshot::channel();
124
0
        self.inner
125
0
            .send(Envelope(Some((val, Callback::NoRetry(Some(tx))))))
126
0
            .map(move |_| rx)
127
0
            .map_err(|mut e| (e.0).0.take().expect("envelope not dropped").0)
128
0
    }
129
130
    #[cfg(feature = "http2")]
131
0
    pub(crate) fn unbound(self) -> UnboundedSender<T, U> {
132
0
        UnboundedSender {
133
0
            giver: self.giver.shared(),
134
0
            inner: self.inner,
135
0
        }
136
0
    }
Unexecuted instantiation: <hyper::client::dispatch::Sender<http::request::Request<tonic::body::Body>, http::response::Response<hyper::body::incoming::Incoming>>>::unbound
Unexecuted instantiation: <hyper::client::dispatch::Sender<_, _>>::unbound
137
}
138
139
#[cfg(feature = "http2")]
140
impl<T, U> UnboundedSender<T, U> {
141
0
    pub(crate) fn is_ready(&self) -> bool {
142
0
        !self.giver.is_canceled()
143
0
    }
Unexecuted instantiation: <hyper::client::dispatch::UnboundedSender<http::request::Request<tonic::body::Body>, http::response::Response<hyper::body::incoming::Incoming>>>::is_ready
Unexecuted instantiation: <hyper::client::dispatch::UnboundedSender<_, _>>::is_ready
144
145
0
    pub(crate) fn is_closed(&self) -> bool {
146
0
        self.giver.is_canceled()
147
0
    }
Unexecuted instantiation: <hyper::client::dispatch::UnboundedSender<http::request::Request<tonic::body::Body>, http::response::Response<hyper::body::incoming::Incoming>>>::is_closed
Unexecuted instantiation: <hyper::client::dispatch::UnboundedSender<_, _>>::is_closed
148
149
0
    pub(crate) fn try_send(&mut self, val: T) -> Result<RetryPromise<T, U>, T> {
150
0
        let (tx, rx) = oneshot::channel();
151
0
        self.inner
152
0
            .send(Envelope(Some((val, Callback::Retry(Some(tx))))))
153
0
            .map(move |_| rx)
154
0
            .map_err(|mut e| (e.0).0.take().expect("envelope not dropped").0)
Unexecuted instantiation: <hyper::client::dispatch::UnboundedSender<http::request::Request<tonic::body::Body>, http::response::Response<hyper::body::incoming::Incoming>>>::try_send::{closure#1}
Unexecuted instantiation: <hyper::client::dispatch::UnboundedSender<_, _>>::try_send::{closure#1}
155
0
    }
Unexecuted instantiation: <hyper::client::dispatch::UnboundedSender<http::request::Request<tonic::body::Body>, http::response::Response<hyper::body::incoming::Incoming>>>::try_send
Unexecuted instantiation: <hyper::client::dispatch::UnboundedSender<_, _>>::try_send
156
157
0
    pub(crate) fn send(&mut self, val: T) -> Result<Promise<U>, T> {
158
0
        let (tx, rx) = oneshot::channel();
159
0
        self.inner
160
0
            .send(Envelope(Some((val, Callback::NoRetry(Some(tx))))))
161
0
            .map(move |_| rx)
162
0
            .map_err(|mut e| (e.0).0.take().expect("envelope not dropped").0)
163
0
    }
164
}
165
166
#[cfg(feature = "http2")]
167
impl<T, U> Clone for UnboundedSender<T, U> {
168
0
    fn clone(&self) -> Self {
169
0
        UnboundedSender {
170
0
            giver: self.giver.clone(),
171
0
            inner: self.inner.clone(),
172
0
        }
173
0
    }
Unexecuted instantiation: <hyper::client::dispatch::UnboundedSender<http::request::Request<tonic::body::Body>, http::response::Response<hyper::body::incoming::Incoming>> as core::clone::Clone>::clone
Unexecuted instantiation: <hyper::client::dispatch::UnboundedSender<_, _> as core::clone::Clone>::clone
174
}
175
176
pub(crate) struct Receiver<T, U> {
177
    inner: mpsc::UnboundedReceiver<Envelope<T, U>>,
178
    taker: want::Taker,
179
}
180
181
impl<T, U> Receiver<T, U> {
182
0
    pub(crate) fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<(T, Callback<T, U>)>> {
183
0
        match self.inner.poll_recv(cx) {
184
0
            Poll::Ready(item) => {
185
0
                Poll::Ready(item.map(|mut env| env.0.take().expect("envelope not dropped")))
Unexecuted instantiation: <hyper::client::dispatch::Receiver<http::request::Request<tonic::body::Body>, http::response::Response<hyper::body::incoming::Incoming>>>::poll_recv::{closure#0}
Unexecuted instantiation: <hyper::client::dispatch::Receiver<_, _>>::poll_recv::{closure#0}
186
            }
187
            Poll::Pending => {
188
0
                self.taker.want();
189
0
                Poll::Pending
190
            }
191
        }
192
0
    }
Unexecuted instantiation: <hyper::client::dispatch::Receiver<http::request::Request<tonic::body::Body>, http::response::Response<hyper::body::incoming::Incoming>>>::poll_recv
Unexecuted instantiation: <hyper::client::dispatch::Receiver<_, _>>::poll_recv
193
194
    #[cfg(feature = "http1")]
195
0
    pub(crate) fn close(&mut self) {
196
0
        self.taker.cancel();
197
0
        self.inner.close();
198
0
    }
Unexecuted instantiation: <hyper::client::dispatch::Receiver<http::request::Request<tonic::body::Body>, http::response::Response<hyper::body::incoming::Incoming>>>::close
Unexecuted instantiation: <hyper::client::dispatch::Receiver<_, _>>::close
199
200
    #[cfg(feature = "http1")]
201
0
    pub(crate) fn try_recv(&mut self) -> Option<(T, Callback<T, U>)> {
202
        use futures_util::FutureExt;
203
0
        match self.inner.recv().now_or_never() {
204
0
            Some(Some(mut env)) => env.0.take(),
205
0
            _ => None,
206
        }
207
0
    }
Unexecuted instantiation: <hyper::client::dispatch::Receiver<http::request::Request<tonic::body::Body>, http::response::Response<hyper::body::incoming::Incoming>>>::try_recv
Unexecuted instantiation: <hyper::client::dispatch::Receiver<_, _>>::try_recv
208
}
209
210
impl<T, U> Drop for Receiver<T, U> {
211
0
    fn drop(&mut self) {
212
        // Notify the giver about the closure first, before dropping
213
        // the mpsc::Receiver.
214
0
        self.taker.cancel();
215
0
    }
Unexecuted instantiation: <hyper::client::dispatch::Receiver<http::request::Request<tonic::body::Body>, http::response::Response<hyper::body::incoming::Incoming>> as core::ops::drop::Drop>::drop
Unexecuted instantiation: <hyper::client::dispatch::Receiver<_, _> as core::ops::drop::Drop>::drop
216
}
217
218
struct Envelope<T, U>(Option<(T, Callback<T, U>)>);
219
220
impl<T, U> Drop for Envelope<T, U> {
221
0
    fn drop(&mut self) {
222
0
        if let Some((val, cb)) = self.0.take() {
223
0
            cb.send(Err(TrySendError {
224
0
                error: crate::Error::new_canceled().with("connection closed"),
225
0
                message: Some(val),
226
0
            }));
227
0
        }
228
0
    }
Unexecuted instantiation: <hyper::client::dispatch::Envelope<http::request::Request<tonic::body::Body>, http::response::Response<hyper::body::incoming::Incoming>> as core::ops::drop::Drop>::drop
Unexecuted instantiation: <hyper::client::dispatch::Envelope<_, _> as core::ops::drop::Drop>::drop
229
}
230
231
pub(crate) enum Callback<T, U> {
232
    #[allow(unused)]
233
    Retry(Option<oneshot::Sender<Result<U, TrySendError<T>>>>),
234
    NoRetry(Option<oneshot::Sender<Result<U, crate::Error>>>),
235
}
236
237
impl<T, U> Drop for Callback<T, U> {
238
0
    fn drop(&mut self) {
239
0
        match self {
240
0
            Callback::Retry(tx) => {
241
0
                if let Some(tx) = tx.take() {
242
0
                    let _ = tx.send(Err(TrySendError {
243
0
                        error: dispatch_gone(),
244
0
                        message: None,
245
0
                    }));
246
0
                }
247
            }
248
0
            Callback::NoRetry(tx) => {
249
0
                if let Some(tx) = tx.take() {
250
0
                    let _ = tx.send(Err(dispatch_gone()));
251
0
                }
252
            }
253
        }
254
0
    }
Unexecuted instantiation: <hyper::client::dispatch::Callback<http::request::Request<tonic::body::Body>, http::response::Response<hyper::body::incoming::Incoming>> as core::ops::drop::Drop>::drop
Unexecuted instantiation: <hyper::client::dispatch::Callback<_, _> as core::ops::drop::Drop>::drop
255
}
256
257
#[cold]
258
0
fn dispatch_gone() -> crate::Error {
259
    // FIXME(nox): What errors do we want here?
260
0
    crate::Error::new_user_dispatch_gone().with(if std::thread::panicking() {
261
0
        "user code panicked"
262
    } else {
263
0
        "runtime dropped the dispatch task"
264
    })
265
0
}
266
267
impl<T, U> Callback<T, U> {
268
    #[cfg(feature = "http2")]
269
0
    pub(crate) fn is_canceled(&self) -> bool {
270
0
        match *self {
271
0
            Callback::Retry(Some(ref tx)) => tx.is_closed(),
272
0
            Callback::NoRetry(Some(ref tx)) => tx.is_closed(),
273
0
            _ => unreachable!(),
274
        }
275
0
    }
Unexecuted instantiation: <hyper::client::dispatch::Callback<http::request::Request<tonic::body::Body>, http::response::Response<hyper::body::incoming::Incoming>>>::is_canceled
Unexecuted instantiation: <hyper::client::dispatch::Callback<_, _>>::is_canceled
276
277
0
    pub(crate) fn poll_canceled(&mut self, cx: &mut Context<'_>) -> Poll<()> {
278
0
        match *self {
279
0
            Callback::Retry(Some(ref mut tx)) => tx.poll_closed(cx),
280
0
            Callback::NoRetry(Some(ref mut tx)) => tx.poll_closed(cx),
281
0
            _ => unreachable!(),
282
        }
283
0
    }
Unexecuted instantiation: <hyper::client::dispatch::Callback<http::request::Request<tonic::body::Body>, http::response::Response<hyper::body::incoming::Incoming>>>::poll_canceled
Unexecuted instantiation: <hyper::client::dispatch::Callback<_, _>>::poll_canceled
284
285
0
    pub(crate) fn send(mut self, val: Result<U, TrySendError<T>>) {
286
0
        match self {
287
0
            Callback::Retry(ref mut tx) => {
288
0
                let _ = tx.take().unwrap().send(val);
289
0
            }
290
0
            Callback::NoRetry(ref mut tx) => {
291
0
                let _ = tx.take().unwrap().send(val.map_err(|e| e.error));
292
            }
293
        }
294
0
    }
Unexecuted instantiation: <hyper::client::dispatch::Callback<http::request::Request<tonic::body::Body>, http::response::Response<hyper::body::incoming::Incoming>>>::send
Unexecuted instantiation: <hyper::client::dispatch::Callback<_, _>>::send
295
}
296
297
impl<T> TrySendError<T> {
298
    /// Take the message from this error.
299
    ///
300
    /// The message will not always have been recovered. If an error occurs
301
    /// after the message has been serialized onto the connection, it will not
302
    /// be available here.
303
0
    pub fn take_message(&mut self) -> Option<T> {
304
0
        self.message.take()
305
0
    }
Unexecuted instantiation: <hyper::client::dispatch::TrySendError<http::request::Request<tonic::body::Body>>>::take_message
Unexecuted instantiation: <hyper::client::dispatch::TrySendError<_>>::take_message
306
307
    /// Consumes this to return the inner error.
308
0
    pub fn into_error(self) -> crate::Error {
309
0
        self.error
310
0
    }
Unexecuted instantiation: <hyper::client::dispatch::TrySendError<http::request::Request<tonic::body::Body>>>::into_error
Unexecuted instantiation: <hyper::client::dispatch::TrySendError<_>>::into_error
311
}
312
313
#[cfg(feature = "http2")]
314
pin_project! {
315
    pub struct SendWhen<B>
316
    where
317
        B: Body,
318
        B: 'static,
319
    {
320
        #[pin]
321
        pub(crate) when: ResponseFutMap<B>,
322
        #[pin]
323
        pub(crate) call_back: Option<Callback<Request<B>, Response<Incoming>>>,
324
    }
325
}
326
327
#[cfg(feature = "http2")]
328
impl<B> Future for SendWhen<B>
329
where
330
    B: Body + 'static,
331
{
332
    type Output = ();
333
334
0
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
335
0
        let mut this = self.project();
336
337
0
        let mut call_back = this.call_back.take().expect("polled after complete");
338
339
0
        match Pin::new(&mut this.when).poll(cx) {
340
0
            Poll::Ready(Ok(res)) => {
341
0
                call_back.send(Ok(res));
342
0
                Poll::Ready(())
343
            }
344
            Poll::Pending => {
345
                // check if the callback is canceled
346
0
                match call_back.poll_canceled(cx) {
347
0
                    Poll::Ready(v) => v,
348
                    Poll::Pending => {
349
                        // Move call_back back to struct before return
350
0
                        this.call_back.set(Some(call_back));
351
0
                        return Poll::Pending;
352
                    }
353
                };
354
                trace!("send_when canceled");
355
0
                Poll::Ready(())
356
            }
357
0
            Poll::Ready(Err((error, message))) => {
358
0
                call_back.send(Err(TrySendError { error, message }));
359
0
                Poll::Ready(())
360
            }
361
        }
362
0
    }
Unexecuted instantiation: <hyper::client::dispatch::SendWhen<tonic::body::Body> as core::future::future::Future>::poll
Unexecuted instantiation: <hyper::client::dispatch::SendWhen<_> as core::future::future::Future>::poll
363
}
364
365
#[cfg(test)]
366
mod tests {
367
    #[cfg(feature = "nightly")]
368
    extern crate test;
369
370
    use std::future::Future;
371
    use std::pin::Pin;
372
    use std::task::{Context, Poll};
373
374
    use super::{channel, Callback, Receiver};
375
376
    #[derive(Debug)]
377
    struct Custom(#[allow(dead_code)] i32);
378
379
    impl<T, U> Future for Receiver<T, U> {
380
        type Output = Option<(T, Callback<T, U>)>;
381
382
        fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
383
            self.poll_recv(cx)
384
        }
385
    }
386
387
    /// Helper to check if the future is ready after polling once.
388
    struct PollOnce<'a, F>(&'a mut F);
389
390
    impl<F, T> Future for PollOnce<'_, F>
391
    where
392
        F: Future<Output = T> + Unpin,
393
    {
394
        type Output = Option<()>;
395
396
        fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
397
            match Pin::new(&mut self.0).poll(cx) {
398
                Poll::Ready(_) => Poll::Ready(Some(())),
399
                Poll::Pending => Poll::Ready(None),
400
            }
401
        }
402
    }
403
404
    #[cfg(not(miri))]
405
    #[tokio::test]
406
    async fn drop_receiver_sends_cancel_errors() {
407
        let _ = pretty_env_logger::try_init();
408
409
        let (mut tx, mut rx) = channel::<Custom, ()>();
410
411
        // must poll once for try_send to succeed
412
        assert!(PollOnce(&mut rx).await.is_none(), "rx empty");
413
414
        let promise = tx.try_send(Custom(43)).unwrap();
415
        drop(rx);
416
417
        let fulfilled = promise.await;
418
        let err = fulfilled
419
            .expect("fulfilled")
420
            .expect_err("promise should error");
421
        match (err.error.is_canceled(), err.message) {
422
            (true, Some(_)) => (),
423
            e => panic!("expected Error::Cancel(_), found {:?}", e),
424
        }
425
    }
426
427
    #[cfg(not(miri))]
428
    #[tokio::test]
429
    async fn sender_checks_for_want_on_send() {
430
        let (mut tx, mut rx) = channel::<Custom, ()>();
431
432
        // one is allowed to buffer, second is rejected
433
        let _ = tx.try_send(Custom(1)).expect("1 buffered");
434
        tx.try_send(Custom(2)).expect_err("2 not ready");
435
436
        assert!(PollOnce(&mut rx).await.is_some(), "rx once");
437
438
        // Even though 1 has been popped, only 1 could be buffered for the
439
        // lifetime of the channel.
440
        tx.try_send(Custom(2)).expect_err("2 still not ready");
441
442
        assert!(PollOnce(&mut rx).await.is_none(), "rx empty");
443
444
        let _ = tx.try_send(Custom(2)).expect("2 ready");
445
    }
446
447
    #[cfg(feature = "http2")]
448
    #[test]
449
    fn unbounded_sender_doesnt_bound_on_want() {
450
        let (tx, rx) = channel::<Custom, ()>();
451
        let mut tx = tx.unbound();
452
453
        let _ = tx.try_send(Custom(1)).unwrap();
454
        let _ = tx.try_send(Custom(2)).unwrap();
455
        let _ = tx.try_send(Custom(3)).unwrap();
456
457
        drop(rx);
458
459
        let _ = tx.try_send(Custom(4)).unwrap_err();
460
    }
461
462
    #[cfg(feature = "nightly")]
463
    #[bench]
464
    fn giver_queue_throughput(b: &mut test::Bencher) {
465
        use crate::{body::Incoming, Request, Response};
466
467
        let rt = tokio::runtime::Builder::new_current_thread()
468
            .build()
469
            .unwrap();
470
        let (mut tx, mut rx) = channel::<Request<Incoming>, Response<Incoming>>();
471
472
        b.iter(move || {
473
            let _ = tx.send(Request::new(Incoming::empty())).unwrap();
474
            rt.block_on(async {
475
                loop {
476
                    let poll_once = PollOnce(&mut rx);
477
                    let opt = poll_once.await;
478
                    if opt.is_none() {
479
                        break;
480
                    }
481
                }
482
            });
483
        })
484
    }
485
486
    #[cfg(feature = "nightly")]
487
    #[bench]
488
    fn giver_queue_not_ready(b: &mut test::Bencher) {
489
        let rt = tokio::runtime::Builder::new_current_thread()
490
            .build()
491
            .unwrap();
492
        let (_tx, mut rx) = channel::<i32, ()>();
493
        b.iter(move || {
494
            rt.block_on(async {
495
                let poll_once = PollOnce(&mut rx);
496
                assert!(poll_once.await.is_none());
497
            });
498
        })
499
    }
500
501
    #[cfg(feature = "nightly")]
502
    #[bench]
503
    fn giver_queue_cancel(b: &mut test::Bencher) {
504
        let (_tx, mut rx) = channel::<i32, ()>();
505
506
        b.iter(move || {
507
            rx.taker.cancel();
508
        })
509
    }
510
}