Coverage Report

Created: 2025-10-29 07:05

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/rust/registry/src/index.crates.io-1949cf8c6b5b557f/tonic-0.13.0/src/server/grpc.rs
Line
Count
Source
1
use crate::codec::compression::{
2
    CompressionEncoding, EnabledCompressionEncodings, SingleMessageCompressionOverride,
3
};
4
use crate::codec::EncodeBody;
5
use crate::metadata::GRPC_CONTENT_TYPE;
6
use crate::{
7
    body::Body,
8
    codec::{Codec, Streaming},
9
    server::{ClientStreamingService, ServerStreamingService, StreamingService, UnaryService},
10
    Request, Status,
11
};
12
use http_body::Body as HttpBody;
13
use std::{fmt, pin::pin};
14
use tokio_stream::{Stream, StreamExt};
15
16
/// Unwraps a `Result`: yields the `Ok` value, or returns early from the
/// enclosing function with the error converted through `into_http()`.
macro_rules! t {
    ($outcome:expr) => {
        match $outcome {
            Err(status) => return status.into_http(),
            Ok(value) => value,
        }
    };
}
24
25
/// A gRPC Server handler.
26
///
27
/// This will wrap some inner [`Codec`] and provide utilities to handle
28
/// inbound unary, client side streaming, server side streaming, and
29
/// bi-directional streaming.
30
///
31
/// Each request handler method accepts some service that implements the
32
/// corresponding service trait and a http request that contains some body that
33
/// implements some [`Body`].
34
pub struct Grpc<T> {
35
    codec: T,
36
    /// Which compression encodings does the server accept for requests?
37
    accept_compression_encodings: EnabledCompressionEncodings,
38
    /// Which compression encodings might the server use for responses.
39
    send_compression_encodings: EnabledCompressionEncodings,
40
    /// Limits the maximum size of a decoded message.
41
    max_decoding_message_size: Option<usize>,
42
    /// Limits the maximum size of an encoded message.
43
    max_encoding_message_size: Option<usize>,
44
}
45
46
impl<T> Grpc<T>
47
where
48
    T: Codec,
