Coverage Report

Created: 2026-04-14 06:46

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/rust/registry/src/index.crates.io-1949cf8c6b5b557f/tonic-0.14.5/src/codec/decode.rs
Line
Count
Source
1
use super::compression::{decompress, CompressionEncoding, CompressionSettings};
2
use super::{BufferSettings, DecodeBuf, Decoder, DEFAULT_MAX_RECV_MESSAGE_SIZE, HEADER_SIZE};
3
use crate::{body::Body, metadata::MetadataMap, Code, Status};
4
use bytes::{Buf, BufMut, BytesMut};
5
use http::{HeaderMap, StatusCode};
6
use http_body::Body as HttpBody;
7
use http_body_util::BodyExt;
8
use std::{
9
    fmt, future,
10
    pin::Pin,
11
    task::ready,
12
    task::{Context, Poll},
13
};
14
use sync_wrapper::SyncWrapper;
15
use tokio_stream::Stream;
16
use tracing::{debug, trace};
17
18
/// Streaming requests and responses.
19
///
20
/// This will wrap some inner [`Body`] and [`Decoder`] and provide an interface
21
/// to fetch the message stream and trailing metadata
22
pub struct Streaming<T> {
23
    decoder: SyncWrapper<Box<dyn Decoder<Item = T, Error = Status> + Send + 'static>>,
24
    inner: StreamingInner,
25
}
26
27
struct StreamingInner {
28
    body: SyncWrapper<Body>,
29
    state: State,
30
    direction: Direction,
31
    buf: BytesMut,
32
    trailers: Option<HeaderMap>,
33
    decompress_buf: BytesMut,
34
    encoding: Option<CompressionEncoding>,
35
    max_message_size: Option<usize>,
36
}
37
38
impl<T> Unpin for Streaming<T> {}
39
40
#[derive(Debug, Clone)]
41
enum State {
42
    ReadHeader,
43
    ReadBody {
44
        compression: Option<CompressionEncoding>,
45
        len: usize,
46
    },
47
    Error(Option<Status>),
48
}
49
50
#[derive(Debug, PartialEq, Eq)]
51
enum Direction {
52
    Request,
53
    Response(StatusCode),
54
    EmptyResponse,
55
}
56
57
impl<T> Streaming<T> {
58
    /// Create a new streaming response in the grpc response format for decoding a response [Body]
59
    /// into message of type T
60
0
    pub fn new_response<B, D>(
61
0
        decoder: D,
62
0
        body: B,
63
0
        status_code: StatusCode,
64
0
        encoding: Option<CompressionEncoding>,
65
0
        max_message_size: Option<usize>,
66
0
    ) -> Self
67
0
    where
68
0
        B: HttpBody + Send + 'static,
69
0
        B::Error: Into<crate::BoxError>,
70
0
        D: Decoder<Item = T, Error = Status> + Send + 'static,
71
    {
72
0
        Self::new(
73
0
            decoder,
74
0
            body,
75
0
            Direction::Response(status_code),
76
0
            encoding,
77
0
            max_message_size,
78
        )
79
0
    }
Unexecuted instantiation: <tonic::codec::decode::Streaming<ztunnel::xds::types::istio::ca::IstioCertificateResponse>>::new_response::<hyper::body::incoming::Incoming, tonic_prost::codec::ProstDecoder<ztunnel::xds::types::istio::ca::IstioCertificateResponse>>
Unexecuted instantiation: <tonic::codec::decode::Streaming<ztunnel::xds::types::service::discovery::v3::DeltaDiscoveryResponse>>::new_response::<hyper::body::incoming::Incoming, tonic_prost::codec::ProstDecoder<ztunnel::xds::types::service::discovery::v3::DeltaDiscoveryResponse>>
Unexecuted instantiation: <tonic::codec::decode::Streaming<_>>::new_response::<_, _>
80
81
    /// Create empty response. For creating responses that have no content (headers + trailers only)
82
0
    pub fn new_empty<B, D>(decoder: D, body: B) -> Self
83
0
    where
84
0
        B: HttpBody + Send + 'static,
85
0
        B::Error: Into<crate::BoxError>,
86
0
        D: Decoder<Item = T, Error = Status> + Send + 'static,
87
    {
88
0
        Self::new(decoder, body, Direction::EmptyResponse, None, None)
89
0
    }
Unexecuted instantiation: <tonic::codec::decode::Streaming<ztunnel::xds::types::istio::ca::IstioCertificateResponse>>::new_empty::<hyper::body::incoming::Incoming, tonic_prost::codec::ProstDecoder<ztunnel::xds::types::istio::ca::IstioCertificateResponse>>
Unexecuted instantiation: <tonic::codec::decode::Streaming<ztunnel::xds::types::service::discovery::v3::DeltaDiscoveryResponse>>::new_empty::<hyper::body::incoming::Incoming, tonic_prost::codec::ProstDecoder<ztunnel::xds::types::service::discovery::v3::DeltaDiscoveryResponse>>
Unexecuted instantiation: <tonic::codec::decode::Streaming<_>>::new_empty::<_, _>
90
91
    /// Create a new streaming request in the grpc response format for decoding a request [Body]
92
    /// into message of type T
93
0
    pub fn new_request<B, D>(
94
0
        decoder: D,
95
0
        body: B,
96
0
        encoding: Option<CompressionEncoding>,
97
0
        max_message_size: Option<usize>,
98
0
    ) -> Self
99
0
    where
100
0
        B: HttpBody + Send + 'static,
101
0
        B::Error: Into<crate::BoxError>,
102
0
        D: Decoder<Item = T, Error = Status> + Send + 'static,
103
    {
104
0
        Self::new(
105
0
            decoder,
106
0
            body,
107
0
            Direction::Request,
108
0
            encoding,
109
0
            max_message_size,
110
        )
111
0
    }
112
113
0
    fn new<B, D>(
114
0
        decoder: D,
115
0
        body: B,
116
0
        direction: Direction,
117
0
        encoding: Option<CompressionEncoding>,
118
0
        max_message_size: Option<usize>,
119
0
    ) -> Self
120
0
    where
121
0
        B: HttpBody + Send + 'static,
122
0
        B::Error: Into<crate::BoxError>,
123
0
        D: Decoder<Item = T, Error = Status> + Send + 'static,
124
    {
125
0
        let buffer_size = decoder.buffer_settings().buffer_size;
126
        Self {
127
0
            decoder: SyncWrapper::new(Box::new(decoder)),
128
            inner: StreamingInner {
129
0
                body: SyncWrapper::new(Body::new(
130
0
                    body.map_frame(|frame| {
131
0
                        frame.map_data(|mut buf| buf.copy_to_bytes(buf.remaining()))
Unexecuted instantiation: <tonic::codec::decode::Streaming<ztunnel::xds::types::istio::ca::IstioCertificateResponse>>::new::<hyper::body::incoming::Incoming, tonic_prost::codec::ProstDecoder<ztunnel::xds::types::istio::ca::IstioCertificateResponse>>::{closure#0}::{closure#0}
Unexecuted instantiation: <tonic::codec::decode::Streaming<ztunnel::xds::types::service::discovery::v3::DeltaDiscoveryResponse>>::new::<hyper::body::incoming::Incoming, tonic_prost::codec::ProstDecoder<ztunnel::xds::types::service::discovery::v3::DeltaDiscoveryResponse>>::{closure#0}::{closure#0}
Unexecuted instantiation: <tonic::codec::decode::Streaming<_>>::new::<_, _>::{closure#0}::{closure#0}
132
0
                    })
Unexecuted instantiation: <tonic::codec::decode::Streaming<ztunnel::xds::types::istio::ca::IstioCertificateResponse>>::new::<hyper::body::incoming::Incoming, tonic_prost::codec::ProstDecoder<ztunnel::xds::types::istio::ca::IstioCertificateResponse>>::{closure#0}
Unexecuted instantiation: <tonic::codec::decode::Streaming<ztunnel::xds::types::service::discovery::v3::DeltaDiscoveryResponse>>::new::<hyper::body::incoming::Incoming, tonic_prost::codec::ProstDecoder<ztunnel::xds::types::service::discovery::v3::DeltaDiscoveryResponse>>::{closure#0}
Unexecuted instantiation: <tonic::codec::decode::Streaming<_>>::new::<_, _>::{closure#0}
133
0
                    .map_err(|err| Status::map_error(err.into())),
Unexecuted instantiation: <tonic::codec::decode::Streaming<ztunnel::xds::types::istio::ca::IstioCertificateResponse>>::new::<hyper::body::incoming::Incoming, tonic_prost::codec::ProstDecoder<ztunnel::xds::types::istio::ca::IstioCertificateResponse>>::{closure#1}
Unexecuted instantiation: <tonic::codec::decode::Streaming<ztunnel::xds::types::service::discovery::v3::DeltaDiscoveryResponse>>::new::<hyper::body::incoming::Incoming, tonic_prost::codec::ProstDecoder<ztunnel::xds::types::service::discovery::v3::DeltaDiscoveryResponse>>::{closure#1}
Unexecuted instantiation: <tonic::codec::decode::Streaming<_>>::new::<_, _>::{closure#1}
134
                )),
135
0
                state: State::ReadHeader,
136
0
                direction,
137
0
                buf: BytesMut::with_capacity(buffer_size),
138
0
                trailers: None,
139
0
                decompress_buf: BytesMut::new(),
140
0
                encoding,
141
0
                max_message_size,
142
            },
143
        }
144
0
    }
Unexecuted instantiation: <tonic::codec::decode::Streaming<ztunnel::xds::types::istio::ca::IstioCertificateResponse>>::new::<hyper::body::incoming::Incoming, tonic_prost::codec::ProstDecoder<ztunnel::xds::types::istio::ca::IstioCertificateResponse>>
Unexecuted instantiation: <tonic::codec::decode::Streaming<ztunnel::xds::types::service::discovery::v3::DeltaDiscoveryResponse>>::new::<hyper::body::incoming::Incoming, tonic_prost::codec::ProstDecoder<ztunnel::xds::types::service::discovery::v3::DeltaDiscoveryResponse>>
Unexecuted instantiation: <tonic::codec::decode::Streaming<_>>::new::<_, _>
145
}
146
147
impl StreamingInner {
148
0
    fn decode_chunk(
149
0
        &mut self,
150
0
        buffer_settings: BufferSettings,
151
0
    ) -> Result<Option<DecodeBuf<'_>>, Status> {
152
0
        if let State::ReadHeader = self.state {
153
0
            if self.buf.remaining() < HEADER_SIZE {
154
0
                return Ok(None);
155
0
            }
156
157
0
            let compression_encoding = match self.buf.get_u8() {
158
0
                0 => None,
159
                1 => {
160
                    {
161
0
                        if self.encoding.is_some() {
162
0
                            self.encoding
163
                        } else {
164
                            // https://grpc.github.io/grpc/core/md_doc_compression.html
165
                            // An ill-constructed message with its Compressed-Flag bit set but lacking a grpc-encoding
166
                            // entry different from identity in its metadata MUST fail with INTERNAL status,
167
                            // its associated description indicating the invalid Compressed-Flag condition.
168
0
                            return Err(Status::internal( "protocol error: received message with compressed-flag but no grpc-encoding was specified"));
169
                        }
170
                    }
171
                }
172
0
                f => {
173
0
                    trace!("unexpected compression flag");
174
0
                    let message = if let Direction::Response(status) = self.direction {
175
0
                        format!(
176
0
                            "protocol error: received message with invalid compression flag: {f} (valid flags are 0 and 1) while receiving response with status: {status}"
177
                        )
178
                    } else {
179
0
                        format!("protocol error: received message with invalid compression flag: {f} (valid flags are 0 and 1), while sending request")
180
                    };
181
0
                    return Err(Status::internal(message));
182
                }
183
            };
184
185
0
            let len = self.buf.get_u32() as usize;
186
0
            let limit = self
187
0
                .max_message_size
188
0
                .unwrap_or(DEFAULT_MAX_RECV_MESSAGE_SIZE);
189
0
            if len > limit {
190
0
                return Err(Status::out_of_range(
191
0
                    format!(
192
0
                        "Error, decoded message length too large: found {len} bytes, the limit is: {limit} bytes"
193
0
                    ),
194
0
                ));
195
0
            }
196
197
0
            self.buf.reserve(len);
198
199
0
            self.state = State::ReadBody {
200
0
                compression: compression_encoding,
201
0
                len,
202
0
            }
203
0
        }
204
205
0
        if let State::ReadBody { len, compression } = self.state {
206
            // if we haven't read enough of the message then return and keep
207
            // reading
208
0
            if self.buf.remaining() < len || self.buf.len() < len {
209
0
                return Ok(None);
210
0
            }
211
212
0
            let decode_buf = if let Some(encoding) = compression {
213
0
                self.decompress_buf.clear();
214
0
                let limit = self
215
0
                    .max_message_size
216
0
                    .unwrap_or(DEFAULT_MAX_RECV_MESSAGE_SIZE);
217
0
                let limited_out_buf = (&mut self.decompress_buf).limit(limit);
218
219
0
                if let Err(err) = decompress(
220
0
                    CompressionSettings {
221
0
                        encoding,
222
0
                        buffer_growth_interval: buffer_settings.buffer_size,
223
0
                    },
224
0
                    &mut self.buf,
225
0
                    limited_out_buf,
226
0
                    len,
227
0
                ) {
228
0
                    if matches!(err.kind(), std::io::ErrorKind::WriteZero) {
229
0
                        return Err(Status::resource_exhausted(format!(
230
0
                            "Error decompressing: size limit, of {limit} bytes, exceeded while decompressing message"
231
0
                        )));
232
0
                    }
233
0
                    let message = if let Direction::Response(status) = self.direction {
234
0
                        format!(
235
0
                            "Error decompressing: {err}, while receiving response with status: {status}"
236
                        )
237
                    } else {
238
0
                        format!("Error decompressing: {err}, while sending request")
239
                    };
240
0
                    return Err(Status::internal(message));
241
0
                }
242
0
                let decompressed_len = self.decompress_buf.len();
243
0
                DecodeBuf::new(&mut self.decompress_buf, decompressed_len)
244
            } else {
245
0
                DecodeBuf::new(&mut self.buf, len)
246
            };
247
248
0
            return Ok(Some(decode_buf));
249
0
        }
250
251
0
        Ok(None)
252
0
    }
253
254
    // Returns Some(()) if data was found or None if the loop in `poll_next` should break
255
0
    fn poll_frame(&mut self, cx: &mut Context<'_>) -> Poll<Result<Option<()>, Status>> {
256
0
        let frame = match ready!(Pin::new(self.body.get_mut()).poll_frame(cx)) {
257
0
            Some(Ok(frame)) => frame,
258
0
            Some(Err(status)) => {
259
0
                if self.direction == Direction::Request && status.code() == Code::Cancelled {
260
0
                    return Poll::Ready(Ok(None));
261
0
                }
262
263
0
                let _ = std::mem::replace(&mut self.state, State::Error(Some(status.clone())));
264
0
                debug!("decoder inner stream error: {:?}", status);
265
0
                return Poll::Ready(Err(status));
266
            }
267
            None => {
268
                // FIXME: improve buf usage.
269
0
                return Poll::Ready(if self.buf.has_remaining() {
270
0
                    trace!("unexpected EOF decoding stream, state: {:?}", self.state);
271
0
                    Err(Status::internal("Unexpected EOF decoding stream."))
272
                } else {
273
0
                    Ok(None)
274
                });
275
            }
276
        };
277
278
0
        Poll::Ready(if frame.is_data() {
279
0
            self.buf.put(frame.into_data().unwrap());
280
0
            Ok(Some(()))
281
0
        } else if frame.is_trailers() {
282
0
            if let Some(trailers) = &mut self.trailers {
283
0
                trailers.extend(frame.into_trailers().unwrap());
284
0
            } else {
285
0
                self.trailers = Some(frame.into_trailers().unwrap());
286
0
            }
287
288
0
            Ok(None)
289
        } else {
290
0
            panic!("unexpected frame: {frame:?}");
291
        })
292
0
    }
293
294
0
    fn response(&mut self) -> Result<(), Status> {
295
0
        if let Direction::Response(status) = self.direction {
296
0
            if let Err(Some(e)) = crate::status::infer_grpc_status(self.trailers.as_ref(), status) {
297
                // If the trailers contain a grpc-status, then we should return that as the error
298
                // and otherwise stop the stream (by taking the error state)
299
0
                self.trailers.take();
300
0
                return Err(e);
301
0
            }
302
0
        }
303
0
        Ok(())
304
0
    }
305
}
306
307
impl<T> Streaming<T> {
308
    /// Fetch the next message from this stream.
309
    ///
310
    /// # Return value
311
    ///
312
    /// - `Result::Err(val)` means a gRPC error was sent by the sender instead
313
    ///   of a valid response message. Refer to [`Status::code`] and
314
    ///   [`Status::message`] to examine possible error causes.
315
    ///
316
    /// - `Result::Ok(None)` means the stream was closed by the sender and no
317
    ///   more messages will be delivered. Further attempts to call
318
    ///   [`Streaming::message`] will result in the same return value.
319
    ///
320
    /// - `Result::Ok(Some(val))` means the sender streamed a valid response
321
    ///   message `val`.
322
    ///
323
    /// ```rust
324
    /// # use tonic::{Streaming, Status, codec::Decoder};
325
    /// # use std::fmt::Debug;
326
    /// # async fn next_message_ex<T, D>(mut request: Streaming<T>) -> Result<(), Status>
327
    /// # where T: Debug,
328
    /// # D: Decoder<Item = T, Error = Status> + Send  + 'static,
329
    /// # {
330
    /// if let Some(next_message) = request.message().await? {
331
    ///     println!("{:?}", next_message);
332
    /// }
333
    /// # Ok(())
334
    /// # }
335
    /// ```
336
0
    pub async fn message(&mut self) -> Result<Option<T>, Status> {
Unexecuted instantiation: <tonic::codec::decode::Streaming<ztunnel::xds::types::istio::ca::IstioCertificateResponse>>::message
Unexecuted instantiation: <tonic::codec::decode::Streaming<ztunnel::xds::types::service::discovery::v3::DeltaDiscoveryResponse>>::message
Unexecuted instantiation: <tonic::codec::decode::Streaming<_>>::message
337
0
        match future::poll_fn(|cx| Pin::new(&mut *self).poll_next(cx)).await {
Unexecuted instantiation: <tonic::codec::decode::Streaming<ztunnel::xds::types::istio::ca::IstioCertificateResponse>>::message::{closure#0}::{closure#0}
Unexecuted instantiation: <tonic::codec::decode::Streaming<ztunnel::xds::types::service::discovery::v3::DeltaDiscoveryResponse>>::message::{closure#0}::{closure#0}
Unexecuted instantiation: <tonic::codec::decode::Streaming<_>>::message::{closure#0}::{closure#0}
338
0
            Some(Ok(m)) => Ok(Some(m)),
339
0
            Some(Err(e)) => Err(e),
340
0
            None => Ok(None),
341
        }
342
0
    }
Unexecuted instantiation: <tonic::codec::decode::Streaming<ztunnel::xds::types::istio::ca::IstioCertificateResponse>>::message::{closure#0}
Unexecuted instantiation: <tonic::codec::decode::Streaming<ztunnel::xds::types::service::discovery::v3::DeltaDiscoveryResponse>>::message::{closure#0}
Unexecuted instantiation: <tonic::codec::decode::Streaming<_>>::message::{closure#0}
343
344
    /// Fetch the trailing metadata.
345
    ///
346
    /// This will drain the stream of all its messages to receive the trailing
347
    /// metadata. If [`Streaming::message`] returns `None` then this function
348
    /// will not need to poll for trailers since the body was totally consumed.
349
    ///
350
    /// ```rust
351
    /// # use tonic::{Streaming, Status};
352
    /// # async fn trailers_ex<T>(mut request: Streaming<T>) -> Result<(), Status> {
353
    /// if let Some(metadata) = request.trailers().await? {
354
    ///     println!("{:?}", metadata);
355
    /// }
356
    /// # Ok(())
357
    /// # }
358
    /// ```
359
0
    pub async fn trailers(&mut self) -> Result<Option<MetadataMap>, Status> {
Unexecuted instantiation: <tonic::codec::decode::Streaming<ztunnel::xds::types::istio::ca::IstioCertificateResponse>>::trailers
Unexecuted instantiation: <tonic::codec::decode::Streaming<_>>::trailers
360
        // Shortcut to see if we already pulled the trailers in the stream step
361
        // we need to do that so that the stream can error on trailing grpc-status
362
0
        if let Some(trailers) = self.inner.trailers.take() {
363
0
            return Ok(Some(MetadataMap::from_headers(trailers)));
364
0
        }
365
366
        // To fetch the trailers we must clear the body and drop it.
367
0
        while self.message().await?.is_some() {}
368
369
        // Since we call poll_trailers internally on poll_next we need to
370
        // check if it got cached again.
371
0
        if let Some(trailers) = self.inner.trailers.take() {
372
0
            return Ok(Some(MetadataMap::from_headers(trailers)));
373
0
        }
374
375
        // We've polled through all the frames, and still no trailers, return None
376
0
        Ok(None)
377
0
    }
Unexecuted instantiation: <tonic::codec::decode::Streaming<ztunnel::xds::types::istio::ca::IstioCertificateResponse>>::trailers::{closure#0}
Unexecuted instantiation: <tonic::codec::decode::Streaming<_>>::trailers::{closure#0}
378
379
0
    fn decode_chunk(&mut self) -> Result<Option<T>, Status> {
380
0
        match self
381
0
            .inner
382
0
            .decode_chunk(self.decoder.get_mut().buffer_settings())?
383
        {
384
0
            Some(mut decode_buf) => match self.decoder.get_mut().decode(&mut decode_buf)? {
385
0
                Some(msg) => {
386
0
                    self.inner.state = State::ReadHeader;
387
0
                    Ok(Some(msg))
388
                }
389
0
                None => Ok(None),
390
            },
391
0
            None => Ok(None),
392
        }
393
0
    }
Unexecuted instantiation: <tonic::codec::decode::Streaming<ztunnel::xds::types::istio::ca::IstioCertificateResponse>>::decode_chunk
Unexecuted instantiation: <tonic::codec::decode::Streaming<ztunnel::xds::types::service::discovery::v3::DeltaDiscoveryResponse>>::decode_chunk
Unexecuted instantiation: <tonic::codec::decode::Streaming<_>>::decode_chunk
394
}
395
396
impl<T> Stream for Streaming<T> {
397
    type Item = Result<T, Status>;
398
399
0
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
400
        loop {
401
            // When the stream encounters an error yield that error once and then on subsequent
402
            // calls to poll_next return Poll::Ready(None) indicating that the stream has been
403
            // fully exhausted.
404
0
            if let State::Error(status) = &mut self.inner.state {
405
0
                return Poll::Ready(status.take().map(Err));
406
0
            }
407
408
0
            if let Some(item) = self.decode_chunk()? {
409
0
                return Poll::Ready(Some(Ok(item)));
410
0
            }
411
412
0
            if ready!(self.inner.poll_frame(cx))?.is_none() {
413
0
                match self.inner.response() {
414
0
                    Ok(()) => return Poll::Ready(None),
415
0
                    Err(err) => self.inner.state = State::Error(Some(err)),
416
                }
417
0
            }
418
        }
419
0
    }
Unexecuted instantiation: <tonic::codec::decode::Streaming<ztunnel::xds::types::istio::ca::IstioCertificateResponse> as futures_core::stream::Stream>::poll_next
Unexecuted instantiation: <tonic::codec::decode::Streaming<ztunnel::xds::types::service::discovery::v3::DeltaDiscoveryResponse> as futures_core::stream::Stream>::poll_next
Unexecuted instantiation: <tonic::codec::decode::Streaming<_> as futures_core::stream::Stream>::poll_next
420
}
421
422
impl<T> fmt::Debug for Streaming<T> {
423
0
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
424
0
        f.debug_struct("Streaming").finish()
425
0
    }
426
}
427
428
#[cfg(test)]
429
static_assertions::assert_impl_all!(Streaming<()>: Send, Sync);