/rust/registry/src/index.crates.io-1949cf8c6b5b557f/tower-0.5.2/src/hedge/mod.rs
Line | Count | Source |
1 | | //! Pre-emptively retry requests which have been outstanding for longer |
2 | | //! than a given latency percentile. |
3 | | |
4 | | #![warn(missing_debug_implementations, missing_docs, unreachable_pub)] |
5 | | |
6 | | use crate::filter::AsyncFilter; |
7 | | use futures_util::future; |
8 | | use pin_project_lite::pin_project; |
9 | | use std::sync::{Arc, Mutex}; |
10 | | use std::time::Duration; |
11 | | use std::{ |
12 | | pin::Pin, |
13 | | task::{Context, Poll}, |
14 | | }; |
15 | | use tracing::error; |
16 | | |
17 | | mod delay; |
18 | | mod latency; |
19 | | mod rotating_histogram; |
20 | | mod select; |
21 | | |
22 | | use delay::Delay; |
23 | | use latency::Latency; |
24 | | use rotating_histogram::RotatingHistogram; |
25 | | use select::Select; |
26 | | |
// Shared, thread-safe latency histogram sampled by both request paths.
type Histo = Arc<Mutex<RotatingHistogram>>;
// The full middleware stack assembled by `Hedge::new_with_histo`:
// `Select` runs the original request (first arm, latency-recorded) against a
// hedged copy (second arm) that is latency-recorded, gated by the policy
// predicate, and delayed by a percentile of the latency histogram.
type Service<S, P> = select::Select<
    SelectPolicy<P>,
    Latency<Histo, S>,
    Delay<DelayPolicy, AsyncFilter<Latency<Histo, S>, PolicyPredicate<P>>>,
>;
33 | | |
/// A middleware that pre-emptively retries requests which have been outstanding
/// for longer than a given latency percentile. If either of the original
/// future or the retry future completes, that value is used.
#[derive(Debug)]
// Newtype over the composed select/delay/filter/latency stack; see the
// `Service` type alias for the concrete shape.
pub struct Hedge<S, P>(Service<S, P>);
39 | | |
pin_project! {
    /// The [`Future`] returned by the [`Hedge`] service.
    ///
    /// [`Future`]: std::future::Future
    #[derive(Debug)]
    pub struct Future<S, Request>
    where
        S: tower_service::Service<Request>,
    {
        // The inner select stack's future; polled through directly, with
        // its error converted into `crate::BoxError`.
        #[pin]
        inner: S::Future,
    }
}
53 | | |
/// A policy which describes which requests can be cloned and then whether those
/// requests should be retried.
pub trait Policy<Request> {
    /// Called when the request is first received to determine if the request is retryable.
    ///
    /// Returns `Some` with a clone of the request if a hedge may later be
    /// attempted for it, or `None` if the request must not be retried.
    fn clone_request(&self, req: &Request) -> Option<Request>;

    /// Called after the hedge timeout to determine if the hedge retry should be issued.
    ///
    /// Returning `false` causes the hedge branch to pend forever, so only the
    /// original request can complete.
    fn can_retry(&self, req: &Request) -> bool;
}
63 | | |
// NOTE: these are pub only because they appear inside a Future<F>