49
{
50
    /// Creates a new gRPC server with the provided [`Codec`].
51
0
    pub fn new(codec: T) -> Self {
52
0
        Self {
53
0
            codec,
54
0
            accept_compression_encodings: EnabledCompressionEncodings::default(),
55
0
            send_compression_encodings: EnabledCompressionEncodings::default(),
56
0
            max_decoding_message_size: None,
57
0
            max_encoding_message_size: None,
58
0
        }
59
0
    }
60
61
    /// Enable accepting compressed requests.
62
    ///
63
    /// If a request with an unsupported encoding is received the server will respond with
64
    /// [`Code::Unimplemented`](crate::Code).
65
    ///
66
    /// # Example
67
    ///
68
    /// The most common way of using this is through a server generated by tonic-build:
69
    ///
70
    /// ```rust
71
    /// # enum CompressionEncoding { Gzip }
72
    /// # struct Svc;
73
    /// # struct ExampleServer<T>(T);
74
    /// # impl<T> ExampleServer<T> {
75
    /// #     fn new(svc: T) -> Self { Self(svc) }
76
    /// #     fn accept_compressed(self, _: CompressionEncoding) -> Self { self }
77
    /// # }
78
    /// # #[tonic::async_trait]
79
    /// # trait Example {}
80
    ///
81
    /// #[tonic::async_trait]
82
    /// impl Example for Svc {
83
    ///     // ...
84
    /// }
85
    ///
86
    /// let service = ExampleServer::new(Svc).accept_compressed(CompressionEncoding::Gzip);
87
    /// ```
88
0
    pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self {
        // Record the encoding in the set consulted when validating request headers.
        let accepted = &mut self.accept_compression_encodings;
        accepted.enable(encoding);
        self
    }
92
93
    /// Enable sending compressed responses.
94
    ///
95
    /// Requires the client to also support receiving compressed responses.
96
    ///
97
    /// # Example
98
    ///
99
    /// The most common way of using this is through a server generated by tonic-build:
100
    ///
101
    /// ```rust
102
    /// # enum CompressionEncoding { Gzip }
103
    /// # struct Svc;
104
    /// # struct ExampleServer<T>(T);
105
    /// # impl<T> ExampleServer<T> {
106
    /// #     fn new(svc: T) -> Self { Self(svc) }
107
    /// #     fn send_compressed(self, _: CompressionEncoding) -> Self { self }
108
    /// # }
109
    /// # #[tonic::async_trait]
110
    /// # trait Example {}
111
    ///
112
    /// #[tonic::async_trait]
113
    /// impl Example for Svc {
114
    ///     // ...
115
    /// }
116
    ///
117
    /// let service = ExampleServer::new(Svc).send_compressed(CompressionEncoding::Gzip);
118
    /// ```
119
0
    pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self {
        // Record the encoding in the set consulted when choosing a response encoding.
        let sendable = &mut self.send_compression_encodings;
        sendable.enable(encoding);
        self
    }
123
124
    /// Limits the maximum size of a decoded message.
125
    ///
126
    /// # Example
127
    ///
128
    /// The most common way of using this is through a server generated by tonic-build:
129
    ///
130
    /// ```rust
131
    /// # struct Svc;
132
    /// # struct ExampleServer<T>(T);
133
    /// # impl<T> ExampleServer<T> {
134
    /// #     fn new(svc: T) -> Self { Self(svc) }
135
    /// #     fn max_decoding_message_size(self, _: usize) -> Self { self }
136
    /// # }
137
    /// # #[tonic::async_trait]
138
    /// # trait Example {}
139
    ///
140
    /// #[tonic::async_trait]
141
    /// impl Example for Svc {
142
    ///     // ...
143
    /// }
144
    ///
145
    /// // Set the limit to 2MB, Defaults to 4MB.
146
    /// let limit = 2 * 1024 * 1024;
147
    /// let service = ExampleServer::new(Svc).max_decoding_message_size(limit);
148
    /// ```
149
0
    pub fn max_decoding_message_size(mut self, limit: usize) -> Self {
150
0
        self.max_decoding_message_size = Some(limit);
151
0
        self
152
0
    }
153
154
    /// Limits the maximum size of an encoded message.
155
    ///
156
    /// # Example
157
    ///
158
    /// The most common way of using this is through a server generated by tonic-build:
159
    ///
160
    /// ```rust
161
    /// # struct Svc;
162
    /// # struct ExampleServer<T>(T);
163
    /// # impl<T> ExampleServer<T> {
164
    /// #     fn new(svc: T) -> Self { Self(svc) }
165
    /// #     fn max_encoding_message_size(self, _: usize) -> Self { self }
166
    /// # }
167
    /// # #[tonic::async_trait]
168
    /// # trait Example {}
169
    ///
170
    /// #[tonic::async_trait]
171
    /// impl Example for Svc {
172
    ///     // ...
173
    /// }
174
    ///
175
    /// // Set the limit to 2MB, Defaults to 4MB.
176
    /// let limit = 2 * 1024 * 1024;
177
    /// let service = ExampleServer::new(Svc).max_encoding_message_size(limit);
178
    /// ```
179
0
    pub fn max_encoding_message_size(mut self, limit: usize) -> Self {
180
0
        self.max_encoding_message_size = Some(limit);
181
0
        self
182
0
    }
183
184
    #[doc(hidden)]
185
0
    pub fn apply_compression_config(
186
0
        mut self,
187
0
        accept_encodings: EnabledCompressionEncodings,
188
0
        send_encodings: EnabledCompressionEncodings,
189
0
    ) -> Self {
190
0
        for &encoding in CompressionEncoding::ENCODINGS {
191
0
            if accept_encodings.is_enabled(encoding) {
192
0
                self = self.accept_compressed(encoding);
193
0
            }
194
0
            if send_encodings.is_enabled(encoding) {
195
0
                self = self.send_compressed(encoding);
196
0
            }
197
        }
198
199
0
        self
200
0
    }
201
202
    #[doc(hidden)]
203
0
    pub fn apply_max_message_size_config(
204
0
        mut self,
205
0
        max_decoding_message_size: Option<usize>,
206
0
        max_encoding_message_size: Option<usize>,
207
0
    ) -> Self {
208
0
        if let Some(limit) = max_decoding_message_size {
209
0
            self = self.max_decoding_message_size(limit);
210
0
        }
211
0
        if let Some(limit) = max_encoding_message_size {
212
0
            self = self.max_encoding_message_size(limit);
213
0
        }
214
215
0
        self
216
0
    }
217
218
    /// Handle a single unary gRPC request.
219
0
    pub async fn unary<S, B>(
220
0
        &mut self,
221
0
        mut service: S,
222
0
        req: http::Request<B>,
223
0
    ) -> http::Response<Body>
224
0
    where
225
0
        S: UnaryService<T::Decode, Response = T::Encode>,
226
0
        B: HttpBody + Send + 'static,
227
0
        B::Error: Into<crate::BoxError> + Send,
228
0
    {
229
0
        let accept_encoding = CompressionEncoding::from_accept_encoding_header(
230
0
            req.headers(),
231
0
            self.send_compression_encodings,
232
        );
233
234
0
        let request = match self.map_request_unary(req).await {
235
0
            Ok(r) => r,
236
0
            Err(status) => {
237
0
                return self.map_response::<tokio_stream::Once<Result<T::Encode, Status>>>(
238
0
                    Err(status),
239
0
                    accept_encoding,
240
0
                    SingleMessageCompressionOverride::default(),
241
0
                    self.max_encoding_message_size,
242
                );
243
            }
244
        };
245
246
0
        let response = service
247
0
            .call(request)
248
0
            .await
249
0
            .map(|r| r.map(|m| tokio_stream::once(Ok(m))));
250
251
0
        let compression_override = compression_override_from_response(&response);
252
253
0
        self.map_response(
254
0
            response,
255
0
            accept_encoding,
256
0
            compression_override,
257
0
            self.max_encoding_message_size,
258
        )
259
0
    }
260
261
    /// Handle a server side streaming request.
262
0
    pub async fn server_streaming<S, B>(
263
0
        &mut self,
264
0
        mut service: S,
265
0
        req: http::Request<B>,
266
0
    ) -> http::Response<Body>
267
0
    where
268
0
        S: ServerStreamingService<T::Decode, Response = T::Encode>,
269
0
        S::ResponseStream: Send + 'static,
270
0
        B: HttpBody + Send + 'static,
271
0
        B::Error: Into<crate::BoxError> + Send,
272
0
    {
273
0
        let accept_encoding = CompressionEncoding::from_accept_encoding_header(
274
0
            req.headers(),
275
0
            self.send_compression_encodings,
276
        );
277
278
0
        let request = match self.map_request_unary(req).await {
279
0
            Ok(r) => r,
280
0
            Err(status) => {
281
0
                return self.map_response::<S::ResponseStream>(
282
0
                    Err(status),
283
0
                    accept_encoding,
284
0
                    SingleMessageCompressionOverride::default(),
285
0
                    self.max_encoding_message_size,
286
                );
287
            }
288
        };
289
290
0
        let response = service.call(request).await;
291
292
0
        self.map_response(
293
0
            response,
294
0
            accept_encoding,
295
            // disabling compression of individual stream items must be done on
296
            // the items themselves
297
0
            SingleMessageCompressionOverride::default(),
298
0
            self.max_encoding_message_size,
299
        )
300
0
    }
301
302
    /// Handle a client side streaming gRPC request.
303
0
    pub async fn client_streaming<S, B>(
304
0
        &mut self,
305
0
        mut service: S,
306
0
        req: http::Request<B>,
307
0
    ) -> http::Response<Body>
308
0
    where
309
0
        S: ClientStreamingService<T::Decode, Response = T::Encode>,
310
0
        B: HttpBody + Send + 'static,
311
0
        B::Error: Into<crate::BoxError> + Send + 'static,
312
0
    {
313
0
        let accept_encoding = CompressionEncoding::from_accept_encoding_header(
314
0
            req.headers(),
315
0
            self.send_compression_encodings,
316
        );
317
318
0
        let request = t!(self.map_request_streaming(req));
319
320
0
        let response = service
321
0
            .call(request)
322
0
            .await
323
0
            .map(|r| r.map(|m| tokio_stream::once(Ok(m))));
324
325
0
        let compression_override = compression_override_from_response(&response);
326
327
0
        self.map_response(
328
0
            response,
329
0
            accept_encoding,
330
0
            compression_override,
331
0
            self.max_encoding_message_size,
332
        )
333
0
    }
334
335
    /// Handle a bi-directional streaming gRPC request.
336
0
    pub async fn streaming<S, B>(
337
0
        &mut self,
338
0
        mut service: S,
339
0
        req: http::Request<B>,
340
0
    ) -> http::Response<Body>
341
0
    where
342
0
        S: StreamingService<T::Decode, Response = T::Encode> + Send,
343
0
        S::ResponseStream: Send + 'static,
344
0
        B: HttpBody + Send + 'static,
345
0
        B::Error: Into<crate::BoxError> + Send,
346
0
    {
347
0
        let accept_encoding = CompressionEncoding::from_accept_encoding_header(
348
0
            req.headers(),
349
0
            self.send_compression_encodings,
350
        );
351
352
0
        let request = t!(self.map_request_streaming(req));
353
354
0
        let response = service.call(request).await;
355
356
0
        self.map_response(
357
0
            response,
358
0
            accept_encoding,
359
0
            SingleMessageCompressionOverride::default(),
360
0
            self.max_encoding_message_size,
361
        )
362
0
    }
363
364
0
    async fn map_request_unary<B>(
365
0
        &mut self,
366
0
        request: http::Request<B>,
367
0
    ) -> Result<Request<T::Decode>, Status>
368
0
    where
369
0
        B: HttpBody + Send + 'static,
370
0
        B::Error: Into<crate::BoxError> + Send,
371
0
    {
372
0
        let request_compression_encoding = self.request_encoding_if_supported(&request)?;
373
374
0
        let (parts, body) = request.into_parts();
375
376
0
        let mut stream = pin!(Streaming::new_request(
377
0
            self.codec.decoder(),
378
0
            body,
379
0
            request_compression_encoding,
380
0
            self.max_decoding_message_size,
381
        ));
382
383
0
        let message = stream
384
0
            .try_next()
385
0
            .await?
386
0
            .ok_or_else(|| Status::internal("Missing request message."))?;
387
388
0
        let mut req = Request::from_http_parts(parts, message);
389
390
0
        if let Some(trailers) = stream.trailers().await? {
391
0
            req.metadata_mut().merge(trailers);
392
0
        }
393
394
0
        Ok(req)
395
0
    }
396
397
0
    fn map_request_streaming<B>(
398
0
        &mut self,
399
0
        request: http::Request<B>,
400
0
    ) -> Result<Request<Streaming<T::Decode>>, Status>
401
0
    where
402
0
        B: HttpBody + Send + 'static,
403
0
        B::Error: Into<crate::BoxError> + Send,
404
    {
405
0
        let encoding = self.request_encoding_if_supported(&request)?;
406
407
0
        let request = request.map(|body| {
408
0
            Streaming::new_request(
409
0
                self.codec.decoder(),
410
0
                body,
411
0
                encoding,
412
0
                self.max_decoding_message_size,
413
            )
414
0
        });
415
416
0
        Ok(Request::from_http(request))
417
0
    }
418
419
0
    fn map_response<B>(
420
0
        &mut self,
421
0
        response: Result<crate::Response<B>, Status>,
422
0
        accept_encoding: Option<CompressionEncoding>,
423
0
        compression_override: SingleMessageCompressionOverride,
424
0
        max_message_size: Option<usize>,
425
0
    ) -> http::Response<Body>
426
0
    where
427
0
        B: Stream<Item = Result<T::Encode, Status>> + Send + 'static,
428
    {
429
0
        let response = t!(response);
430
431
0
        let (mut parts, body) = response.into_http().into_parts();
432
433
        // Set the content type
434
0
        parts
435
0
            .headers
436
0
            .insert(http::header::CONTENT_TYPE, GRPC_CONTENT_TYPE);
437
438
        #[cfg(any(feature = "gzip", feature = "deflate", feature = "zstd"))]
439
        if let Some(encoding) = accept_encoding {
440
            // Set the content encoding
441
            parts.headers.insert(
442
                crate::codec::compression::ENCODING_HEADER,
443
                encoding.into_header_value(),
444
            );
445
        }
446
447
0
        let body = EncodeBody::new_server(
448
0
            self.codec.encoder(),
449
0
            body,
450
0
            accept_encoding,
451
0
            compression_override,
452
0
            max_message_size,
453
        );
454
455
0
        http::Response::from_parts(parts, Body::new(body))
456
0
    }
457
458
0
    fn request_encoding_if_supported<B>(
459
0
        &self,
460
0
        request: &http::Request<B>,
461
0
    ) -> Result<Option<CompressionEncoding>, Status> {
462
0
        CompressionEncoding::from_encoding_header(
463
0
            request.headers(),
464
0
            self.accept_compression_encodings,
465
        )
466
0
    }
467
}
468
469
impl<T: fmt::Debug> fmt::Debug for Grpc<T> {
470
0
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
471
0
        f.debug_struct("Grpc")
472
0
            .field("codec", &self.codec)
473
0
            .field(
474
0
                "accept_compression_encodings",
475
0
                &self.accept_compression_encodings,
476
0
            )
477
0
            .field(
478
0
                "send_compression_encodings",
479
0
                &self.send_compression_encodings,
480
0
            )
481
0
            .finish()
482
0
    }
483
}
484
485
0
fn compression_override_from_response<B, E>(
486
0
    res: &Result<crate::Response<B>, E>,
487
0
) -> SingleMessageCompressionOverride {
488
0
    res.as_ref()
489
0
        .ok()
490
0
        .and_then(|response| {
491
0
            response
492
0
                .extensions()
493
0
                .get::<SingleMessageCompressionOverride>()
494
0
                .copied()
495
0
        })
496
0
        .unwrap_or_default()
497
0
}