/rust/registry/src/index.crates.io-1949cf8c6b5b557f/tonic-0.13.0/src/server/grpc.rs
Line | Count | Source |
1 | | use crate::codec::compression::{ |
2 | | CompressionEncoding, EnabledCompressionEncodings, SingleMessageCompressionOverride, |
3 | | }; |
4 | | use crate::codec::EncodeBody; |
5 | | use crate::metadata::GRPC_CONTENT_TYPE; |
6 | | use crate::{ |
7 | | body::Body, |
8 | | codec::{Codec, Streaming}, |
9 | | server::{ClientStreamingService, ServerStreamingService, StreamingService, UnaryService}, |
10 | | Request, Status, |
11 | | }; |
12 | | use http_body::Body as HttpBody; |
13 | | use std::{fmt, pin::pin}; |
14 | | use tokio_stream::{Stream, StreamExt}; |
15 | | |
// Early-return helper for the request handlers: evaluates to the `Ok` value,
// or returns from the enclosing function with the error converted through
// `into_http()`.
macro_rules! t {
    ($result:expr) => {
        match $result {
            Err(status) => return status.into_http(),
            Ok(value) => value,
        }
    };
}
24 | | |
25 | | /// A gRPC Server handler. |
26 | | /// |
27 | | /// This will wrap some inner [`Codec`] and provide utilities to handle |
28 | | /// inbound unary, client side streaming, server side streaming, and |
29 | | /// bi-directional streaming. |
30 | | /// |
31 | | /// Each request handler method accepts some service that implements the |
32 | | /// corresponding service trait and a http request that contains some body that |
33 | | /// implements some [`Body`]. |
34 | | pub struct Grpc<T> { |
35 | | codec: T, |
36 | | /// Which compression encodings does the server accept for requests? |
37 | | accept_compression_encodings: EnabledCompressionEncodings, |
38 | | /// Which compression encodings might the server use for responses. |
39 | | send_compression_encodings: EnabledCompressionEncodings, |
40 | | /// Limits the maximum size of a decoded message. |
41 | | max_decoding_message_size: Option<usize>, |
42 | | /// Limits the maximum size of an encoded message. |
43 | | max_encoding_message_size: Option<usize>, |
44 | | } |
45 | | |
46 | | impl<T> Grpc<T> |
47 | | where |
48 | | T: Codec, |
49 | | { |
50 | | /// Creates a new gRPC server with the provided [`Codec`]. |
51 | 0 | pub fn new(codec: T) -> Self { |
52 | 0 | Self { |
53 | 0 | codec, |
54 | 0 | accept_compression_encodings: EnabledCompressionEncodings::default(), |
55 | 0 | send_compression_encodings: EnabledCompressionEncodings::default(), |
56 | 0 | max_decoding_message_size: None, |
57 | 0 | max_encoding_message_size: None, |
58 | 0 | } |
59 | 0 | } |
60 | | |
61 | | /// Enable accepting compressed requests. |
62 | | /// |
63 | | /// If a request with an unsupported encoding is received the server will respond with |
64 | | /// [`Code::UnUnimplemented`](crate::Code). |
65 | | /// |
66 | | /// # Example |
67 | | /// |
68 | | /// The most common way of using this is through a server generated by tonic-build: |
69 | | /// |
70 | | /// ```rust |
71 | | /// # enum CompressionEncoding { Gzip } |
72 | | /// # struct Svc; |
73 | | /// # struct ExampleServer<T>(T); |
74 | | /// # impl<T> ExampleServer<T> { |
75 | | /// # fn new(svc: T) -> Self { Self(svc) } |
76 | | /// # fn accept_compressed(self, _: CompressionEncoding) -> Self { self } |
77 | | /// # } |
78 | | /// # #[tonic::async_trait] |
79 | | /// # trait Example {} |
80 | | /// |
81 | | /// #[tonic::async_trait] |
82 | | /// impl Example for Svc { |
83 | | /// // ... |
84 | | /// } |
85 | | /// |
86 | | /// let service = ExampleServer::new(Svc).accept_compressed(CompressionEncoding::Gzip); |
87 | | /// ``` |
88 | 0 | pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self { |
89 | 0 | self.accept_compression_encodings.enable(encoding); |
90 | 0 | self |
91 | 0 | } |
92 | | |
93 | | /// Enable sending compressed responses. |
94 | | /// |
95 | | /// Requires the client to also support receiving compressed responses. |
96 | | /// |
97 | | /// # Example |
98 | | /// |
99 | | /// The most common way of using this is through a server generated by tonic-build: |
100 | | /// |
101 | | /// ```rust |
102 | | /// # enum CompressionEncoding { Gzip } |
103 | | /// # struct Svc; |
104 | | /// # struct ExampleServer<T>(T); |
105 | | /// # impl<T> ExampleServer<T> { |
106 | | /// # fn new(svc: T) -> Self { Self(svc) } |
107 | | /// # fn send_compressed(self, _: CompressionEncoding) -> Self { self } |
108 | | /// # } |
109 | | /// # #[tonic::async_trait] |
110 | | /// # trait Example {} |
111 | | /// |
112 | | /// #[tonic::async_trait] |
113 | | /// impl Example for Svc { |
114 | | /// // ... |
115 | | /// } |
116 | | /// |
117 | | /// let service = ExampleServer::new(Svc).send_compressed(CompressionEncoding::Gzip); |
118 | | /// ``` |
119 | 0 | pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self { |
120 | 0 | self.send_compression_encodings.enable(encoding); |
121 | 0 | self |
122 | 0 | } |
123 | | |
124 | | /// Limits the maximum size of a decoded message. |
125 | | /// |
126 | | /// # Example |
127 | | /// |
128 | | /// The most common way of using this is through a server generated by tonic-build: |
129 | | /// |
130 | | /// ```rust |
131 | | /// # struct Svc; |
132 | | /// # struct ExampleServer<T>(T); |
133 | | /// # impl<T> ExampleServer<T> { |
134 | | /// # fn new(svc: T) -> Self { Self(svc) } |
135 | | /// # fn max_decoding_message_size(self, _: usize) -> Self { self } |
136 | | /// # } |
137 | | /// # #[tonic::async_trait] |
138 | | /// # trait Example {} |
139 | | /// |
140 | | /// #[tonic::async_trait] |
141 | | /// impl Example for Svc { |
142 | | /// // ... |
143 | | /// } |
144 | | /// |
145 | | /// // Set the limit to 2MB, Defaults to 4MB. |
146 | | /// let limit = 2 * 1024 * 1024; |
147 | | /// let service = ExampleServer::new(Svc).max_decoding_message_size(limit); |
148 | | /// ``` |
149 | 0 | pub fn max_decoding_message_size(mut self, limit: usize) -> Self { |
150 | 0 | self.max_decoding_message_size = Some(limit); |
151 | 0 | self |
152 | 0 | } |
153 | | |
154 | | /// Limits the maximum size of a encoded message. |
155 | | /// |
156 | | /// # Example |
157 | | /// |
158 | | /// The most common way of using this is through a server generated by tonic-build: |
159 | | /// |
160 | | /// ```rust |
161 | | /// # struct Svc; |
162 | | /// # struct ExampleServer<T>(T); |
163 | | /// # impl<T> ExampleServer<T> { |
164 | | /// # fn new(svc: T) -> Self { Self(svc) } |
165 | | /// # fn max_encoding_message_size(self, _: usize) -> Self { self } |
166 | | /// # } |
167 | | /// # #[tonic::async_trait] |
168 | | /// # trait Example {} |
169 | | /// |
170 | | /// #[tonic::async_trait] |
171 | | /// impl Example for Svc { |
172 | | /// // ... |
173 | | /// } |
174 | | /// |
175 | | /// // Set the limit to 2MB, Defaults to 4MB. |
176 | | /// let limit = 2 * 1024 * 1024; |
177 | | /// let service = ExampleServer::new(Svc).max_encoding_message_size(limit); |
178 | | /// ``` |
179 | 0 | pub fn max_encoding_message_size(mut self, limit: usize) -> Self { |
180 | 0 | self.max_encoding_message_size = Some(limit); |
181 | 0 | self |
182 | 0 | } |
183 | | |
184 | | #[doc(hidden)] |
185 | 0 | pub fn apply_compression_config( |
186 | 0 | mut self, |
187 | 0 | accept_encodings: EnabledCompressionEncodings, |
188 | 0 | send_encodings: EnabledCompressionEncodings, |
189 | 0 | ) -> Self { |
190 | 0 | for &encoding in CompressionEncoding::ENCODINGS { |
191 | 0 | if accept_encodings.is_enabled(encoding) { |
192 | 0 | self = self.accept_compressed(encoding); |
193 | 0 | } |
194 | 0 | if send_encodings.is_enabled(encoding) { |
195 | 0 | self = self.send_compressed(encoding); |
196 | 0 | } |
197 | | } |
198 | | |
199 | 0 | self |
200 | 0 | } |
201 | | |
202 | | #[doc(hidden)] |
203 | 0 | pub fn apply_max_message_size_config( |
204 | 0 | mut self, |
205 | 0 | max_decoding_message_size: Option<usize>, |
206 | 0 | max_encoding_message_size: Option<usize>, |
207 | 0 | ) -> Self { |
208 | 0 | if let Some(limit) = max_decoding_message_size { |
209 | 0 | self = self.max_decoding_message_size(limit); |
210 | 0 | } |
211 | 0 | if let Some(limit) = max_encoding_message_size { |
212 | 0 | self = self.max_encoding_message_size(limit); |
213 | 0 | } |
214 | | |
215 | 0 | self |
216 | 0 | } |
217 | | |
218 | | /// Handle a single unary gRPC request. |
219 | 0 | pub async fn unary<S, B>( |
220 | 0 | &mut self, |
221 | 0 | mut service: S, |
222 | 0 | req: http::Request<B>, |
223 | 0 | ) -> http::Response<Body> |
224 | 0 | where |
225 | 0 | S: UnaryService<T::Decode, Response = T::Encode>, |
226 | 0 | B: HttpBody + Send + 'static, |
227 | 0 | B::Error: Into<crate::BoxError> + Send, |
228 | 0 | { |
229 | 0 | let accept_encoding = CompressionEncoding::from_accept_encoding_header( |
230 | 0 | req.headers(), |
231 | 0 | self.send_compression_encodings, |
232 | | ); |
233 | | |
234 | 0 | let request = match self.map_request_unary(req).await { |
235 | 0 | Ok(r) => r, |
236 | 0 | Err(status) => { |
237 | 0 | return self.map_response::<tokio_stream::Once<Result<T::Encode, Status>>>( |
238 | 0 | Err(status), |
239 | 0 | accept_encoding, |
240 | 0 | SingleMessageCompressionOverride::default(), |
241 | 0 | self.max_encoding_message_size, |
242 | | ); |
243 | | } |
244 | | }; |
245 | | |
246 | 0 | let response = service |
247 | 0 | .call(request) |
248 | 0 | .await |
249 | 0 | .map(|r| r.map(|m| tokio_stream::once(Ok(m)))); |
250 | | |
251 | 0 | let compression_override = compression_override_from_response(&response); |
252 | | |
253 | 0 | self.map_response( |
254 | 0 | response, |
255 | 0 | accept_encoding, |
256 | 0 | compression_override, |
257 | 0 | self.max_encoding_message_size, |
258 | | ) |
259 | 0 | } |
260 | | |
261 | | /// Handle a server side streaming request. |
262 | 0 | pub async fn server_streaming<S, B>( |
263 | 0 | &mut self, |
264 | 0 | mut service: S, |
265 | 0 | req: http::Request<B>, |
266 | 0 | ) -> http::Response<Body> |
267 | 0 | where |
268 | 0 | S: ServerStreamingService<T::Decode, Response = T::Encode>, |
269 | 0 | S::ResponseStream: Send + 'static, |
270 | 0 | B: HttpBody + Send + 'static, |
271 | 0 | B::Error: Into<crate::BoxError> + Send, |
272 | 0 | { |
273 | 0 | let accept_encoding = CompressionEncoding::from_accept_encoding_header( |
274 | 0 | req.headers(), |
275 | 0 | self.send_compression_encodings, |
276 | | ); |
277 | | |
278 | 0 | let request = match self.map_request_unary(req).await { |
279 | 0 | Ok(r) => r, |
280 | 0 | Err(status) => { |
281 | 0 | return self.map_response::<S::ResponseStream>( |
282 | 0 | Err(status), |
283 | 0 | accept_encoding, |
284 | 0 | SingleMessageCompressionOverride::default(), |
285 | 0 | self.max_encoding_message_size, |
286 | | ); |
287 | | } |
288 | | }; |
289 | | |
290 | 0 | let response = service.call(request).await; |
291 | | |
292 | 0 | self.map_response( |
293 | 0 | response, |
294 | 0 | accept_encoding, |
295 | | // disabling compression of individual stream items must be done on |
296 | | // the items themselves |
297 | 0 | SingleMessageCompressionOverride::default(), |
298 | 0 | self.max_encoding_message_size, |
299 | | ) |
300 | 0 | } |
301 | | |
302 | | /// Handle a client side streaming gRPC request. |
303 | 0 | pub async fn client_streaming<S, B>( |
304 | 0 | &mut self, |
305 | 0 | mut service: S, |
306 | 0 | req: http::Request<B>, |
307 | 0 | ) -> http::Response<Body> |
308 | 0 | where |
309 | 0 | S: ClientStreamingService<T::Decode, Response = T::Encode>, |
310 | 0 | B: HttpBody + Send + 'static, |
311 | 0 | B::Error: Into<crate::BoxError> + Send + 'static, |
312 | 0 | { |
313 | 0 | let accept_encoding = CompressionEncoding::from_accept_encoding_header( |
314 | 0 | req.headers(), |
315 | 0 | self.send_compression_encodings, |
316 | | ); |
317 | | |
318 | 0 | let request = t!(self.map_request_streaming(req)); |
319 | | |
320 | 0 | let response = service |
321 | 0 | .call(request) |
322 | 0 | .await |
323 | 0 | .map(|r| r.map(|m| tokio_stream::once(Ok(m)))); |
324 | | |
325 | 0 | let compression_override = compression_override_from_response(&response); |
326 | | |
327 | 0 | self.map_response( |
328 | 0 | response, |
329 | 0 | accept_encoding, |
330 | 0 | compression_override, |
331 | 0 | self.max_encoding_message_size, |
332 | | ) |
333 | 0 | } |
334 | | |
335 | | /// Handle a bi-directional streaming gRPC request. |
336 | 0 | pub async fn streaming<S, B>( |
337 | 0 | &mut self, |
338 | 0 | mut service: S, |
339 | 0 | req: http::Request<B>, |
340 | 0 | ) -> http::Response<Body> |
341 | 0 | where |
342 | 0 | S: StreamingService<T::Decode, Response = T::Encode> + Send, |
343 | 0 | S::ResponseStream: Send + 'static, |
344 | 0 | B: HttpBody + Send + 'static, |
345 | 0 | B::Error: Into<crate::BoxError> + Send, |
346 | 0 | { |
347 | 0 | let accept_encoding = CompressionEncoding::from_accept_encoding_header( |
348 | 0 | req.headers(), |
349 | 0 | self.send_compression_encodings, |
350 | | ); |
351 | | |
352 | 0 | let request = t!(self.map_request_streaming(req)); |
353 | | |
354 | 0 | let response = service.call(request).await; |
355 | | |
356 | 0 | self.map_response( |
357 | 0 | response, |
358 | 0 | accept_encoding, |
359 | 0 | SingleMessageCompressionOverride::default(), |
360 | 0 | self.max_encoding_message_size, |
361 | | ) |
362 | 0 | } |
363 | | |
364 | 0 | async fn map_request_unary<B>( |
365 | 0 | &mut self, |
366 | 0 | request: http::Request<B>, |
367 | 0 | ) -> Result<Request<T::Decode>, Status> |
368 | 0 | where |
369 | 0 | B: HttpBody + Send + 'static, |
370 | 0 | B::Error: Into<crate::BoxError> + Send, |
371 | 0 | { |
372 | 0 | let request_compression_encoding = self.request_encoding_if_supported(&request)?; |
373 | | |
374 | 0 | let (parts, body) = request.into_parts(); |
375 | | |
376 | 0 | let mut stream = pin!(Streaming::new_request( |
377 | 0 | self.codec.decoder(), |
378 | 0 | body, |
379 | 0 | request_compression_encoding, |
380 | 0 | self.max_decoding_message_size, |
381 | | )); |
382 | | |
383 | 0 | let message = stream |
384 | 0 | .try_next() |
385 | 0 | .await? |
386 | 0 | .ok_or_else(|| Status::internal("Missing request message."))?; |
387 | | |
388 | 0 | let mut req = Request::from_http_parts(parts, message); |
389 | | |
390 | 0 | if let Some(trailers) = stream.trailers().await? { |
391 | 0 | req.metadata_mut().merge(trailers); |
392 | 0 | } |
393 | | |
394 | 0 | Ok(req) |
395 | 0 | } |
396 | | |
397 | 0 | fn map_request_streaming<B>( |
398 | 0 | &mut self, |
399 | 0 | request: http::Request<B>, |
400 | 0 | ) -> Result<Request<Streaming<T::Decode>>, Status> |
401 | 0 | where |
402 | 0 | B: HttpBody + Send + 'static, |
403 | 0 | B::Error: Into<crate::BoxError> + Send, |
404 | | { |
405 | 0 | let encoding = self.request_encoding_if_supported(&request)?; |
406 | | |
407 | 0 | let request = request.map(|body| { |
408 | 0 | Streaming::new_request( |
409 | 0 | self.codec.decoder(), |
410 | 0 | body, |
411 | 0 | encoding, |
412 | 0 | self.max_decoding_message_size, |
413 | | ) |
414 | 0 | }); |
415 | | |
416 | 0 | Ok(Request::from_http(request)) |
417 | 0 | } |
418 | | |
419 | 0 | fn map_response<B>( |
420 | 0 | &mut self, |
421 | 0 | response: Result<crate::Response<B>, Status>, |
422 | 0 | accept_encoding: Option<CompressionEncoding>, |
423 | 0 | compression_override: SingleMessageCompressionOverride, |
424 | 0 | max_message_size: Option<usize>, |
425 | 0 | ) -> http::Response<Body> |
426 | 0 | where |
427 | 0 | B: Stream<Item = Result<T::Encode, Status>> + Send + 'static, |
428 | | { |
429 | 0 | let response = t!(response); |
430 | | |
431 | 0 | let (mut parts, body) = response.into_http().into_parts(); |
432 | | |
433 | | // Set the content type |
434 | 0 | parts |
435 | 0 | .headers |
436 | 0 | .insert(http::header::CONTENT_TYPE, GRPC_CONTENT_TYPE); |
437 | | |
438 | | #[cfg(any(feature = "gzip", feature = "deflate", feature = "zstd"))] |
439 | | if let Some(encoding) = accept_encoding { |
440 | | // Set the content encoding |
441 | | parts.headers.insert( |
442 | | crate::codec::compression::ENCODING_HEADER, |
443 | | encoding.into_header_value(), |
444 | | ); |
445 | | } |
446 | | |
447 | 0 | let body = EncodeBody::new_server( |
448 | 0 | self.codec.encoder(), |
449 | 0 | body, |
450 | 0 | accept_encoding, |
451 | 0 | compression_override, |
452 | 0 | max_message_size, |
453 | | ); |
454 | | |
455 | 0 | http::Response::from_parts(parts, Body::new(body)) |
456 | 0 | } |
457 | | |
458 | 0 | fn request_encoding_if_supported<B>( |
459 | 0 | &self, |
460 | 0 | request: &http::Request<B>, |
461 | 0 | ) -> Result<Option<CompressionEncoding>, Status> { |
462 | 0 | CompressionEncoding::from_encoding_header( |
463 | 0 | request.headers(), |
464 | 0 | self.accept_compression_encodings, |
465 | | ) |
466 | 0 | } |
467 | | } |
468 | | |
469 | | impl<T: fmt::Debug> fmt::Debug for Grpc<T> { |
470 | 0 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
471 | 0 | f.debug_struct("Grpc") |
472 | 0 | .field("codec", &self.codec) |
473 | 0 | .field( |
474 | 0 | "accept_compression_encodings", |
475 | 0 | &self.accept_compression_encodings, |
476 | 0 | ) |
477 | 0 | .field( |
478 | 0 | "send_compression_encodings", |
479 | 0 | &self.send_compression_encodings, |
480 | 0 | ) |
481 | 0 | .finish() |
482 | 0 | } |
483 | | } |
484 | | |
485 | 0 | fn compression_override_from_response<B, E>( |
486 | 0 | res: &Result<crate::Response<B>, E>, |
487 | 0 | ) -> SingleMessageCompressionOverride { |
488 | 0 | res.as_ref() |
489 | 0 | .ok() |
490 | 0 | .and_then(|response| { |
491 | 0 | response |
492 | 0 | .extensions() |
493 | 0 | .get::<SingleMessageCompressionOverride>() |
494 | 0 | .copied() |
495 | 0 | }) |
496 | 0 | .unwrap_or_default() |
497 | 0 | } |