/rust/registry/src/index.crates.io-1949cf8c6b5b557f/tower-0.5.2/src/steer/mod.rs
Line | Count | Source |
1 | | //! This module provides functionality to aid managing routing requests between [`Service`]s. |
2 | | //! |
3 | | //! # Example |
4 | | //! |
5 | | //! [`Steer`] can for example be used to create a router, akin to what you might find in web |
6 | | //! frameworks. |
7 | | //! |
8 | | //! Here, `GET /` will be sent to the `root` service, while all other requests go to `not_found`. |
9 | | //! |
10 | | //! ```rust |
11 | | //! # use std::task::{Context, Poll}; |
12 | | //! # use tower_service::Service; |
13 | | //! # use futures_util::future::{ready, Ready, poll_fn}; |
14 | | //! # use tower::steer::Steer; |
15 | | //! # use tower::service_fn; |
16 | | //! # use tower::util::BoxService; |
17 | | //! # use tower::ServiceExt; |
18 | | //! # use std::convert::Infallible; |
19 | | //! use http::{Request, Response, StatusCode, Method}; |
20 | | //! |
21 | | //! # #[tokio::main] |
22 | | //! # async fn main() -> Result<(), Box<dyn std::error::Error>> { |
23 | | //! // Service that responds to `GET /` |
24 | | //! let root = service_fn(|req: Request<String>| async move { |
25 | | //! # assert_eq!(req.uri().path(), "/"); |
26 | | //! let res = Response::new("Hello, World!".to_string()); |
27 | | //! Ok::<_, Infallible>(res) |
28 | | //! }); |
29 | | //! // We have to box the service so its type gets erased and we can put it in a `Vec` with other |
30 | | //! // services |
31 | | //! let root = BoxService::new(root); |
32 | | //! |
33 | | //! // Service that responds with `404 Not Found` to all requests |
34 | | //! let not_found = service_fn(|req: Request<String>| async move { |
35 | | //! let res = Response::builder() |
36 | | //! .status(StatusCode::NOT_FOUND) |
37 | | //! .body(String::new()) |
38 | | //! .expect("response is valid"); |
39 | | //! Ok::<_, Infallible>(res) |
40 | | //! }); |
41 | | //! // Box that as well |
42 | | //! let not_found = BoxService::new(not_found); |
43 | | //! |
44 | | //! let mut svc = Steer::new( |
45 | | //! // All services we route between |
46 | | //! vec![root, not_found], |
47 | | //! // How we pick which service to send the request to |
48 | | //! |req: &Request<String>, _services: &[_]| { |
49 | | //! if req.method() == Method::GET && req.uri().path() == "/" { |
50 | | //! 0 // Index of `root` |
51 | | //! } else { |
52 | | //! 1 // Index of `not_found` |
53 | | //! } |
54 | | //! }, |
55 | | //! ); |
56 | | //! |
57 | | //! // This request will get sent to `root` |
58 | | //! let req = Request::get("/").body(String::new()).unwrap(); |
59 | | //! let res = svc.ready().await?.call(req).await?; |
60 | | //! assert_eq!(res.into_body(), "Hello, World!"); |
61 | | //! |
62 | | //! // This request will get sent to `not_found` |
63 | | //! let req = Request::get("/does/not/exist").body(String::new()).unwrap(); |
64 | | //! let res = svc.ready().await?.call(req).await?; |
65 | | //! assert_eq!(res.status(), StatusCode::NOT_FOUND); |
66 | | //! assert_eq!(res.into_body(), ""); |
67 | | //! # |
68 | | //! # Ok(()) |
69 | | //! # } |
70 | | //! ``` |
71 | | use std::task::{Context, Poll}; |
72 | | use std::{collections::VecDeque, fmt, marker::PhantomData}; |
73 | | use tower_service::Service; |
74 | | |
75 | | /// This is how callers of [`Steer`] tell it which `Service` a `Req` corresponds to. |
76 | | pub trait Picker<S, Req> { |
77 | | /// Return an index into the iterator of `Service` passed to [`Steer::new`]. |
78 | | fn pick(&mut self, r: &Req, services: &[S]) -> usize; |
79 | | } |
80 | | |
81 | | impl<S, F, Req> Picker<S, Req> for F |
82 | | where |
83 | | F: Fn(&Req, &[S]) -> usize, |
84 | | { |
85 | 0 | fn pick(&mut self, r: &Req, services: &[S]) -> usize { |
86 | 0 | self(r, services) |
87 | 0 | } |
88 | | } |
89 | | |
90 | | /// [`Steer`] manages a list of [`Service`]s which all handle the same type of request. |
91 | | /// |
92 | | /// An example use case is a sharded service. |
93 | | /// It accepts new requests, then: |
94 | | /// 1. Determines, via the provided [`Picker`], which [`Service`] the request corresponds to. |
95 | | /// 2. Waits (in [`Service::poll_ready`]) for *all* services to be ready. |
96 | | /// 3. Calls the correct [`Service`] with the request, and returns a future corresponding to the |
97 | | /// call. |
98 | | /// |
99 | | /// Note that [`Steer`] must wait for all services to be ready since it can't know ahead of time |
100 | | /// which [`Service`] the next message will arrive for, and is unwilling to buffer items |
101 | | /// indefinitely. This will cause head-of-line blocking unless paired with a [`Service`] that does |
102 | | /// buffer items indefinitely, and thus always returns [`Poll::Ready`]. For example, wrapping each |
103 | | /// component service with a [`Buffer`] with a high enough limit (the maximum number of concurrent |
104 | | /// requests) will prevent head-of-line blocking in [`Steer`]. |
105 | | /// |
106 | | /// [`Buffer`]: crate::buffer::Buffer |
107 | | pub struct Steer<S, F, Req> { |
108 | | router: F, |
109 | | services: Vec<S>, |
110 | | not_ready: VecDeque<usize>, |
111 | | _phantom: PhantomData<Req>, |
112 | | } |
113 | | |
114 | | impl<S, F, Req> Steer<S, F, Req> { |
115 | | /// Make a new [`Steer`] with a list of [`Service`]'s and a [`Picker`]. |
116 | | /// |
117 | | /// Note: the order of the [`Service`]'s is significant for [`Picker::pick`]'s return value. |
118 | 0 | pub fn new(services: impl IntoIterator<Item = S>, router: F) -> Self { |
119 | 0 | let services: Vec<_> = services.into_iter().collect(); |
120 | 0 | let not_ready: VecDeque<_> = services.iter().enumerate().map(|(i, _)| i).collect(); |
121 | 0 | Self { |
122 | 0 | router, |
123 | 0 | services, |
124 | 0 | not_ready, |
125 | 0 | _phantom: PhantomData, |
126 | 0 | } |
127 | 0 | } |
128 | | } |
129 | | |
130 | | impl<S, Req, F> Service<Req> for Steer<S, F, Req> |
131 | | where |
132 | | S: Service<Req>, |
133 | | F: Picker<S, Req>, |
134 | | { |
135 | | type Response = S::Response; |
136 | | type Error = S::Error; |
137 | | type Future = S::Future; |
138 | | |
139 | 0 | fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { |
140 | | loop { |
141 | | // must wait for *all* services to be ready. |
142 | | // this will cause head-of-line blocking unless the underlying services are always ready. |
143 | 0 | if self.not_ready.is_empty() { |
144 | 0 | return Poll::Ready(Ok(())); |
145 | | } else { |
146 | 0 | if self.services[self.not_ready[0]] |
147 | 0 | .poll_ready(cx)? |
148 | 0 | .is_pending() |
149 | | { |
150 | 0 | return Poll::Pending; |
151 | 0 | } |
152 | | |
153 | 0 | self.not_ready.pop_front(); |
154 | | } |
155 | | } |
156 | 0 | } |
157 | | |
158 | 0 | fn call(&mut self, req: Req) -> Self::Future { |
159 | 0 | assert!( |
160 | 0 | self.not_ready.is_empty(), |
161 | | "Steer must wait for all services to be ready. Did you forget to call poll_ready()?" |
162 | | ); |
163 | | |
164 | 0 | let idx = self.router.pick(&req, &self.services[..]); |
165 | 0 | let cl = &mut self.services[idx]; |
166 | 0 | self.not_ready.push_back(idx); |
167 | 0 | cl.call(req) |
168 | 0 | } |
169 | | } |
170 | | |
171 | | impl<S, F, Req> Clone for Steer<S, F, Req> |
172 | | where |
173 | | S: Clone, |
174 | | F: Clone, |
175 | | { |
176 | 0 | fn clone(&self) -> Self { |
177 | 0 | Self { |
178 | 0 | router: self.router.clone(), |
179 | 0 | services: self.services.clone(), |
180 | 0 | not_ready: self.not_ready.clone(), |
181 | 0 | _phantom: PhantomData, |
182 | 0 | } |
183 | 0 | } |
184 | | } |
185 | | |
186 | | impl<S, F, Req> fmt::Debug for Steer<S, F, Req> |
187 | | where |
188 | | S: fmt::Debug, |
189 | | F: fmt::Debug, |
190 | | { |
191 | 0 | fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { |
192 | | let Self { |
193 | 0 | router, |
194 | 0 | services, |
195 | 0 | not_ready, |
196 | 0 | _phantom, |
197 | 0 | } = self; |
198 | 0 | f.debug_struct("Steer") |
199 | 0 | .field("router", router) |
200 | 0 | .field("services", services) |
201 | 0 | .field("not_ready", not_ready) |
202 | 0 | .finish() |
203 | 0 | } |
204 | | } |