Coverage Report

Created: 2026-02-14 06:16

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/rust/registry/src/index.crates.io-1949cf8c6b5b557f/tower-0.5.2/src/hedge/delay.rs
Line
Count
Source
1
use futures_util::ready;
2
use pin_project_lite::pin_project;
3
use std::time::Duration;
4
use std::{
5
    future::Future,
6
    pin::Pin,
7
    task::{Context, Poll},
8
};
9
use tower_service::Service;
10
11
use crate::util::Oneshot;
12
13
/// A policy which specifies how long each request should be delayed for.
14
pub trait Policy<Request> {
15
    fn delay(&self, req: &Request) -> Duration;
16
}
17
18
/// A middleware which delays sending the request to the underlying service
19
/// for an amount of time specified by the policy.
20
#[derive(Debug)]
21
pub struct Delay<P, S> {
22
    policy: P,
23
    service: S,
24
}
25
26
pin_project! {
27
    #[derive(Debug)]
28
    pub struct ResponseFuture<Request, S>
29
    where
30
        S: Service<Request>,
31
    {
32
        service: Option<S>,
33
        #[pin]
34
        state: State<Request, Oneshot<S, Request>>,
35
    }
36
}
37
38
pin_project! {
39
    #[project = StateProj]
40
    #[derive(Debug)]
41
    enum State<Request, F> {
42
        Delaying {
43
            #[pin]
44
            delay: tokio::time::Sleep,
45
            req: Option<Request>,
46
        },
47
        Called {
48
            #[pin]
49
            fut: F,
50
        },
51
    }
52
}
53
54
impl<Request, F> State<Request, F> {
55
0
    fn delaying(delay: tokio::time::Sleep, req: Option<Request>) -> Self {
56
0
        Self::Delaying { delay, req }
57
0
    }
58
59
0
    fn called(fut: F) -> Self {
60
0
        Self::Called { fut }
61
0
    }
62
}
63
64
impl<P, S> Delay<P, S> {
65
0
    pub const fn new<Request>(policy: P, service: S) -> Self
66
0
    where
67
0
        P: Policy<Request>,
68
0
        S: Service<Request> + Clone,
69
0
        S::Error: Into<crate::BoxError>,
70
    {
71
0
        Delay { policy, service }
72
0
    }
73
}
74
75
impl<Request, P, S> Service<Request> for Delay<P, S>
76
where
77
    P: Policy<Request>,
78
    S: Service<Request> + Clone,
79
    S::Error: Into<crate::BoxError>,
80
{
81
    type Response = S::Response;
82
    type Error = crate::BoxError;
83
    type Future = ResponseFuture<Request, S>;
84
85
0
    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
86
        // Calling self.service.poll_ready would reserve a slot for the delayed request,
87
        // potentially well in advance of actually making it.  Instead, signal readiness here and
88
        // treat the service as a Oneshot in the future.
89
0
        Poll::Ready(Ok(()))
90
0
    }
91
92
0
    fn call(&mut self, request: Request) -> Self::Future {
93
0
        let delay = self.policy.delay(&request);
94
0
        ResponseFuture {
95
0
            service: Some(self.service.clone()),
96
0
            state: State::delaying(tokio::time::sleep(delay), Some(request)),
97
0
        }
98
0
    }
99
}
100
101
impl<Request, S, T, E> Future for ResponseFuture<Request, S>
102
where
103
    E: Into<crate::BoxError>,
104
    S: Service<Request, Response = T, Error = E>,
105
{
106
    type Output = Result<T, crate::BoxError>;
107
108
0
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
109
0
        let mut this = self.project();
110
111
        loop {
112
0
            match this.state.as_mut().project() {
113
0
                StateProj::Delaying { delay, req } => {
114
0
                    ready!(delay.poll(cx));
115
0
                    let req = req.take().expect("Missing request in delay");
116
0
                    let svc = this.service.take().expect("Missing service in delay");
117
0
                    let fut = Oneshot::new(svc, req);
118
0
                    this.state.set(State::called(fut));
119
                }
120
0
                StateProj::Called { fut } => {
121
0
                    return fut.poll(cx).map_err(Into::into);
122
                }
123
            };
124
        }
125
0
    }
126
}