Coverage Report

Created: 2025-10-29 07:05

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/rust/registry/src/index.crates.io-1949cf8c6b5b557f/tower-0.5.2/src/hedge/mod.rs
Line
Count
Source
1
//! Pre-emptively retry requests which have been outstanding for longer
2
//! than a given latency percentile.
3
4
#![warn(missing_debug_implementations, missing_docs, unreachable_pub)]
5
6
use crate::filter::AsyncFilter;
7
use futures_util::future;
8
use pin_project_lite::pin_project;
9
use std::sync::{Arc, Mutex};
10
use std::time::Duration;
11
use std::{
12
    pin::Pin,
13
    task::{Context, Poll},
14
};
15
use tracing::error;
16
17
mod delay;
18
mod latency;
19
mod rotating_histogram;
20
mod select;
21
22
use delay::Delay;
23
use latency::Latency;
24
use rotating_histogram::RotatingHistogram;
25
use select::Select;
26
27
// Shared latency histogram; the `Mutex` guards the rotating read/write halves.
type Histo = Arc<Mutex<RotatingHistogram>>;
// The composed middleware stack: `Select` races the primary latency-recorded
// service against a delayed, policy-filtered clone of the same service.
type Service<S, P> = select::Select<
    SelectPolicy<P>,
    Latency<Histo, S>,
    Delay<DelayPolicy, AsyncFilter<Latency<Histo, S>, PolicyPredicate<P>>>,
>;

/// A middleware that pre-emptively retries requests which have been outstanding
/// for longer than a given latency percentile.  If either of the original
/// future or the retry future completes, that value is used.
#[derive(Debug)]
pub struct Hedge<S, P>(Service<S, P>);
39
40
pin_project! {
    /// The [`Future`] returned by the [`Hedge`] service.
    ///
    /// [`Future`]: std::future::Future
    #[derive(Debug)]
    pub struct Future<S, Request>
    where
        S: tower_service::Service<Request>,
    {
        // The inner stack's future; polled directly, with the service error
        // converted into `crate::BoxError` on the way out.
        #[pin]
        inner: S::Future,
    }
}
53
54
/// A policy which describes which requests can be cloned and then whether those
/// requests should be retried.
pub trait Policy<Request> {
    /// Called when the request is first received to determine if the request is retryable.
    ///
    /// Returning `None` marks the request as not cloneable, so no hedge
    /// request is issued for it.
    fn clone_request(&self, req: &Request) -> Option<Request>;

    /// Called after the hedge timeout to determine if the hedge retry should be issued.
    fn can_retry(&self, req: &Request) -> bool;
}
63
64
// NOTE: these are pub only because they appear inside a Future<F>
65
66
// Adapts a user `Policy` into an `AsyncPredicate` so it can drive the
// `AsyncFilter` wrapped around the hedge (second) request.
#[doc(hidden)]
#[derive(Clone, Debug)]
pub struct PolicyPredicate<P>(P);
69
70
#[doc(hidden)]
#[derive(Debug)]
pub struct DelayPolicy {
    // Shared latency histogram, read in `delay` to compute the hedge delay.
    histo: Histo,
    // Quantile fed (as f64) to `value_at_quantile` when picking the delay.
    latency_percentile: f32,
}
76
77
#[doc(hidden)]
#[derive(Debug)]
pub struct SelectPolicy<P> {
    // The user-supplied hedging policy.
    policy: P,
    // Shared latency histogram, consulted for the sample count.
    histo: Histo,
    // Minimum number of recorded latencies before a hedge is attempted.
    min_data_points: u64,
}
84
85
impl<S, P> Hedge<S, P> {
86
    /// Create a new hedge middleware.
87
0
    pub fn new<Request>(
88
0
        service: S,
89
0
        policy: P,
90
0
        min_data_points: u64,
91
0
        latency_percentile: f32,
92
0
        period: Duration,
93
0
    ) -> Hedge<S, P>
94
0
    where
95
0
        S: tower_service::Service<Request> + Clone,
96
0
        S::Error: Into<crate::BoxError>,
97
0
        P: Policy<Request> + Clone,
98
    {
99
0
        let histo = Arc::new(Mutex::new(RotatingHistogram::new(period)));
100
0
        Self::new_with_histo(service, policy, min_data_points, latency_percentile, histo)
101
0
    }
102
103
    /// A hedge middleware with a prepopulated latency histogram.  This is usedful
104
    /// for integration tests.
105
0
    pub fn new_with_mock_latencies<Request>(
106
0
        service: S,
107
0
        policy: P,
108
0
        min_data_points: u64,
109
0
        latency_percentile: f32,
110
0
        period: Duration,
111
0
        latencies_ms: &[u64],
112
0
    ) -> Hedge<S, P>
113
0
    where
114
0
        S: tower_service::Service<Request> + Clone,
115
0
        S::Error: Into<crate::BoxError>,
116
0
        P: Policy<Request> + Clone,
117
    {
118
0
        let histo = Arc::new(Mutex::new(RotatingHistogram::new(period)));
119
        {
120
0
            let mut locked = histo.lock().unwrap();
121
0
            for latency in latencies_ms.iter() {
122
0
                locked.read().record(*latency).unwrap();
123
0
            }
124
        }
125
0
        Self::new_with_histo(service, policy, min_data_points, latency_percentile, histo)
126
0
    }
127
128
0
    fn new_with_histo<Request>(
129
0
        service: S,
130
0
        policy: P,
131
0
        min_data_points: u64,
132
0
        latency_percentile: f32,
133
0
        histo: Histo,
134
0
    ) -> Hedge<S, P>
135
0
    where
136
0
        S: tower_service::Service<Request> + Clone,
137
0
        S::Error: Into<crate::BoxError>,
138
0
        P: Policy<Request> + Clone,
139
    {
140
        // Clone the underlying service and wrap both copies in a middleware that
141
        // records the latencies in a rotating histogram.
142
0
        let recorded_a = Latency::new(histo.clone(), service.clone());
143
0
        let recorded_b = Latency::new(histo.clone(), service);
144
145
        // Check policy to see if the hedge request should be issued.
146
0
        let filtered = AsyncFilter::new(recorded_b, PolicyPredicate(policy.clone()));
147
148
        // Delay the second request by a percentile of the recorded request latency
149
        // histogram.
150
0
        let delay_policy = DelayPolicy {
151
0
            histo: histo.clone(),
152
0
            latency_percentile,
153
0
        };
154
0
        let delayed = Delay::new(delay_policy, filtered);
155
156
        // If the request is retryable, issue two requests -- the second one delayed
157
        // by a latency percentile.  Use the first result to complete.
158
0
        let select_policy = SelectPolicy {
159
0
            policy,
160
0
            histo,
161
0
            min_data_points,
162
0
        };
163
0
        Hedge(Select::new(select_policy, recorded_a, delayed))
164
0
    }
165
}
166
167
impl<S, P, Request> tower_service::Service<Request> for Hedge<S, P>
168
where
169
    S: tower_service::Service<Request> + Clone,