#[doc(hidden)]
#[derive(Clone, Debug)]
// Adapts a `Policy` into an `AsyncPredicate` that gates the hedge request
// on `Policy::can_retry`.
pub struct PolicyPredicate<P>(P);
69 | | |
#[doc(hidden)]
#[derive(Debug)]
pub struct DelayPolicy {
    // Shared latency histogram from which the hedge delay is derived.
    histo: Histo,
    // Quantile of recorded latencies (e.g. 0.95) to wait for before
    // issuing the hedge request.
    latency_percentile: f32,
}
76 | | |
#[doc(hidden)]
#[derive(Debug)]
pub struct SelectPolicy<P> {
    // The user-supplied policy deciding whether a request may be cloned.
    policy: P,
    // Shared latency histogram, consulted for its sample count.
    histo: Histo,
    // Minimum number of recorded samples before hedging is attempted.
    min_data_points: u64,
}
84 | | |
impl<S, P> Hedge<S, P> {
    /// Create a new hedge middleware.
    ///
    /// Request latencies are recorded in a `RotatingHistogram` constructed
    /// with `period`. Once at least `min_data_points` samples are present,
    /// requests the `policy` deems retryable are hedged: a second copy is
    /// issued after the `latency_percentile` of observed latencies has
    /// elapsed, and the first result to complete is used.
    pub fn new<Request>(
        service: S,
        policy: P,
        min_data_points: u64,
        latency_percentile: f32,
        period: Duration,
    ) -> Hedge<S, P>
    where
        S: tower_service::Service<Request> + Clone,
        S::Error: Into<crate::BoxError>,
        P: Policy<Request> + Clone,
    {
        let histo = Arc::new(Mutex::new(RotatingHistogram::new(period)));
        Self::new_with_histo(service, policy, min_data_points, latency_percentile, histo)
    }

    /// A hedge middleware with a prepopulated latency histogram. This is useful
    /// for integration tests.
    ///
    /// `latencies_ms` are recorded (as milliseconds) into the histogram
    /// before the middleware is built.
    pub fn new_with_mock_latencies<Request>(
        service: S,
        policy: P,
        min_data_points: u64,
        latency_percentile: f32,
        period: Duration,
        latencies_ms: &[u64],
    ) -> Hedge<S, P>
    where
        S: tower_service::Service<Request> + Clone,
        S::Error: Into<crate::BoxError>,
        P: Policy<Request> + Clone,
    {
        let histo = Arc::new(Mutex::new(RotatingHistogram::new(period)));
        {
            // Scope the lock guard so it is released before the histogram is
            // handed off to `new_with_histo`.
            let mut locked = histo.lock().unwrap();
            for latency in latencies_ms.iter() {
                locked.read().record(*latency).unwrap();
            }
        }
        Self::new_with_histo(service, policy, min_data_points, latency_percentile, histo)
    }

