Coverage Report

Created: 2026-04-14 06:46

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/rust/registry/src/index.crates.io-1949cf8c6b5b557f/tower-0.5.3/src/steer/mod.rs
Line
Count
Source
1
//! This module provides functionality to aid managing routing requests between [`Service`]s.
2
//!
3
//! # Example
4
//!
5
//! [`Steer`] can for example be used to create a router, akin to what you might find in web
6
//! frameworks.
7
//!
8
//! Here, `GET /` will be sent to the `root` service, while all other requests go to `not_found`.
9
//!
10
//! ```rust
11
//! # use std::task::{Context, Poll, ready};
12
//! # use tower_service::Service;
13
//! # use tower::steer::Steer;
14
//! # use tower::service_fn;
15
//! # use tower::util::BoxService;
16
//! # use tower::ServiceExt;
17
//! # use std::convert::Infallible;
18
//! use http::{Request, Response, StatusCode, Method};
19
//!
20
//! # #[tokio::main]
21
//! # async fn main() -> Result<(), Box<dyn std::error::Error>> {
22
//! // Service that responds to `GET /`
23
//! let root = service_fn(|req: Request<String>| async move {
24
//!     # assert_eq!(req.uri().path(), "/");
25
//!     let res = Response::new("Hello, World!".to_string());
26
//!     Ok::<_, Infallible>(res)
27
//! });
28
//! // We have to box the service so its type gets erased and we can put it in a `Vec` with other
29
//! // services
30
//! let root = BoxService::new(root);
31
//!
32
//! // Service that responds with `404 Not Found` to all requests
33
//! let not_found = service_fn(|req: Request<String>| async move {
34
//!     let res = Response::builder()
35
//!         .status(StatusCode::NOT_FOUND)
36
//!         .body(String::new())
37
//!         .expect("response is valid");
38
//!     Ok::<_, Infallible>(res)
39
//! });
40
//! // Box that as well
41
//! let not_found = BoxService::new(not_found);
42
//!
43
//! let mut svc = Steer::new(
44
//!     // All services we route between
45
//!     vec![root, not_found],
46
//!     // How we pick which service to send the request to
47
//!     |req: &Request<String>, _services: &[_]| {
48
//!         if req.method() == Method::GET && req.uri().path() == "/" {
49
//!             0 // Index of `root`
50
//!         } else {
51
//!             1 // Index of `not_found`
52
//!         }
53
//!     },
54
//! );
55
//!
56
//! // This request will get sent to `root`
57
//! let req = Request::get("/").body(String::new()).unwrap();
58
//! let res = svc.ready().await?.call(req).await?;
59
//! assert_eq!(res.into_body(), "Hello, World!");
60
//!
61
//! // This request will get sent to `not_found`
62
//! let req = Request::get("/does/not/exist").body(String::new()).unwrap();
63
//! let res = svc.ready().await?.call(req).await?;
64
//! assert_eq!(res.status(), StatusCode::NOT_FOUND);
65
//! assert_eq!(res.into_body(), "");
66
//! #
67
//! # Ok(())
68
//! # }
69
//! ```
70
use std::task::{Context, Poll};
71
use std::{collections::VecDeque, fmt, marker::PhantomData};
72
use tower_service::Service;
73
74
/// This is how callers of [`Steer`] tell it which `Service` a `Req` corresponds to.
75
pub trait Picker<S, Req> {
76
    /// Return an index into the iterator of `Service` passed to [`Steer::new`].
77
    fn pick(&mut self, r: &Req, services: &[S]) -> usize;
78
}
79
80
impl<S, F, Req> Picker<S, Req> for F
81
where
82
    F: Fn(&Req, &[S]) -> usize,
