Coverage Report

Created: 2025-12-31 06:22

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/rust/registry/src/index.crates.io-1949cf8c6b5b557f/hyper-1.8.1/src/client/dispatch.rs
Line
Count
Source
1
use std::task::{Context, Poll};
2
#[cfg(feature = "http2")]
3
use std::{future::Future, pin::Pin};
4
5
#[cfg(feature = "http2")]
6
use http::{Request, Response};
7
#[cfg(feature = "http2")]
8
use http_body::Body;
9
#[cfg(feature = "http2")]
10
use pin_project_lite::pin_project;
11
use tokio::sync::{mpsc, oneshot};
12
13
#[cfg(feature = "http2")]
14
use crate::{body::Incoming, proto::h2::client::ResponseFutMap};
15
16
pub(crate) type RetryPromise<T, U> = oneshot::Receiver<Result<U, TrySendError<T>>>;
17
pub(crate) type Promise<T> = oneshot::Receiver<Result<T, crate::Error>>;
18
19
/// An error when calling `try_send_request`.
20
///
21
/// There is a possibility of an error occurring on a connection in-between the
22
/// time that a request is queued and when it is actually written to the IO
23
/// transport. If that happens, it is safe to return the request back to the
24
/// caller, as it was never fully sent.
25
#[derive(Debug)]
26
pub struct TrySendError<T> {
27
    pub(crate) error: crate::Error,
28
    pub(crate) message: Option<T>,
29
}
30
31
0
pub(crate) fn channel<T, U>() -> (Sender<T, U>, Receiver<T, U>) {
32
0
    let (tx, rx) = mpsc::unbounded_channel();
33
0
    let (giver, taker) = want::new();
34
0
    let tx = Sender {
35
0
        #[cfg(feature = "http1")]
36
0
        buffered_once: false,
37
0
        giver,
38
0
        inner: tx,
39
0
    };
40
0
    let rx = Receiver { inner: rx, taker };
41
0
    (tx, rx)
42
0
}
Unexecuted instantiation: hyper::client::dispatch::channel::<http::request::Request<reqwest::async_impl::body::Body>, http::response::Response<hyper::body::incoming::Incoming>>
Unexecuted instantiation: hyper::client::dispatch::channel::<_, _>
43
44
/// A bounded sender of requests and callbacks for when responses are ready.
45
///
46
/// While the inner sender is unbounded, the Giver is used to determine
47
/// if the Receiver is ready for another request.
48
pub(crate) struct Sender<T, U> {
49
    /// One message is always allowed, even if the Receiver hasn't asked
50
    /// for it yet. This boolean keeps track of whether we've sent one
51
    /// without notice.
52
    #[cfg(feature = "http1")]
53
    buffered_once: bool,
54
    /// The Giver helps watch that the Receiver side has been polled
55
    /// when the queue is empty. This helps us know when a request and
56
    /// response have been fully processed, and a connection is ready
57
    /// for more.
58
    giver: want::Giver,
59
    /// Actually bounded by the Giver, plus `buffered_once`.
60
    inner: mpsc::UnboundedSender<Envelope<T, U>>,
61
}
62
63
/// An unbounded version.
64
///
65
/// Cannot poll the Giver, but can still use it to determine if the Receiver
66
/// has been dropped. However, this version can be cloned.
67
#[cfg(feature = "http2")]
68
pub(crate) struct UnboundedSender<T, U> {
69
    /// Only used for `is_closed`, since mpsc::UnboundedSender cannot be checked.
70
    giver: want::SharedGiver,
71
    inner: mpsc::UnboundedSender<Envelope<T, U>>,
72
}
73
74
impl<T, U> Sender<T, U> {
75
    #[cfg(feature = "http1")]
76
0
    pub(crate) fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>> {
77
0
        self.giver
78
0
            .poll_want(cx)
79
0
            .map_err(|_| crate::Error::new_closed())
Unexecuted instantiation: <hyper::client::dispatch::Sender<http::request::Request<reqwest::async_impl::body::Body>, http::response::Response<hyper::body::incoming::Incoming>>>::poll_ready::{closure#0}
Unexecuted instantiation: <hyper::client::dispatch::Sender<_, _>>::poll_ready::{closure#0}
80
0
    }
Unexecuted instantiation: <hyper::client::dispatch::Sender<http::request::Request<reqwest::async_impl::body::Body>, http::response::Response<hyper::body::incoming::Incoming>>>::poll_ready
Unexecuted instantiation: <hyper::client::dispatch::Sender<_, _>>::poll_ready
81
82
    #[cfg(feature = "http1")]
83
0
    pub(crate) fn is_ready(&self) -> bool {
84
0
        self.giver.is_wanting()
85
0
    }
Unexecuted instantiation: <hyper::client::dispatch::Sender<http::request::Request<reqwest::async_impl::body::Body>, http::response::Response<hyper::body::incoming::Incoming>>>::is_ready
Unexecuted instantiation: <hyper::client::dispatch::Sender<_, _>>::is_ready
86
87
    #[cfg(feature = "http1")]
88
0
    pub(crate) fn is_closed(&self) -> bool {
89
0
        self.giver.is_canceled()
90
0
    }
91
92
    #[cfg(feature = "http1")]
93
0
    fn can_send(&mut self) -> bool {
94
0
        if self.giver.give() || !self.buffered_once {
95
            // If the receiver is ready *now*, then of course we can send.
96
            //
97
            // If the receiver isn't ready yet, but we don't have anything
98
            // in the channel yet, then allow one message.
99
0
            self.buffered_once = true;
100
0
            true
101
        } else {
102
0
            false
103
        }
104
0
    }
Unexecuted instantiation: <hyper::client::dispatch::Sender<http::request::Request<reqwest::async_impl::body::Body>, http::response::Response<hyper::body::incoming::Incoming>>>::can_send
Unexecuted instantiation: <hyper::client::dispatch::Sender<_, _>>::can_send
105
106
    #[cfg(feature = "http1")]
107
0
    pub(crate) fn try_send(&mut self, val: T) -> Result<RetryPromise<T, U>, T> {
108
0
        if !self.can_send() {
109
0
            return Err(val);
110
0
        }
111
0
        let (tx, rx) = oneshot::channel();
112
0
        self.inner
113
0
            .send(Envelope(Some((val, Callback::Retry(Some(tx))))))
114
0
            .map(move |_| rx)
115
0
            .map_err(|mut e| (e.0).0.take().expect("envelope not dropped").0)
Unexecuted instantiation: <hyper::client::dispatch::Sender<http::request::Request<reqwest::async_impl::body::Body>, http::response::Response<hyper::body::incoming::Incoming>>>::try_send::{closure#1}
Unexecuted instantiation: <hyper::client::dispatch::Sender<_, _>>::try_send::{closure#1}
116
0
    }
Unexecuted instantiation: <hyper::client::dispatch::Sender<http::request::Request<reqwest::async_impl::body::Body>, http::response::Response<hyper::body::incoming::Incoming>>>::try_send
Unexecuted instantiation: <hyper::client::dispatch::Sender<_, _>>::try_send
117
118
    #[cfg(feature = "http1")]
119
0
    pub(crate) fn send(&mut self, val: T) -> Result<Promise<U>, T> {
120
0
        if !self.can_send() {
121
0
            return Err(val);
122
0
        }
123
0
        let (tx, rx) = oneshot::channel();
124
0
        self.inner
125
0
            .send(Envelope(Some((val, Callback::NoRetry(Some(tx))))))
126
0
            .map(move |_| rx)
127
0
            .map_err(|mut e| (e.0).0.take().expect("envelope not dropped").0)
128
0
    }
129
130
    #[cfg(feature = "http2")]
131
    pub(crate) fn unbound(self) -> UnboundedSender<T, U> {
132
        UnboundedSender {
133
            giver: self.giver.shared(),
134
            inner: self.inner,
135
        }
136
    }
137
}
138
139
#[cfg(feature = "http2")]
140
impl<T, U> UnboundedSender<T, U> {
141
    pub(crate) fn is_ready(&self) -> bool {
142
        !self.giver.is_canceled()
143
    }
144
145
    pub(crate) fn is_closed(&self) -> bool {
146
        self.giver.is_canceled()
147
    }
148
149
    pub(crate) fn try_send(&mut self, val: T) -> Result<RetryPromise<T, U>, T> {
150
        let (tx, rx) = oneshot::channel();
151
        self.inner
152
            .send(Envelope(Some((val, Callback::Retry(Some(tx))))))
153
            .map(move |_| rx)
154
            .map_err(|mut e| (e.0).0.take().expect("envelope not dropped").0)
155
    }
156
157
    pub(crate) fn send(&mut self, val: T) -> Result<Promise<U>, T> {
158
        let (tx, rx) = oneshot::channel();
159
        self.inner
160
            .send(Envelope(Some((val, Callback::NoRetry(Some(tx))))))
161
            .map(move |_| rx)
162
            .map_err(|mut e| (e.0).0.take().expect("envelope not dropped").0)
163
    }
164
}
165
166
#[cfg(feature = "http2")]
167
impl<T, U> Clone for UnboundedSender<T, U> {
168
    fn clone(&self) -> Self {
169
        UnboundedSender {
170
            giver: self.giver.clone(),
171
            inner: self.inner.clone(),
172
        }
173
    }
174
}
175
176
pub(crate) struct Receiver<T, U> {
177
    inner: mpsc::UnboundedReceiver<Envelope<T, U>>,
178
    taker: want::Taker,
179
}
180
181
impl<T, U> Receiver<T, U> {
182
0
    pub(crate) fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<(T, Callback<T, U>)>> {
183
0
        match self.inner.poll_recv(cx) {
184
0
            Poll::Ready(item) => {
185
0
                Poll::Ready(item.map(|mut env| env.0.take().expect("envelope not dropped")))
Unexecuted instantiation: <hyper::client::dispatch::Receiver<http::request::Request<reqwest::async_impl::body::Body>, http::response::Response<hyper::body::incoming::Incoming>>>::poll_recv::{closure#0}
Unexecuted instantiation: <hyper::client::dispatch::Receiver<_, _>>::poll_recv::{closure#0}
186
            }
187
            Poll::Pending => {
188
0
                self.taker.want();
189
0
                Poll::Pending
190
            }
191
        }
192
0
    }
Unexecuted instantiation: <hyper::client::dispatch::Receiver<http::request::Request<reqwest::async_impl::body::Body>, http::response::Response<hyper::body::incoming::Incoming>>>::poll_recv
Unexecuted instantiation: <hyper::client::dispatch::Receiver<_, _>>::poll_recv
193
194
    #[cfg(feature = "http1")]
195
0
    pub(crate) fn close(&mut self) {
196
0
        self.taker.cancel();
197
0
        self.inner.close();
198
0
    }
Unexecuted instantiation: <hyper::client::dispatch::Receiver<http::request::Request<reqwest::async_impl::body::Body>, http::response::Response<hyper::body::incoming::Incoming>>>::close
Unexecuted instantiation: <hyper::client::dispatch::Receiver<_, _>>::close
199
200
    #[cfg(feature = "http1")]
201
0
    pub(crate) fn try_recv(&mut self) -> Option<(T, Callback<T, U>)> {
202
0
        match crate::common::task::now_or_never(self.inner.recv()) {
203
0
            Some(Some(mut env)) => env.0.take(),
204
0
            _ => None,
205
        }
206
0
    }
Unexecuted instantiation: <hyper::client::dispatch::Receiver<http::request::Request<reqwest::async_impl::body::Body>, http::response::Response<hyper::body::incoming::Incoming>>>::try_recv
Unexecuted instantiation: <hyper::client::dispatch::Receiver<_, _>>::try_recv
207
}
208
209
impl<T, U> Drop for Receiver<T, U> {
210
0
    fn drop(&mut self) {
211
        // Notify the giver about the closure first, before dropping
212
        // the mpsc::Receiver.
213
0
        self.taker.cancel();
214
0
    }
Unexecuted instantiation: <hyper::client::dispatch::Receiver<http::request::Request<reqwest::async_impl::body::Body>, http::response::Response<hyper::body::incoming::Incoming>> as core::ops::drop::Drop>::drop
Unexecuted instantiation: <hyper::client::dispatch::Receiver<_, _> as core::ops::drop::Drop>::drop
215
}
216
217
struct Envelope<T, U>(Option<(T, Callback<T, U>)>);
218
219
impl<T, U> Drop for Envelope<T, U> {
220
0
    fn drop(&mut self) {
221
0
        if let Some((val, cb)) = self.0.take() {
222
0
            cb.send(Err(TrySendError {
223
0
                error: crate::Error::new_canceled().with("connection closed"),
224
0
                message: Some(val),
225
0
            }));
226
0
        }
227
0
    }
Unexecuted instantiation: <hyper::client::dispatch::Envelope<http::request::Request<reqwest::async_impl::body::Body>, http::response::Response<hyper::body::incoming::Incoming>> as core::ops::drop::Drop>::drop
Unexecuted instantiation: <hyper::client::dispatch::Envelope<_, _> as core::ops::drop::Drop>::drop
228
}
229
230
pub(crate) enum Callback<T, U> {
231
    #[allow(unused)]
232
    Retry(Option<oneshot::Sender<Result<U, TrySendError<T>>>>),
233
    NoRetry(Option<oneshot::Sender<Result<U, crate::Error>>>),
234
}
235
236
impl<T, U> Drop for Callback<T, U> {
237
0
    fn drop(&mut self) {
238
0
        match self {
239
0
            Callback::Retry(tx) => {
240
0
                if let Some(tx) = tx.take() {
241
0
                    let _ = tx.send(Err(TrySendError {
242
0
                        error: dispatch_gone(),
243
0
                        message: None,
244
0
                    }));
245
0
                }
246
            }
247
0
            Callback::NoRetry(tx) => {
248
0
                if let Some(tx) = tx.take() {
249
0
                    let _ = tx.send(Err(dispatch_gone()));
250
0
                }
251
            }
252
        }
253
0
    }
Unexecuted instantiation: <hyper::client::dispatch::Callback<http::request::Request<reqwest::async_impl::body::Body>, http::response::Response<hyper::body::incoming::Incoming>> as core::ops::drop::Drop>::drop
Unexecuted instantiation: <hyper::client::dispatch::Callback<_, _> as core::ops::drop::Drop>::drop
254
}
255
256
#[cold]
257
0
fn dispatch_gone() -> crate::Error {
258
    // FIXME(nox): What errors do we want here?
259
0
    crate::Error::new_user_dispatch_gone().with(if std::thread::panicking() {
260
0
        "user code panicked"
261
    } else {
262
0
        "runtime dropped the dispatch task"
263
    })
264
0
}
265
266
impl<T, U> Callback<T, U> {
267
    #[cfg(feature = "http2")]
268
    pub(crate) fn is_canceled(&self) -> bool {
269
        match *self {
270
            Callback::Retry(Some(ref tx)) => tx.is_closed(),
271
            Callback::NoRetry(Some(ref tx)) => tx.is_closed(),
272
            _ => unreachable!(),
273
        }
274
    }
275
276
0
    pub(crate) fn poll_canceled(&mut self, cx: &mut Context<'_>) -> Poll<()> {
277
0
        match *self {
278
0
            Callback::Retry(Some(ref mut tx)) => tx.poll_closed(cx),
279
0
            Callback::NoRetry(Some(ref mut tx)) => tx.poll_closed(cx),
280
0
            _ => unreachable!(),
281
        }
282
0
    }
Unexecuted instantiation: <hyper::client::dispatch::Callback<http::request::Request<reqwest::async_impl::body::Body>, http::response::Response<hyper::body::incoming::Incoming>>>::poll_canceled
Unexecuted instantiation: <hyper::client::dispatch::Callback<_, _>>::poll_canceled
283
284
0
    pub(crate) fn send(mut self, val: Result<U, TrySendError<T>>) {
285
0
        match self {
286
0
            Callback::Retry(ref mut tx) => {
287
0
                let _ = tx.take().unwrap().send(val);
288
0
            }
289
0
            Callback::NoRetry(ref mut tx) => {
290
0
                let _ = tx.take().unwrap().send(val.map_err(|e| e.error));
291
            }
292
        }
293
0
    }
Unexecuted instantiation: <hyper::client::dispatch::Callback<http::request::Request<reqwest::async_impl::body::Body>, http::response::Response<hyper::body::incoming::Incoming>>>::send
Unexecuted instantiation: <hyper::client::dispatch::Callback<_, _>>::send
294
}
295
296
impl<T> TrySendError<T> {
297
    /// Take the message from this error.
298
    ///
299
    /// The message will not always have been recovered. If an error occurs
300
    /// after the message has been serialized onto the connection, it will not
301
    /// be available here.
302
0
    pub fn take_message(&mut self) -> Option<T> {
303
0
        self.message.take()
304
0
    }
Unexecuted instantiation: <hyper::client::dispatch::TrySendError<http::request::Request<reqwest::async_impl::body::Body>>>::take_message
Unexecuted instantiation: <hyper::client::dispatch::TrySendError<_>>::take_message
305
306
    /// Returns a reference to the recovered message.
307
    ///
308
    /// The message will not always have been recovered. If an error occurs
309
    /// after the message has been serialized onto the connection, it will not
310
    /// be available here.
311
0
    pub fn message(&self) -> Option<&T> {
312
0
        self.message.as_ref()
313
0
    }
314
315
    /// Consumes this to return the inner error.
316
0
    pub fn into_error(self) -> crate::Error {
317
0
        self.error
318
0
    }
Unexecuted instantiation: <hyper::client::dispatch::TrySendError<http::request::Request<reqwest::async_impl::body::Body>>>::into_error
Unexecuted instantiation: <hyper::client::dispatch::TrySendError<_>>::into_error
319
320
    /// Returns a reference to the inner error.
321
0
    pub fn error(&self) -> &crate::Error {
322
0
        &self.error
323
0
    }
324
}
325
326
#[cfg(feature = "http2")]
327
pin_project! {
328
    pub struct SendWhen<B, E>
329
    where
330
        B: Body,
331
        B: 'static,
332
    {
333
        #[pin]
334
        pub(crate) when: ResponseFutMap<B, E>,
335
        #[pin]
336
        pub(crate) call_back: Option<Callback<Request<B>, Response<Incoming>>>,
337
    }
338
}
339
340
#[cfg(feature = "http2")]
341
impl<B, E> Future for SendWhen<B, E>
342
where
343
    B: Body + 'static,
344
    E: crate::rt::bounds::Http2UpgradedExec<B::Data>,
345
{
346
    type Output = ();
347
348
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
349
        let mut this = self.project();
350
351
        let mut call_back = this.call_back.take().expect("polled after complete");
352
353
        match Pin::new(&mut this.when).poll(cx) {
354
            Poll::Ready(Ok(res)) => {
355
                call_back.send(Ok(res));
356
                Poll::Ready(())
357
            }
358
            Poll::Pending => {
359
                // check if the callback is canceled
360
                match call_back.poll_canceled(cx) {
361
                    Poll::Ready(v) => v,
362
                    Poll::Pending => {
363
                        // Move call_back back to struct before return
364
                        this.call_back.set(Some(call_back));
365
                        return Poll::Pending;
366
                    }
367
                };
368
                trace!("send_when canceled");
369
                Poll::Ready(())
370
            }
371
            Poll::Ready(Err((error, message))) => {
372
                call_back.send(Err(TrySendError { error, message }));
373
                Poll::Ready(())
374
            }
375
        }
376
    }
377
}
378
379
#[cfg(test)]
380
mod tests {
381
    #[cfg(feature = "nightly")]
382
    extern crate test;
383
384
    use std::future::Future;
385
    use std::pin::Pin;
386
    use std::task::{Context, Poll};
387
388
    use super::{channel, Callback, Receiver};
389
390
    #[derive(Debug)]
391
    struct Custom(#[allow(dead_code)] i32);
392
393
    impl<T, U> Future for Receiver<T, U> {
394
        type Output = Option<(T, Callback<T, U>)>;
395
396
        fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
397
            self.poll_recv(cx)
398
        }
399
    }
400
401
    /// Helper to check if the future is ready after polling once.
402
    struct PollOnce<'a, F>(&'a mut F);
403
404
    impl<F, T> Future for PollOnce<'_, F>
405
    where
406
        F: Future<Output = T> + Unpin,
407
    {
408
        type Output = Option<()>;
409
410
        fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
411
            match Pin::new(&mut self.0).poll(cx) {
412
                Poll::Ready(_) => Poll::Ready(Some(())),
413
                Poll::Pending => Poll::Ready(None),
414
            }
415
        }
416
    }
417
418
    #[cfg(not(miri))]
419
    #[tokio::test]
420
    async fn drop_receiver_sends_cancel_errors() {
421
        let _ = pretty_env_logger::try_init();
422
423
        let (mut tx, mut rx) = channel::<Custom, ()>();
424
425
        // must poll once for try_send to succeed
426
        assert!(PollOnce(&mut rx).await.is_none(), "rx empty");
427
428
        let promise = tx.try_send(Custom(43)).unwrap();
429
        drop(rx);
430
431
        let fulfilled = promise.await;
432
        let err = fulfilled
433
            .expect("fulfilled")
434
            .expect_err("promise should error");
435
        match (err.error.is_canceled(), err.message) {
436
            (true, Some(_)) => (),
437
            e => panic!("expected Error::Cancel(_), found {:?}", e),
438
        }
439
    }
440
441
    #[cfg(not(miri))]
442
    #[tokio::test]
443
    async fn sender_checks_for_want_on_send() {
444
        let (mut tx, mut rx) = channel::<Custom, ()>();
445
446
        // one is allowed to buffer, second is rejected
447
        let _ = tx.try_send(Custom(1)).expect("1 buffered");
448
        tx.try_send(Custom(2)).expect_err("2 not ready");
449
450
        assert!(PollOnce(&mut rx).await.is_some(), "rx once");
451
452
        // Even though 1 has been popped, only 1 could be buffered for the
453
        // lifetime of the channel.
454
        tx.try_send(Custom(2)).expect_err("2 still not ready");
455
456
        assert!(PollOnce(&mut rx).await.is_none(), "rx empty");
457
458
        let _ = tx.try_send(Custom(2)).expect("2 ready");
459
    }
460
461
    #[cfg(feature = "http2")]
462
    #[test]
463
    fn unbounded_sender_doesnt_bound_on_want() {
464
        let (tx, rx) = channel::<Custom, ()>();
465
        let mut tx = tx.unbound();
466
467
        let _ = tx.try_send(Custom(1)).unwrap();
468
        let _ = tx.try_send(Custom(2)).unwrap();
469
        let _ = tx.try_send(Custom(3)).unwrap();
470
471
        drop(rx);
472
473
        let _ = tx.try_send(Custom(4)).unwrap_err();
474
    }
475
476
    #[cfg(feature = "nightly")]
477
    #[bench]
478
    fn giver_queue_throughput(b: &mut test::Bencher) {
479
        use crate::{body::Incoming, Request, Response};
480
481
        let rt = tokio::runtime::Builder::new_current_thread()
482
            .build()
483
            .unwrap();
484
        let (mut tx, mut rx) = channel::<Request<Incoming>, Response<Incoming>>();
485
486
        b.iter(move || {
487
            let _ = tx.send(Request::new(Incoming::empty())).unwrap();
488
            rt.block_on(async {
489
                loop {
490
                    let poll_once = PollOnce(&mut rx);
491
                    let opt = poll_once.await;
492
                    if opt.is_none() {
493
                        break;
494
                    }
495
                }
496
            });
497
        })
498
    }
499
500
    #[cfg(feature = "nightly")]
501
    #[bench]
502
    fn giver_queue_not_ready(b: &mut test::Bencher) {
503
        let rt = tokio::runtime::Builder::new_current_thread()
504
            .build()
505
            .unwrap();
506
        let (_tx, mut rx) = channel::<i32, ()>();
507
        b.iter(move || {
508
            rt.block_on(async {
509
                let poll_once = PollOnce(&mut rx);
510
                assert!(poll_once.await.is_none());
511
            });
512
        })
513
    }
514
515
    #[cfg(feature = "nightly")]
516
    #[bench]
517
    fn giver_queue_cancel(b: &mut test::Bencher) {
518
        let (_tx, mut rx) = channel::<i32, ()>();
519
520
        b.iter(move || {
521
            rx.taker.cancel();
522
        })
523
    }
524
}