170
    S::Error: Into<crate::BoxError>,
171
    P: Policy<Request> + Clone,
172
{
173
    type Response = S::Response;
174
    type Error = crate::BoxError;
175
    type Future = Future<Service<S, P>, Request>;
176
177
0
    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
178
0
        self.0.poll_ready(cx)
179
0
    }
180
181
0
    fn call(&mut self, request: Request) -> Self::Future {
182
0
        Future {
183
0
            inner: self.0.call(request),
184
0
        }
185
0
    }
186
}
187
188
impl<S, Request> std::future::Future for Future<S, Request>
189
where
190
    S: tower_service::Service<Request>,
191
    S::Error: Into<crate::BoxError>,
192
{
193
    type Output = Result<S::Response, crate::BoxError>;
194
195
0
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
196
0
        self.project().inner.poll(cx).map_err(Into::into)
197
0
    }
198
}
199
200
// `Duration::as_millis` (stable since Rust 1.33) truncates the sub-second
// component, whereas this helper rounds it *up*, so it is not a drop-in
// replacement.
const NANOS_PER_MILLI: u32 = 1_000_000;
const MILLIS_PER_SEC: u64 = 1_000;

/// Converts a `Duration` to whole milliseconds, rounding any fractional
/// millisecond up and saturating at `u64::MAX` instead of overflowing.
fn millis(duration: Duration) -> u64 {
    // Round up. `subsec_nanos() < 1_000_000_000`, so the addition below
    // cannot overflow a `u32`.
    let subsec_millis = (duration.subsec_nanos() + NANOS_PER_MILLI - 1) / NANOS_PER_MILLI;
    duration
        .as_secs()
        .saturating_mul(MILLIS_PER_SEC)
        .saturating_add(u64::from(subsec_millis))
}
211
212
impl latency::Record for Histo {
213
0
    fn record(&mut self, latency: Duration) {
214
0
        let mut locked = self.lock().unwrap();
215
0
        locked.write().record(millis(latency)).unwrap_or_else(|e| {
216
0
            error!("Failed to write to hedge histogram: {:?}", e);
217
0
        })
218
0
    }
219
}
220
221
impl<P, Request> crate::filter::AsyncPredicate<Request> for PolicyPredicate<P>
222
where
223
    P: Policy<Request>,
224
{
225
    type Future = future::Either<
226
        future::Ready<Result<Request, crate::BoxError>>,
227
        future::Pending<Result<Request, crate::BoxError>>,
228
    >;
229
    type Request = Request;
230
231
0
    fn check(&mut self, request: Request) -> Self::Future {
232
0
        if self.0.can_retry(&request) {
233
0
            future::Either::Left(future::ready(Ok(request)))
234
        } else {
235
            // If the hedge retry should not be issued, we simply want to wait
236
            // for the result of the original request.  Therefore we don't want
237
            // to return an error here.  Instead, we use future::pending to ensure
238
            // that the original request wins the select.
239
0
            future::Either::Right(future::pending())
240
        }
241
0
    }
242
}
243
244
impl<Request> delay::Policy<Request> for DelayPolicy {
245
0
    fn delay(&self, _req: &Request) -> Duration {
246
0
        let mut locked = self.histo.lock().unwrap();
247
0
        let millis = locked
248
0
            .read()
249
0
            .value_at_quantile(self.latency_percentile.into());
250
0
        Duration::from_millis(millis)
251
0
    }
252
}
253
254
impl<P, Request> select::Policy<Request> for SelectPolicy<P>
255
where
256
    P: Policy<Request>,
257
{
258
0
    fn clone_request(&self, req: &Request) -> Option<Request> {
259
0
        self.policy.clone_request(req).filter(|_| {
260
0
            let mut locked = self.histo.lock().unwrap();
261
            // Do not attempt a retry if there are insufficiently many data
262
            // points in the histogram.
263
0
            locked.read().len() >= self.min_data_points
264
0
        })
265
0
    }
266
}