/rust/registry/src/index.crates.io-1949cf8c6b5b557f/tonic-0.12.3/src/request.rs
Line | Count | Source |
1 | | use crate::metadata::{MetadataMap, MetadataValue}; |
2 | | #[cfg(feature = "server")] |
3 | | use crate::transport::server::TcpConnectInfo; |
4 | | #[cfg(all(feature = "server", feature = "tls"))] |
5 | | use crate::transport::server::TlsConnectInfo; |
6 | | use http::Extensions; |
7 | | #[cfg(feature = "server")] |
8 | | use std::net::SocketAddr; |
9 | | #[cfg(all(feature = "server", feature = "tls"))] |
10 | | use std::sync::Arc; |
11 | | use std::time::Duration; |
12 | | #[cfg(all(feature = "server", feature = "tls"))] |
13 | | use tokio_rustls::rustls::pki_types::CertificateDer; |
14 | | use tokio_stream::Stream; |
15 | | |
16 | | /// A gRPC request and metadata from an RPC call. |
17 | | #[derive(Debug)] |
18 | | pub struct Request<T> { |
19 | | metadata: MetadataMap, |
20 | | message: T, |
21 | | extensions: Extensions, |
22 | | } |
23 | | |
24 | | /// Trait implemented by RPC request types. |
25 | | /// |
26 | | /// Types implementing this trait can be used as arguments to client RPC |
27 | | /// methods without explicitly wrapping them into `tonic::Request`s. The purpose |
28 | | /// is to make client calls slightly more convenient to write. |
29 | | /// |
30 | | /// Tonic's code generation and blanket implementations handle this for you, |
31 | | /// so it is not necessary to implement this trait directly. |
32 | | /// |
33 | | /// # Example |
34 | | /// |
35 | | /// Given the following gRPC method definition: |
36 | | /// ```proto |
37 | | /// rpc GetFeature(Point) returns (Feature) {} |
38 | | /// ``` |
39 | | /// |
40 | | /// we can call `get_feature` in two equivalent ways: |
41 | | /// ```rust |
42 | | /// # pub struct Point {} |
43 | | /// # pub struct Client {} |
44 | | /// # impl Client { |
45 | | /// # fn get_feature(&self, r: impl tonic::IntoRequest<Point>) {} |
46 | | /// # } |
47 | | /// # let client = Client {}; |
48 | | /// use tonic::Request; |
49 | | /// |
50 | | /// client.get_feature(Point {}); |
51 | | /// client.get_feature(Request::new(Point {})); |
52 | | /// ``` |
53 | | pub trait IntoRequest<T>: sealed::Sealed { |
54 | | /// Wrap the input message `T` in a `tonic::Request` |
55 | | fn into_request(self) -> Request<T>; |
56 | | } |
57 | | |
58 | | /// Trait implemented by RPC streaming request types. |
59 | | /// |
60 | | /// Types implementing this trait can be used as arguments to client streaming |
61 | | /// RPC methods without explicitly wrapping them into `tonic::Request`s. The |
62 | | /// purpose is to make client calls slightly more convenient to write. |
63 | | /// |
64 | | /// Tonic's code generation and blanket implementations handle this for you, |
65 | | /// so it is not necessary to implement this trait directly. |
66 | | /// |
67 | | /// # Example |
68 | | /// |
69 | | /// Given the following gRPC service method definition: |
70 | | /// ```proto |
71 | | /// rpc RecordRoute(stream Point) returns (RouteSummary) {} |
72 | | /// ``` |
73 | | /// we can call `record_route` in two equivalent ways: |
74 | | /// |
75 | | /// ```rust |
76 | | /// # #[derive(Clone)] |
77 | | /// # pub struct Point {}; |
78 | | /// # pub struct Client {}; |
79 | | /// # impl Client { |
80 | | /// # fn record_route(&self, r: impl tonic::IntoStreamingRequest<Message = Point>) {} |
81 | | /// # } |
82 | | /// # let client = Client {}; |
83 | | /// use tonic::Request; |
84 | | /// |
85 | | /// let messages = vec![Point {}, Point {}]; |
86 | | /// |
87 | | /// client.record_route(Request::new(tokio_stream::iter(messages.clone()))); |
88 | | /// client.record_route(tokio_stream::iter(messages)); |
89 | | /// ``` |
90 | | pub trait IntoStreamingRequest: sealed::Sealed { |
91 | | /// The RPC request stream type |
92 | | type Stream: Stream<Item = Self::Message> + Send + 'static; |
93 | | |
94 | | /// The RPC request type |
95 | | type Message; |
96 | | |
97 | | /// Wrap the stream of messages in a `tonic::Request` |
98 | | fn into_streaming_request(self) -> Request<Self::Stream>; |
99 | | } |
100 | | |
101 | | impl<T> Request<T> { |
102 | | /// Create a new gRPC request. |
103 | | /// |
104 | | /// ```rust |
105 | | /// # use tonic::Request; |
106 | | /// # pub struct HelloRequest { |
107 | | /// # pub name: String, |
108 | | /// # } |
109 | | /// Request::new(HelloRequest { |
110 | | /// name: "Bob".into(), |
111 | | /// }); |
112 | | /// ``` |
113 | 0 | pub fn new(message: T) -> Self { |
114 | 0 | Request { |
115 | 0 | metadata: MetadataMap::new(), |
116 | 0 | message, |
117 | 0 | extensions: Extensions::new(), |
118 | 0 | } |
119 | 0 | } |
120 | | |
121 | | /// Get a reference to the message |
122 | 0 | pub fn get_ref(&self) -> &T { |
123 | 0 | &self.message |
124 | 0 | } |
125 | | |
126 | | /// Get a mutable reference to the message |
127 | 0 | pub fn get_mut(&mut self) -> &mut T { |
128 | 0 | &mut self.message |
129 | 0 | } |
130 | | |
131 | | /// Get a reference to the custom request metadata. |
132 | 0 | pub fn metadata(&self) -> &MetadataMap { |
133 | 0 | &self.metadata |
134 | 0 | } |
135 | | |
136 | | /// Get a mutable reference to the request metadata. |
137 | 0 | pub fn metadata_mut(&mut self) -> &mut MetadataMap { |
138 | 0 | &mut self.metadata |
139 | 0 | } |
140 | | |
141 | | /// Consumes `self`, returning the message |
142 | 0 | pub fn into_inner(self) -> T { |
143 | 0 | self.message |
144 | 0 | } |
145 | | |
146 | | /// Consumes `self` returning the parts of the request. |
147 | 0 | pub fn into_parts(self) -> (MetadataMap, Extensions, T) { |
148 | 0 | (self.metadata, self.extensions, self.message) |
149 | 0 | } |
150 | | |
151 | | /// Create a new gRPC request from metadata, extensions and message. |
152 | 0 | pub fn from_parts(metadata: MetadataMap, extensions: Extensions, message: T) -> Self { |
153 | 0 | Self { |
154 | 0 | metadata, |
155 | 0 | extensions, |
156 | 0 | message, |
157 | 0 | } |
158 | 0 | } |
159 | | |
160 | 0 | pub(crate) fn from_http_parts(parts: http::request::Parts, message: T) -> Self { |
161 | 0 | Request { |
162 | 0 | metadata: MetadataMap::from_headers(parts.headers), |
163 | 0 | message, |
164 | 0 | extensions: parts.extensions, |
165 | 0 | } |
166 | 0 | } |
167 | | |
168 | | /// Convert an HTTP request to a gRPC request |
169 | 0 | pub fn from_http(http: http::Request<T>) -> Self { |
170 | 0 | let (parts, message) = http.into_parts(); |
171 | 0 | Request::from_http_parts(parts, message) |
172 | 0 | } |
173 | | |
174 | 0 | pub(crate) fn into_http( |
175 | 0 | self, |
176 | 0 | uri: http::Uri, |
177 | 0 | method: http::Method, |
178 | 0 | version: http::Version, |
179 | 0 | sanitize_headers: SanitizeHeaders, |
180 | 0 | ) -> http::Request<T> { |
181 | 0 | let mut request = http::Request::new(self.message); |
182 | | |
183 | 0 | *request.version_mut() = version; |
184 | 0 | *request.method_mut() = method; |
185 | 0 | *request.uri_mut() = uri; |
186 | 0 | *request.headers_mut() = match sanitize_headers { |
187 | 0 | SanitizeHeaders::Yes => self.metadata.into_sanitized_headers(), |
188 | 0 | SanitizeHeaders::No => self.metadata.into_headers(), |
189 | | }; |
190 | 0 | *request.extensions_mut() = self.extensions; |
191 | | |
192 | 0 | request |
193 | 0 | } |
194 | | |
195 | | #[doc(hidden)] |
196 | 0 | pub fn map<F, U>(self, f: F) -> Request<U> |
197 | 0 | where |
198 | 0 | F: FnOnce(T) -> U, |
199 | | { |
200 | 0 | let message = f(self.message); |
201 | | |
202 | 0 | Request { |
203 | 0 | metadata: self.metadata, |
204 | 0 | message, |
205 | 0 | extensions: self.extensions, |
206 | 0 | } |
207 | 0 | } |
208 | | |
209 | | /// Get the local address of this connection. |
210 | | /// |
211 | | /// This will return `None` if the `IO` type used |
212 | | /// does not implement `Connected` or when using a unix domain socket. |
213 | | /// This currently only works on the server side. |
214 | | #[cfg(feature = "server")] |
215 | 0 | pub fn local_addr(&self) -> Option<SocketAddr> { |
216 | 0 | let addr = self |
217 | 0 | .extensions() |
218 | 0 | .get::<TcpConnectInfo>() |
219 | 0 | .and_then(|i| i.local_addr()); |
220 | | |
221 | | #[cfg(feature = "tls")] |
222 | | let addr = addr.or_else(|| { |
223 | | self.extensions() |
224 | | .get::<TlsConnectInfo<TcpConnectInfo>>() |
225 | | .and_then(|i| i.get_ref().local_addr()) |
226 | | }); |
227 | | |
228 | 0 | addr |
229 | 0 | } |
230 | | |
231 | | /// Get the remote address of this connection. |
232 | | /// |
233 | | /// This will return `None` if the `IO` type used |
234 | | /// does not implement `Connected` or when using a unix domain socket. |
235 | | /// This currently only works on the server side. |
236 | | #[cfg(feature = "server")] |
237 | 0 | pub fn remote_addr(&self) -> Option<SocketAddr> { |
238 | 0 | let addr = self |
239 | 0 | .extensions() |
240 | 0 | .get::<TcpConnectInfo>() |
241 | 0 | .and_then(|i| i.remote_addr()); |
242 | | |
243 | | #[cfg(feature = "tls")] |
244 | | let addr = addr.or_else(|| { |
245 | | self.extensions() |
246 | | .get::<TlsConnectInfo<TcpConnectInfo>>() |
247 | | .and_then(|i| i.get_ref().remote_addr()) |
248 | | }); |
249 | | |
250 | 0 | addr |
251 | 0 | } |
252 | | |
253 | | /// Get the peer certificates of the connected client. |
254 | | /// |
255 | | /// This is used to fetch the certificates from the TLS session |
256 | | /// and is mostly used for mTLS. This currently only returns |
257 | | /// `Some` on the server side of the `transport` server with |
258 | | /// TLS enabled connections. |
259 | | #[cfg(all(feature = "server", feature = "tls"))] |
260 | | pub fn peer_certs(&self) -> Option<Arc<Vec<CertificateDer<'static>>>> { |
261 | | self.extensions() |
262 | | .get::<TlsConnectInfo<TcpConnectInfo>>() |
263 | | .and_then(|i| i.peer_certs()) |
264 | | } |
265 | | |
266 | | /// Set the max duration the request is allowed to take. |
267 | | /// |
268 | | /// Requires the server to support the `grpc-timeout` metadata, which Tonic does. |
269 | | /// |
270 | | /// The duration will be formatted according to [the spec] and use the most precise unit |
271 | | /// possible. |
272 | | /// |
273 | | /// Example: |
274 | | /// |
275 | | /// ```rust |
276 | | /// use std::time::Duration; |
277 | | /// use tonic::Request; |
278 | | /// |
279 | | /// let mut request = Request::new(()); |
280 | | /// |
281 | | /// request.set_timeout(Duration::from_secs(30)); |
282 | | /// |
283 | | /// let value = request.metadata().get("grpc-timeout").unwrap(); |
284 | | /// |
285 | | /// assert_eq!( |
286 | | /// value, |
287 | | /// // equivalent to 30 seconds |
288 | | /// "30000000u" |
289 | | /// ); |
290 | | /// ``` |
291 | | /// |
292 | | /// [the spec]: https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md |
293 | 0 | pub fn set_timeout(&mut self, deadline: Duration) { |
294 | 0 | let value: MetadataValue<_> = duration_to_grpc_timeout(deadline).parse().unwrap(); |
295 | 0 | self.metadata_mut() |
296 | 0 | .insert(crate::metadata::GRPC_TIMEOUT_HEADER, value); |
297 | 0 | } |
298 | | |
299 | | /// Returns a reference to the associated extensions. |
300 | 0 | pub fn extensions(&self) -> &Extensions { |
301 | 0 | &self.extensions |
302 | 0 | } |
303 | | |
304 | | /// Returns a mutable reference to the associated extensions. |
305 | | /// |
306 | | /// # Example |
307 | | /// |
308 | | /// Extensions can be set in interceptors: |
309 | | /// |
310 | | /// ```no_run |
311 | | /// use tonic::{Request, service::interceptor}; |
312 | | /// |
313 | | /// #[derive(Clone)] // Extensions must be Clone |
314 | | /// struct MyExtension { |
315 | | /// some_piece_of_data: String, |
316 | | /// } |
317 | | /// |
318 | | /// interceptor(|mut request: Request<()>| { |
319 | | /// request.extensions_mut().insert(MyExtension { |
320 | | /// some_piece_of_data: "foo".to_string(), |
321 | | /// }); |
322 | | /// |
323 | | /// Ok(request) |
324 | | /// }); |
325 | | /// ``` |
326 | | /// |
327 | | /// And picked up by RPCs: |
328 | | /// |
329 | | /// ```no_run |
330 | | /// use tonic::{async_trait, Status, Request, Response}; |
331 | | /// # |
332 | | /// # struct Output {} |
333 | | /// # struct Input; |
334 | | /// # struct MyService; |
335 | | /// # struct MyExtension; |
336 | | /// # #[async_trait] |
337 | | /// # trait TestService { |
338 | | /// # async fn handler(&self, req: Request<Input>) -> Result<Response<Output>, Status>; |
339 | | /// # } |
340 | | /// |
341 | | /// #[async_trait] |
342 | | /// impl TestService for MyService { |
343 | | /// async fn handler(&self, req: Request<Input>) -> Result<Response<Output>, Status> { |
344 | | /// let value: &MyExtension = req.extensions().get::<MyExtension>().unwrap(); |
345 | | /// |
346 | | /// Ok(Response::new(Output {})) |
347 | | /// } |
348 | | /// } |
349 | | /// ``` |
350 | 0 | pub fn extensions_mut(&mut self) -> &mut Extensions { |
351 | 0 | &mut self.extensions |
352 | 0 | } |
353 | | } |
354 | | |
355 | | impl<T> IntoRequest<T> for T { |
356 | 0 | fn into_request(self) -> Request<Self> { |
357 | 0 | Request::new(self) |
358 | 0 | } |
359 | | } |
360 | | |
361 | | impl<T> IntoRequest<T> for Request<T> { |
362 | 0 | fn into_request(self) -> Request<T> { |
363 | 0 | self |
364 | 0 | } |
365 | | } |
366 | | |
367 | | impl<T> IntoStreamingRequest for T |
368 | | where |
369 | | T: Stream + Send + 'static, |
370 | | { |
371 | | type Stream = T; |
372 | | type Message = T::Item; |
373 | | |
374 | 0 | fn into_streaming_request(self) -> Request<Self> { |
375 | 0 | Request::new(self) |
376 | 0 | } |
377 | | } |
378 | | |
379 | | impl<T> IntoStreamingRequest for Request<T> |
380 | | where |
381 | | T: Stream + Send + 'static, |
382 | | { |
383 | | type Stream = T; |
384 | | type Message = T::Item; |
385 | | |
386 | 0 | fn into_streaming_request(self) -> Self { |
387 | 0 | self |
388 | 0 | } |
389 | | } |
390 | | |
// Blanket implementation: every type satisfies the `Sealed` supertrait bound.
impl<T> sealed::Sealed for T {}