    // Assembles the full middleware stack (see the `Service` type alias)
    // around the provided, possibly prepopulated, histogram.
    fn new_with_histo<Request>(
        service: S,
        policy: P,
        min_data_points: u64,
        latency_percentile: f32,
        histo: Histo,
    ) -> Hedge<S, P>
    where
        S: tower_service::Service<Request> + Clone,
        S::Error: Into<crate::BoxError>,
        P: Policy<Request> + Clone,
    {
        // Clone the underlying service and wrap both copies in a middleware that
        // records the latencies in a rotating histogram.
        let recorded_a = Latency::new(histo.clone(), service.clone());
        let recorded_b = Latency::new(histo.clone(), service);

        // Check policy to see if the hedge request should be issued.
        let filtered = AsyncFilter::new(recorded_b, PolicyPredicate(policy.clone()));

        // Delay the second request by a percentile of the recorded request latency
        // histogram.
        let delay_policy = DelayPolicy {
            histo: histo.clone(),
            latency_percentile,
        };
        let delayed = Delay::new(delay_policy, filtered);

        // If the request is retryable, issue two requests -- the second one delayed
        // by a latency percentile. Use the first result to complete.
        let select_policy = SelectPolicy {
            policy,
            histo,
            min_data_points,
        };
        Hedge(Select::new(select_policy, recorded_a, delayed))
    }
}
166 | | |
impl<S, P, Request> tower_service::Service<Request> for Hedge<S, P>
where
    S: tower_service::Service<Request> + Clone,
    S::Error: Into<crate::BoxError>,
    P: Policy<Request> + Clone,
{
    type Response = S::Response;
    type Error = crate::BoxError;
    type Future = Future<Service<S, P>, Request>;

    // Readiness and calls are delegated straight to the inner select stack.
    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        self.0.poll_ready(cx)
    }

    fn call(&mut self, request: Request) -> Self::Future {
        Future {
            inner: self.0.call(request),
        }
    }
}
187 | | |
impl<S, Request> std::future::Future for Future<S, Request>
where
    S: tower_service::Service<Request>,
    S::Error: Into<crate::BoxError>,
{
    type Output = Result<S::Response, crate::BoxError>;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // Forward to the inner future, boxing any service error on the way out.
        self.project().inner.poll(cx).map_err(Into::into)
    }
}
199 | | |
// Convert a `Duration` to whole milliseconds, rounding any partial
// millisecond up, and saturating instead of overflowing. Note that
// `Duration::as_millis` is not a drop-in replacement: it truncates the
// sub-millisecond remainder (and returns `u128`).
const NANOS_PER_MILLI: u32 = 1_000_000;
const MILLIS_PER_SEC: u64 = 1_000;
fn millis(duration: Duration) -> u64 {
    let whole_seconds_ms = duration.as_secs().saturating_mul(MILLIS_PER_SEC);
    // Round the sub-second nanoseconds up to the next full millisecond.
    let subsec_ms = (duration.subsec_nanos() + NANOS_PER_MILLI - 1) / NANOS_PER_MILLI;
    whole_seconds_ms.saturating_add(u64::from(subsec_ms))
}
211 | | |
212 | | impl latency::Record for Histo { |
213 | 0 | fn record(&mut self, latency: Duration) { |
214 | 0 | let mut locked = self.lock().unwrap(); |
215 | 0 | locked.write().record(millis(latency)).unwrap_or_else(|e| { |
216 | 0 | error!("Failed to write to hedge histogram: {:?}", e); |
217 | 0 | }) |
218 | 0 | } |
219 | | } |
220 | | |
221 | | impl<P, Request> crate::filter::AsyncPredicate<Request> for PolicyPredicate<P> |
222 | | where |
223 | | P: Policy<Request>, |
224 | | { |
225 | | type Future = future::Either< |
226 | | future::Ready<Result<Request, crate::BoxError>>, |
227 | | future::Pending<Result<Request, crate::BoxError>>, |
228 | | >; |
229 | | type Request = Request; |
230 | | |
231 | 0 | fn check(&mut self, request: Request) -> Self::Future { |
232 | 0 | if self.0.can_retry(&request) { |
233 | 0 | future::Either::Left(future::ready(Ok(request))) |
234 | | } else { |
235 | | // If the hedge retry should not be issued, we simply want to wait |
236 | | // for the result of the original request. Therefore we don't want |
237 | | // to return an error here. Instead, we use future::pending to ensure |
238 | | // that the original request wins the select. |
239 | 0 | future::Either::Right(future::pending()) |
240 | | } |
241 | 0 | } |
242 | | } |
243 | | |
244 | | impl<Request> delay::Policy<Request> for DelayPolicy { |
245 | 0 | fn delay(&self, _req: &Request) -> Duration { |
246 | 0 | let mut locked = self.histo.lock().unwrap(); |
247 | 0 | let millis = locked |
248 | 0 | .read() |
249 | 0 | .value_at_quantile(self.latency_percentile.into()); |
250 | 0 | Duration::from_millis(millis) |
251 | 0 | } |
252 | | } |
253 | | |
254 | | impl<P, Request> select::Policy<Request> for SelectPolicy<P> |
255 | | where |
256 | | P: Policy<Request>, |
257 | | { |
258 | 0 | fn clone_request(&self, req: &Request) -> Option<Request> { |
259 | 0 | self.policy.clone_request(req).filter(|_| { |
260 | 0 | let mut locked = self.histo.lock().unwrap(); |
261 | | // Do not attempt a retry if there are insufficiently many data |
262 | | // points in the histogram. |
263 | 0 | locked.read().len() >= self.min_data_points |
264 | 0 | }) |
265 | 0 | } |
266 | | } |