/rust/registry/src/index.crates.io-1949cf8c6b5b557f/tower-0.5.2/src/retry/future.rs
Line | Count | Source |
1 | | //! Future types |
2 | | |
3 | | use super::{Policy, Retry}; |
4 | | use futures_core::ready; |
5 | | use pin_project_lite::pin_project; |
6 | | use std::future::Future; |
7 | | use std::pin::Pin; |
8 | | use std::task::{Context, Poll}; |
9 | | use tower_service::Service; |
10 | | |
11 | | pin_project! { |
12 | | /// The [`Future`] returned by a [`Retry`] service. |
13 | | #[derive(Debug)] |
14 | | pub struct ResponseFuture<P, S, Request> |
15 | | where |
16 | | P: Policy<Request, S::Response, S::Error>, |
17 | | S: Service<Request>, |
18 | | { |
19 | | request: Option<Request>, |
20 | | #[pin] |
21 | | retry: Retry<P, S>, |
22 | | #[pin] |
23 | | state: State<S::Future, P::Future>, |
24 | | } |
25 | | } |
26 | | |
27 | | pin_project! { |
28 | | #[project = StateProj] |
29 | | #[derive(Debug)] |
30 | | enum State<F, P> { |
31 | | // Polling the future from [`Service::call`] |
32 | | Called { |
33 | | #[pin] |
34 | | future: F |
35 | | }, |
36 | | // Polling the future from [`Policy::retry`] |
37 | | Waiting { |
38 | | #[pin] |
39 | | waiting: P |
40 | | }, |
41 | | // Polling [`Service::poll_ready`] after [`Waiting`] was OK. |
42 | | Retrying, |
43 | | } |
44 | | } |
45 | | |
46 | | impl<P, S, Request> ResponseFuture<P, S, Request> |
47 | | where |
48 | | P: Policy<Request, S::Response, S::Error>, |
49 | | S: Service<Request>, |
50 | | { |
51 | 0 | pub(crate) fn new( |
52 | 0 | request: Option<Request>, |
53 | 0 | retry: Retry<P, S>, |
54 | 0 | future: S::Future, |
55 | 0 | ) -> ResponseFuture<P, S, Request> { |
56 | 0 | ResponseFuture { |
57 | 0 | request, |
58 | 0 | retry, |
59 | 0 | state: State::Called { future }, |
60 | 0 | } |
61 | 0 | } Unexecuted instantiation: <tower::retry::future::ResponseFuture<reqwest::retry::Policy, reqwest::async_impl::client::HyperService, http::request::Request<reqwest::async_impl::body::Body>>>::new Unexecuted instantiation: <tower::retry::future::ResponseFuture<_, _, _>>::new |
62 | | } |
63 | | |
64 | | impl<P, S, Request> Future for ResponseFuture<P, S, Request> |
65 | | where |
66 | | P: Policy<Request, S::Response, S::Error>, |
67 | | S: Service<Request>, |
68 | | { |
69 | | type Output = Result<S::Response, S::Error>; |
70 | | |
71 | 0 | fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { |
72 | 0 | let mut this = self.project(); |
73 | | |
74 | | loop { |
75 | 0 | match this.state.as_mut().project() { |
76 | 0 | StateProj::Called { future } => { |
77 | 0 | let mut result = ready!(future.poll(cx)); |
78 | 0 | if let Some(req) = &mut this.request { |
79 | 0 | match this.retry.policy.retry(req, &mut result) { |
80 | 0 | Some(waiting) => { |
81 | 0 | this.state.set(State::Waiting { waiting }); |
82 | 0 | } |
83 | 0 | None => return Poll::Ready(result), |
84 | | } |
85 | | } else { |
86 | | // request wasn't cloned, so no way to retry it |
87 | 0 | return Poll::Ready(result); |
88 | | } |
89 | | } |
90 | 0 | StateProj::Waiting { waiting } => { |
91 | 0 | ready!(waiting.poll(cx)); |
92 | | |
93 | 0 | this.state.set(State::Retrying); |
94 | | } |
95 | | StateProj::Retrying => { |
96 | | // NOTE: we assume here that |
97 | | // |
98 | | // this.retry.poll_ready() |
99 | | // |
100 | | // is equivalent to |
101 | | // |
102 | | // this.retry.service.poll_ready() |
103 | | // |
104 | | // we need to make that assumption to avoid adding an Unpin bound to the Policy |
105 | | // in Ready to make it Unpin so that we can get &mut Ready as needed to call |
106 | | // poll_ready on it. |
107 | 0 | ready!(this.retry.as_mut().project().service.poll_ready(cx))?; |
108 | 0 | let req = this |
109 | 0 | .request |
110 | 0 | .take() |
111 | 0 | .expect("retrying requires cloned request"); |
112 | 0 | *this.request = this.retry.policy.clone_request(&req); |
113 | 0 | this.state.set(State::Called { |
114 | 0 | future: this.retry.as_mut().project().service.call(req), |
115 | 0 | }); |
116 | | } |
117 | | } |
118 | | } |
119 | 0 | } Unexecuted instantiation: <tower::retry::future::ResponseFuture<reqwest::retry::Policy, reqwest::async_impl::client::HyperService, http::request::Request<reqwest::async_impl::body::Body>> as core::future::future::Future>::poll Unexecuted instantiation: <tower::retry::future::ResponseFuture<_, _, _> as core::future::future::Future>::poll |
120 | | } |