/// Private module holding the marker trait used as a supertrait bound by the
/// public conversion traits in this file.
mod sealed {
    /// Marker trait with no methods.
    pub trait Sealed {}
}
396 | | |
/// Encode `duration` as a `grpc-timeout` value: a decimal number followed by
/// a one-letter unit.
///
/// The units are tried from most to least precise: `n` (nanoseconds),
/// `u` (microseconds), `m` (milliseconds), `S` (seconds), `M` (minutes) and
/// `H` (hours). The first unit whose value fits in eight digits is used, so
/// the result is as precise as the format allows.
///
/// Durations that do not even fit in eight digits of hours (more than roughly
/// 11_415 years) are clamped to the largest encodable timeout, `99999999H`,
/// instead of panicking. This keeps calls such as
/// `set_timeout(Duration::MAX)` from crashing the caller.
fn duration_to_grpc_timeout(duration: Duration) -> String {
    /// The gRPC spec specifies that the timeout must be at most 8 digits, so
    /// this is the largest a value can be before a bigger unit is needed.
    const MAX_VALUE: u128 = 99_999_999;

    /// Format `value` with `unit`, or return `None` if it needs more than
    /// eight digits.
    fn try_format(value: impl Into<u128>, unit: char) -> Option<String> {
        let value = value.into();
        (value <= MAX_VALUE).then(|| format!("{}{}", value, unit))
    }

    let secs = duration.as_secs();

    // Pick the most precise unit that is less than or equal to 8 digits, as
    // per the gRPC spec.
    try_format(duration.as_nanos(), 'n')
        .or_else(|| try_format(duration.as_micros(), 'u'))
        .or_else(|| try_format(duration.as_millis(), 'm'))
        .or_else(|| try_format(secs, 'S'))
        .or_else(|| try_format(secs / 60, 'M'))
        .or_else(|| try_format(secs / 3600, 'H'))
        // Too large even in hours: saturate at the maximum representable
        // timeout rather than panic.
        .unwrap_or_else(|| format!("{}H", MAX_VALUE))
}
430 | | |
/// Whether reserved headers should be removed when a `tonic::Request` is
/// converted into a `http::Request`.
pub(crate) enum SanitizeHeaders {
    /// Remove reserved headers during the conversion.
    Yes,
    /// Convert the metadata to headers as-is.
    No,
}
437 | | |
#[cfg(test)]
mod tests {
    use super::*;
    use crate::metadata::{MetadataKey, MetadataValue};