83
{
84
0
    fn pick(&mut self, r: &Req, services: &[S]) -> usize {
85
0
        self(r, services)
86
0
    }
87
}
88
89
/// [`Steer`] manages a list of [`Service`]s which all handle the same type of request.
90
///
91
/// An example use case is a sharded service.
92
/// It accepts new requests, then:
93
/// 1. Determines, via the provided [`Picker`], which [`Service`] the request corresponds to.
94
/// 2. Waits (in [`Service::poll_ready`]) for *all* services to be ready.
95
/// 3. Calls the correct [`Service`] with the request, and returns a future corresponding to the
96
///    call.
97
///
98
/// Note that [`Steer`] must wait for all services to be ready since it can't know ahead of time
99
/// which [`Service`] the next message will arrive for, and is unwilling to buffer items
100
/// indefinitely. This will cause head-of-line blocking unless paired with a [`Service`] that does
101
/// buffer items indefinitely, and thus always returns [`Poll::Ready`]. For example, wrapping each
102
/// component service with a [`Buffer`] with a high enough limit (the maximum number of concurrent
103
/// requests) will prevent head-of-line blocking in [`Steer`].
104
///
105
/// [`Buffer`]: crate::buffer::Buffer
106
pub struct Steer<S, F, Req> {
107
    router: F,
108
    services: Vec<S>,
109
    not_ready: VecDeque<usize>,
110
    _phantom: PhantomData<Req>,
111
}
112
113
impl<S, F, Req> Steer<S, F, Req> {
114
    /// Make a new [`Steer`] with a list of [`Service`]'s and a [`Picker`].
115
    ///
116
    /// Note: the order of the [`Service`]'s is significant for [`Picker::pick`]'s return value.
117
0
    pub fn new(services: impl IntoIterator<Item = S>, router: F) -> Self {
118
0
        let services: Vec<_> = services.into_iter().collect();
119
0
        let not_ready: VecDeque<_> = services.iter().enumerate().map(|(i, _)| i).collect();
120
0
        Self {
121
0
            router,
122
0
            services,
123
0
            not_ready,
124
0
            _phantom: PhantomData,
125
0
        }
126
0
    }
127
}
128
129
impl<S, Req, F> Service<Req> for Steer<S, F, Req>
130
where
131
    S: Service<Req>,
132
    F: Picker<S, Req>,
133
{
134
    type Response = S::Response;
135
    type Error = S::Error;
136
    type Future = S::Future;
137
138
0
    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
139
        loop {
140
            // must wait for *all* services to be ready.
141
            // this will cause head-of-line blocking unless the underlying services are always ready.
142
0
            if self.not_ready.is_empty() {
143
0
                return Poll::Ready(Ok(()));
144
            } else {
145
0
                if self.services[self.not_ready[0]]
146
0
                    .poll_ready(cx)?
147
0
                    .is_pending()
148
                {
149
0
                    return Poll::Pending;
150
0
                }
151
152
0
                self.not_ready.pop_front();
153
            }
154
        }
155
0
    }
156
157
0
    fn call(&mut self, req: Req) -> Self::Future {
158
0
        assert!(
159
0
            self.not_ready.is_empty(),
160
            "Steer must wait for all services to be ready. Did you forget to call poll_ready()?"
161
        );
162
163
0
        let idx = self.router.pick(&req, &self.services[..]);
164
0
        let cl = &mut self.services[idx];
165
0
        self.not_ready.push_back(idx);
166
0
        cl.call(req)
167
0
    }
168
}
169
170
impl<S, F, Req> Clone for Steer<S, F, Req>
171
where
172
    S: Clone,
173
    F: Clone,
174
{
175
0
    fn clone(&self) -> Self {
176
0
        Self {
177
0
            router: self.router.clone(),
178
0
            services: self.services.clone(),
179
0
            not_ready: self.not_ready.clone(),
180
0
            _phantom: PhantomData,
181
0
        }
182
0
    }
183
}
184
185
impl<S, F, Req> fmt::Debug for Steer<S, F, Req>
186
where
187
    S: fmt::Debug,
188
    F: fmt::Debug,
189
{
190
0
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
191
        let Self {
192
0
            router,
193
0
            services,
194
0
            not_ready,
195
0
            _phantom,
196
0
        } = self;
197
0
        f.debug_struct("Steer")
198
0
            .field("router", router)
199
0
            .field("services", services)
200
0
            .field("not_ready", not_ready)
201
0
            .finish()
202
0
    }
203
}