Coverage Report

Created: 2026-02-14 06:16

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/rust/registry/src/index.crates.io-1949cf8c6b5b557f/tonic-0.14.2/src/codec/encode.rs
Line
Count
Source
1
use super::compression::{
2
    compress, CompressionEncoding, CompressionSettings, SingleMessageCompressionOverride,
3
};
4
use super::{BufferSettings, EncodeBuf, Encoder, DEFAULT_MAX_SEND_MESSAGE_SIZE, HEADER_SIZE};
5
use crate::Status;
6
use bytes::{BufMut, Bytes, BytesMut};
7
use http::HeaderMap;
8
use http_body::{Body, Frame};
9
use pin_project::pin_project;
10
use std::{
11
    pin::Pin,
12
    task::{ready, Context, Poll},
13
};
14
use tokio_stream::{adapters::Fuse, Stream, StreamExt};
15
16
/// Combinator for efficient encoding of messages into reasonably sized buffers.
17
/// EncodedBytes encodes ready messages from its delegate stream into a BytesMut,
18
/// splitting off and yielding a buffer when either:
19
///  * The delegate stream polls as not ready, or
20
///  * The encoded buffer surpasses YIELD_THRESHOLD.
21
#[pin_project(project = EncodedBytesProj)]
22
#[derive(Debug)]
23
struct EncodedBytes<T, U> {
24
    #[pin]
25
    source: Fuse<U>,
26
    encoder: T,
27
    compression_encoding: Option<CompressionEncoding>,
28
    max_message_size: Option<usize>,
29
    buf: BytesMut,
30
    uncompression_buf: BytesMut,
31
    error: Option<Status>,
32
}
33
34
impl<T: Encoder, U: Stream> EncodedBytes<T, U> {
35
0
    fn new(
36
0
        encoder: T,
37
0
        source: U,
38
0
        compression_encoding: Option<CompressionEncoding>,
39
0
        compression_override: SingleMessageCompressionOverride,
40
0
        max_message_size: Option<usize>,
41
0
    ) -> Self {
42
0
        let buffer_settings = encoder.buffer_settings();
43
0
        let buf = BytesMut::with_capacity(buffer_settings.buffer_size);
44
45
0
        let compression_encoding =
46
0
            if compression_override == SingleMessageCompressionOverride::Disable {
47
0
                None
48
            } else {
49
0
                compression_encoding
50
            };
51
52
0
        let uncompression_buf = if compression_encoding.is_some() {
53
0
            BytesMut::with_capacity(buffer_settings.buffer_size)
54
        } else {
55
0
            BytesMut::new()
56
        };
57
58
0
        Self {
59
0
            source: source.fuse(),
60
0
            encoder,
61
0
            compression_encoding,
62
0
            max_message_size,
63
0
            buf,
64
0
            uncompression_buf,
65
0
            error: None,
66
0
        }
67
0
    }
Unexecuted instantiation: <tonic::codec::encode::EncodedBytes<tonic_prost::codec::ProstEncoder<ztunnel::xds::types::istio::ca::IstioCertificateRequest>, tokio_stream::stream_ext::map::Map<tokio_stream::once::Once<ztunnel::xds::types::istio::ca::IstioCertificateRequest>, core::result::Result<ztunnel::xds::types::istio::ca::IstioCertificateRequest, tonic::status::Status>::Ok>>>::new
Unexecuted instantiation: <tonic::codec::encode::EncodedBytes<tonic_prost::codec::ProstEncoder<ztunnel::xds::types::service::discovery::v3::DeltaDiscoveryRequest>, tokio_stream::stream_ext::map::Map<async_stream::async_stream::AsyncStream<ztunnel::xds::types::service::discovery::v3::DeltaDiscoveryRequest, <ztunnel::xds::client::AdsClient>::run_internal::{closure#0}::{closure#3}>, core::result::Result<ztunnel::xds::types::service::discovery::v3::DeltaDiscoveryRequest, tonic::status::Status>::Ok>>>::new
Unexecuted instantiation: <tonic::codec::encode::EncodedBytes<_, _>>::new
68
}
69
70
impl<T, U> Stream for EncodedBytes<T, U>
71
where
72
    T: Encoder<Error = Status>,
73
    U: Stream<Item = Result<T::Item, Status>>,
74
{
75
    type Item = Result<Bytes, Status>;
76
77
0
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
78
        let EncodedBytesProj {
79
0
            mut source,
80
0
            encoder,
81
0
            compression_encoding,
82
0
            max_message_size,
83
0
            buf,
84
0
            uncompression_buf,
85
0
            error,
86
0
        } = self.project();
87
0
        let buffer_settings = encoder.buffer_settings();
88
89
0
        if let Some(status) = error.take() {
90
0
            return Poll::Ready(Some(Err(status)));
91
0
        }
92
93
        loop {
94
0
            match source.as_mut().poll_next(cx) {
95
0
                Poll::Pending if buf.is_empty() => {
96
0
                    return Poll::Pending;
97
                }
98
0
                Poll::Ready(None) if buf.is_empty() => {
99
0
                    return Poll::Ready(None);
100
                }
101
                Poll::Pending | Poll::Ready(None) => {
102
0
                    return Poll::Ready(Some(Ok(buf.split_to(buf.len()).freeze())));
103
                }
104
0
                Poll::Ready(Some(Ok(item))) => {
105
0
                    if let Err(status) = encode_item(
106
0
                        encoder,
107
0
                        buf,
108
0
                        uncompression_buf,
109
0
                        *compression_encoding,
110
0
                        *max_message_size,
111
0
                        buffer_settings,
112
0
                        item,
113
0
                    ) {
114
0
                        return Poll::Ready(Some(Err(status)));
115
0
                    }
116
117
0
                    if buf.len() >= buffer_settings.yield_threshold {
118
0
                        return Poll::Ready(Some(Ok(buf.split_to(buf.len()).freeze())));
119
0
                    }
120
                }
121
0
                Poll::Ready(Some(Err(status))) => {
122
0
                    if buf.is_empty() {
123
0
                        return Poll::Ready(Some(Err(status)));
124
0
                    }
125
0
                    *error = Some(status);
126
0
                    return Poll::Ready(Some(Ok(buf.split_to(buf.len()).freeze())));
127
                }
128
            }
129
        }
130
0
    }
Unexecuted instantiation: <tonic::codec::encode::EncodedBytes<tonic_prost::codec::ProstEncoder<ztunnel::xds::types::istio::ca::IstioCertificateRequest>, tokio_stream::stream_ext::map::Map<tokio_stream::once::Once<ztunnel::xds::types::istio::ca::IstioCertificateRequest>, core::result::Result<ztunnel::xds::types::istio::ca::IstioCertificateRequest, tonic::status::Status>::Ok>> as futures_core::stream::Stream>::poll_next
Unexecuted instantiation: <tonic::codec::encode::EncodedBytes<tonic_prost::codec::ProstEncoder<ztunnel::xds::types::service::discovery::v3::DeltaDiscoveryRequest>, tokio_stream::stream_ext::map::Map<async_stream::async_stream::AsyncStream<ztunnel::xds::types::service::discovery::v3::DeltaDiscoveryRequest, <ztunnel::xds::client::AdsClient>::run_internal::{closure#0}::{closure#3}>, core::result::Result<ztunnel::xds::types::service::discovery::v3::DeltaDiscoveryRequest, tonic::status::Status>::Ok>> as futures_core::stream::Stream>::poll_next
Unexecuted instantiation: <tonic::codec::encode::EncodedBytes<_, _> as futures_core::stream::Stream>::poll_next
131
}
132
133
0
fn encode_item<T>(
134
0
    encoder: &mut T,
135
0
    buf: &mut BytesMut,
136
0
    uncompression_buf: &mut BytesMut,
137
0
    compression_encoding: Option<CompressionEncoding>,
138
0
    max_message_size: Option<usize>,
139
0
    buffer_settings: BufferSettings,
140
0
    item: T::Item,
141
0
) -> Result<(), Status>
142
0
where
143
0
    T: Encoder<Error = Status>,
144
{
145
0
    let offset = buf.len();
146
147
0
    buf.reserve(HEADER_SIZE);
148
0
    unsafe {
149
0
        buf.advance_mut(HEADER_SIZE);
150
0
    }
151
152
0
    if let Some(encoding) = compression_encoding {
153
0
        uncompression_buf.clear();
154
155
0
        encoder
156
0
            .encode(item, &mut EncodeBuf::new(uncompression_buf))
157
0
            .map_err(|err| Status::internal(format!("Error encoding: {err}")))?;
158
159
0
        let uncompressed_len = uncompression_buf.len();
160
161
0
        compress(
162
0
            CompressionSettings {
163
0
                encoding,
164
0
                buffer_growth_interval: buffer_settings.buffer_size,
165
0
            },
166
0
            uncompression_buf,
167
0
            buf,
168
0
            uncompressed_len,
169
        )
170
0
        .map_err(|err| Status::internal(format!("Error compressing: {err}")))?;
171
    } else {
172
0
        encoder
173
0
            .encode(item, &mut EncodeBuf::new(buf))
174
0
            .map_err(|err| Status::internal(format!("Error encoding: {err}")))?;
Unexecuted instantiation: tonic::codec::encode::encode_item::<tonic_prost::codec::ProstEncoder<ztunnel::xds::types::istio::ca::IstioCertificateRequest>>::{closure#2}
Unexecuted instantiation: tonic::codec::encode::encode_item::<tonic_prost::codec::ProstEncoder<ztunnel::xds::types::service::discovery::v3::DeltaDiscoveryRequest>>::{closure#2}
Unexecuted instantiation: tonic::codec::encode::encode_item::<_>::{closure#2}
175
    }
176
177
    // now that we know length, we can write the header
178
0
    finish_encoding(compression_encoding, max_message_size, &mut buf[offset..])
179
0
}
Unexecuted instantiation: tonic::codec::encode::encode_item::<tonic_prost::codec::ProstEncoder<ztunnel::xds::types::istio::ca::IstioCertificateRequest>>
Unexecuted instantiation: tonic::codec::encode::encode_item::<tonic_prost::codec::ProstEncoder<ztunnel::xds::types::service::discovery::v3::DeltaDiscoveryRequest>>
Unexecuted instantiation: tonic::codec::encode::encode_item::<_>
180
181
0
fn finish_encoding(
182
0
    compression_encoding: Option<CompressionEncoding>,
183
0
    max_message_size: Option<usize>,
184
0
    buf: &mut [u8],
185
0
) -> Result<(), Status> {
186
0
    let len = buf.len() - HEADER_SIZE;
187
0
    let limit = max_message_size.unwrap_or(DEFAULT_MAX_SEND_MESSAGE_SIZE);
188
0
    if len > limit {
189
0
        return Err(Status::out_of_range(format!(
190
0
            "Error, encoded message length too large: found {len} bytes, the limit is: {limit} bytes"
191
0
        )));
192
0
    }
193
194
0
    if len > u32::MAX as usize {
195
0
        return Err(Status::resource_exhausted(format!(
196
0
            "Cannot return body with more than 4GB of data but got {len} bytes"
197
0
        )));
198
0
    }
199
0
    {
200
0
        let mut buf = &mut buf[..HEADER_SIZE];
201
0
        buf.put_u8(compression_encoding.is_some() as u8);
202
0
        buf.put_u32(len as u32);
203
0
    }
204
205
0
    Ok(())
206
0
}
207
208
#[derive(Debug)]
209
enum Role {
210
    Client,
211
    Server,
212
}
213
214
/// A specialized implementation of [Body] for encoding [Result<Bytes, Status>].
215
#[pin_project]
216
#[derive(Debug)]
217
pub struct EncodeBody<T, U> {
218
    #[pin]
219
    inner: EncodedBytes<T, U>,
220
    state: EncodeState,
221
}
222
223
#[derive(Debug)]
224
struct EncodeState {
225
    error: Option<Status>,
226
    role: Role,
227
    is_end_stream: bool,
228
}
229
230
impl<T: Encoder, U: Stream> EncodeBody<T, U> {
231
    /// Turns a stream of grpc messages into [EncodeBody] which is used by grpc clients for
232
    /// turning the messages into http frames for sending over the network.
233
0
    pub fn new_client(
234
0
        encoder: T,
235
0
        source: U,
236
0
        compression_encoding: Option<CompressionEncoding>,
237
0
        max_message_size: Option<usize>,
238
0
    ) -> Self {
239
0
        Self {
240
0
            inner: EncodedBytes::new(
241
0
                encoder,
242
0
                source,
243
0
                compression_encoding,
244
0
                SingleMessageCompressionOverride::default(),
245
0
                max_message_size,
246
0
            ),
247
0
            state: EncodeState {
248
0
                error: None,
249
0
                role: Role::Client,
250
0
                is_end_stream: false,
251
0
            },
252
0
        }
253
0
    }
Unexecuted instantiation: <tonic::codec::encode::EncodeBody<tonic_prost::codec::ProstEncoder<ztunnel::xds::types::istio::ca::IstioCertificateRequest>, tokio_stream::stream_ext::map::Map<tokio_stream::once::Once<ztunnel::xds::types::istio::ca::IstioCertificateRequest>, core::result::Result<ztunnel::xds::types::istio::ca::IstioCertificateRequest, tonic::status::Status>::Ok>>>::new_client
Unexecuted instantiation: <tonic::codec::encode::EncodeBody<tonic_prost::codec::ProstEncoder<ztunnel::xds::types::service::discovery::v3::DeltaDiscoveryRequest>, tokio_stream::stream_ext::map::Map<async_stream::async_stream::AsyncStream<ztunnel::xds::types::service::discovery::v3::DeltaDiscoveryRequest, <ztunnel::xds::client::AdsClient>::run_internal::{closure#0}::{closure#3}>, core::result::Result<ztunnel::xds::types::service::discovery::v3::DeltaDiscoveryRequest, tonic::status::Status>::Ok>>>::new_client
Unexecuted instantiation: <tonic::codec::encode::EncodeBody<_, _>>::new_client
254
255
    /// Turns a stream of grpc results (message or error status) into [EncodeBody] which is used by grpc
256
    /// servers for turning the messages into http frames for sending over the network.
257
0
    pub fn new_server(
258
0
        encoder: T,
259
0
        source: U,
260
0
        compression_encoding: Option<CompressionEncoding>,
261
0
        compression_override: SingleMessageCompressionOverride,
262
0
        max_message_size: Option<usize>,
263
0
    ) -> Self {
264
0
        Self {
265
0
            inner: EncodedBytes::new(
266
0
                encoder,
267
0
                source,
268
0
                compression_encoding,
269
0
                compression_override,
270
0
                max_message_size,
271
0
            ),
272
0
            state: EncodeState {
273
0
                error: None,
274
0
                role: Role::Server,
275
0
                is_end_stream: false,
276
0
            },
277
0
        }
278
0
    }
279
}
280
281
impl EncodeState {
282
0
    fn trailers(&mut self) -> Option<Result<HeaderMap, Status>> {
283
0
        match self.role {
284
0
            Role::Client => None,
285
            Role::Server => {
286
0
                if self.is_end_stream {
287
0
                    return None;
288
0
                }
289
290
0
                self.is_end_stream = true;
291
0
                let status = if let Some(status) = self.error.take() {
292
0
                    status
293
                } else {
294
0
                    Status::ok("")
295
                };
296
0
                Some(status.to_header_map())
297
            }
298
        }
299
0
    }
300
}
301
302
impl<T, U> Body for EncodeBody<T, U>
303
where
304
    T: Encoder<Error = Status>,
305
    U: Stream<Item = Result<T::Item, Status>>,
306
{
307
    type Data = Bytes;
308
    type Error = Status;
309
310
0
    fn is_end_stream(&self) -> bool {
311
0
        self.state.is_end_stream
312
0
    }
Unexecuted instantiation: <tonic::codec::encode::EncodeBody<tonic_prost::codec::ProstEncoder<ztunnel::xds::types::istio::ca::IstioCertificateRequest>, tokio_stream::stream_ext::map::Map<tokio_stream::once::Once<ztunnel::xds::types::istio::ca::IstioCertificateRequest>, core::result::Result<ztunnel::xds::types::istio::ca::IstioCertificateRequest, tonic::status::Status>::Ok>> as http_body::Body>::is_end_stream
Unexecuted instantiation: <tonic::codec::encode::EncodeBody<tonic_prost::codec::ProstEncoder<ztunnel::xds::types::service::discovery::v3::DeltaDiscoveryRequest>, tokio_stream::stream_ext::map::Map<async_stream::async_stream::AsyncStream<ztunnel::xds::types::service::discovery::v3::DeltaDiscoveryRequest, <ztunnel::xds::client::AdsClient>::run_internal::{closure#0}::{closure#3}>, core::result::Result<ztunnel::xds::types::service::discovery::v3::DeltaDiscoveryRequest, tonic::status::Status>::Ok>> as http_body::Body>::is_end_stream
Unexecuted instantiation: <tonic::codec::encode::EncodeBody<_, _> as http_body::Body>::is_end_stream
313
314
0
    fn poll_frame(
315
0
        self: Pin<&mut Self>,
316
0
        cx: &mut Context<'_>,
317
0
    ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
318
0
        let self_proj = self.project();
319
0
        match ready!(self_proj.inner.poll_next(cx)) {
320
0
            Some(Ok(d)) => Some(Ok(Frame::data(d))).into(),
321
0
            Some(Err(status)) => match self_proj.state.role {
322
0
                Role::Client => Some(Err(status)).into(),
323
                Role::Server => {
324
0
                    self_proj.state.is_end_stream = true;
325
0
                    Some(Ok(Frame::trailers(status.to_header_map()?))).into()
326
                }
327
            },
328
0
            None => self_proj
329
0
                .state
330
0
                .trailers()
331
0
                .map(|t| t.map(Frame::trailers))
Unexecuted instantiation: <tonic::codec::encode::EncodeBody<tonic_prost::codec::ProstEncoder<ztunnel::xds::types::istio::ca::IstioCertificateRequest>, tokio_stream::stream_ext::map::Map<tokio_stream::once::Once<ztunnel::xds::types::istio::ca::IstioCertificateRequest>, core::result::Result<ztunnel::xds::types::istio::ca::IstioCertificateRequest, tonic::status::Status>::Ok>> as http_body::Body>::poll_frame::{closure#0}
Unexecuted instantiation: <tonic::codec::encode::EncodeBody<tonic_prost::codec::ProstEncoder<ztunnel::xds::types::service::discovery::v3::DeltaDiscoveryRequest>, tokio_stream::stream_ext::map::Map<async_stream::async_stream::AsyncStream<ztunnel::xds::types::service::discovery::v3::DeltaDiscoveryRequest, <ztunnel::xds::client::AdsClient>::run_internal::{closure#0}::{closure#3}>, core::result::Result<ztunnel::xds::types::service::discovery::v3::DeltaDiscoveryRequest, tonic::status::Status>::Ok>> as http_body::Body>::poll_frame::{closure#0}
Unexecuted instantiation: <tonic::codec::encode::EncodeBody<_, _> as http_body::Body>::poll_frame::{closure#0}
332
0
                .into(),
333
        }
334
0
    }
Unexecuted instantiation: <tonic::codec::encode::EncodeBody<tonic_prost::codec::ProstEncoder<ztunnel::xds::types::istio::ca::IstioCertificateRequest>, tokio_stream::stream_ext::map::Map<tokio_stream::once::Once<ztunnel::xds::types::istio::ca::IstioCertificateRequest>, core::result::Result<ztunnel::xds::types::istio::ca::IstioCertificateRequest, tonic::status::Status>::Ok>> as http_body::Body>::poll_frame
Unexecuted instantiation: <tonic::codec::encode::EncodeBody<tonic_prost::codec::ProstEncoder<ztunnel::xds::types::service::discovery::v3::DeltaDiscoveryRequest>, tokio_stream::stream_ext::map::Map<async_stream::async_stream::AsyncStream<ztunnel::xds::types::service::discovery::v3::DeltaDiscoveryRequest, <ztunnel::xds::client::AdsClient>::run_internal::{closure#0}::{closure#3}>, core::result::Result<ztunnel::xds::types::service::discovery::v3::DeltaDiscoveryRequest, tonic::status::Status>::Ok>> as http_body::Body>::poll_frame
Unexecuted instantiation: <tonic::codec::encode::EncodeBody<_, _> as http_body::Body>::poll_frame
335
}