    use http::Uri;

    /// Converting with `SanitizeHeaders::Yes` must drop every reserved gRPC
    /// header, leaving no headers at all in this case.
    #[test]
    fn reserved_headers_are_excluded() {
        let mut r = Request::new(1);

        for header in &MetadataMap::GRPC_RESERVED_HEADERS {
            r.metadata_mut().insert(
                MetadataKey::unchecked_from_header_name(header.clone()),
                MetadataValue::from_static("invalid"),
            );
        }

        let http_request = r.into_http(
            Uri::default(),
            http::Method::POST,
            http::Version::HTTP_2,
            SanitizeHeaders::Yes,
        );
        assert!(http_request.headers().is_empty());
    }

    #[test]
    fn duration_to_grpc_timeout_less_than_second() {
        let timeout = Duration::from_millis(500);
        let value = duration_to_grpc_timeout(timeout);
        assert_eq!(value, format!("{}u", timeout.as_micros()));
    }

    #[test]
    fn duration_to_grpc_timeout_more_than_second() {
        let timeout = Duration::from_secs(30);
        let value = duration_to_grpc_timeout(timeout);
        assert_eq!(value, format!("{}u", timeout.as_micros()));
    }

    #[test]
    fn duration_to_grpc_timeout_a_very_long_time() {
        let one_hour = Duration::from_secs(60 * 60);
        let value = duration_to_grpc_timeout(one_hour);
        assert_eq!(value, format!("{}m", one_hour.as_millis()));
    }

    /// Covers every unit and the eight-digit boundary between neighbouring
    /// units, which the tests above do not reach.
    #[test]
    fn duration_to_grpc_timeout_picks_most_precise_unit_that_fits() {
        let cases = [
            // Largest value that still fits in nanoseconds, then one past it.
            (Duration::from_nanos(99_999_999), "99999999n"),
            (Duration::from_nanos(100_000_000), "100000u"),
            // Largest value that still fits in milliseconds, then one past it.
            (Duration::from_millis(99_999_999), "99999999m"),
            (Duration::from_millis(100_000_000), "100000S"),
            // Largest value that still fits in seconds, then one past it.
            (Duration::from_secs(99_999_999), "99999999S"),
            (Duration::from_secs(100_000_000), "1666666M"),
            // Too many minutes for eight digits, so hours are used.
            (Duration::from_secs(100_000_000 * 60), "1666666H"),
        ];

        for (duration, expected) in &cases {
            assert_eq!(
                duration_to_grpc_timeout(*duration),
                *expected,
                "unexpected encoding for {:?}",
                duration
            );
        }
    }
}