/rust/registry/src/index.crates.io-1949cf8c6b5b557f/tonic-0.14.2/src/codec/encode.rs
Line | Count | Source |
1 | | use super::compression::{ |
2 | | compress, CompressionEncoding, CompressionSettings, SingleMessageCompressionOverride, |
3 | | }; |
4 | | use super::{BufferSettings, EncodeBuf, Encoder, DEFAULT_MAX_SEND_MESSAGE_SIZE, HEADER_SIZE}; |
5 | | use crate::Status; |
6 | | use bytes::{BufMut, Bytes, BytesMut}; |
7 | | use http::HeaderMap; |
8 | | use http_body::{Body, Frame}; |
9 | | use pin_project::pin_project; |
10 | | use std::{ |
11 | | pin::Pin, |
12 | | task::{ready, Context, Poll}, |
13 | | }; |
14 | | use tokio_stream::{adapters::Fuse, Stream, StreamExt}; |
15 | | |
16 | | /// Combinator for efficient encoding of messages into reasonably sized buffers. |
17 | | /// EncodedBytes encodes ready messages from its delegate stream into a BytesMut, |
18 | | /// splitting off and yielding a buffer when either: |
19 | | /// * The delegate stream polls as not ready, or |
20 | | /// * The encoded buffer surpasses YIELD_THRESHOLD. |
21 | | #[pin_project(project = EncodedBytesProj)] |
22 | | #[derive(Debug)] |
23 | | struct EncodedBytes<T, U> { |
24 | | #[pin] |
25 | | source: Fuse<U>, |
26 | | encoder: T, |
27 | | compression_encoding: Option<CompressionEncoding>, |
28 | | max_message_size: Option<usize>, |
29 | | buf: BytesMut, |
30 | | uncompression_buf: BytesMut, |
31 | | error: Option<Status>, |
32 | | } |
33 | | |
34 | | impl<T: Encoder, U: Stream> EncodedBytes<T, U> { |
35 | 0 | fn new( |
36 | 0 | encoder: T, |
37 | 0 | source: U, |
38 | 0 | compression_encoding: Option<CompressionEncoding>, |
39 | 0 | compression_override: SingleMessageCompressionOverride, |
40 | 0 | max_message_size: Option<usize>, |
41 | 0 | ) -> Self { |
42 | 0 | let buffer_settings = encoder.buffer_settings(); |
43 | 0 | let buf = BytesMut::with_capacity(buffer_settings.buffer_size); |
44 | | |
45 | 0 | let compression_encoding = |
46 | 0 | if compression_override == SingleMessageCompressionOverride::Disable { |
47 | 0 | None |
48 | | } else { |
49 | 0 | compression_encoding |
50 | | }; |
51 | | |
52 | 0 | let uncompression_buf = if compression_encoding.is_some() { |
53 | 0 | BytesMut::with_capacity(buffer_settings.buffer_size) |
54 | | } else { |
55 | 0 | BytesMut::new() |
56 | | }; |
57 | | |
58 | 0 | Self { |
59 | 0 | source: source.fuse(), |
60 | 0 | encoder, |
61 | 0 | compression_encoding, |
62 | 0 | max_message_size, |
63 | 0 | buf, |
64 | 0 | uncompression_buf, |
65 | 0 | error: None, |
66 | 0 | } |
67 | 0 | } Unexecuted instantiation: <tonic::codec::encode::EncodedBytes<tonic_prost::codec::ProstEncoder<ztunnel::xds::types::istio::ca::IstioCertificateRequest>, tokio_stream::stream_ext::map::Map<tokio_stream::once::Once<ztunnel::xds::types::istio::ca::IstioCertificateRequest>, core::result::Result<ztunnel::xds::types::istio::ca::IstioCertificateRequest, tonic::status::Status>::Ok>>>::new Unexecuted instantiation: <tonic::codec::encode::EncodedBytes<tonic_prost::codec::ProstEncoder<ztunnel::xds::types::service::discovery::v3::DeltaDiscoveryRequest>, tokio_stream::stream_ext::map::Map<async_stream::async_stream::AsyncStream<ztunnel::xds::types::service::discovery::v3::DeltaDiscoveryRequest, <ztunnel::xds::client::AdsClient>::run_internal::{closure#0}::{closure#3}>, core::result::Result<ztunnel::xds::types::service::discovery::v3::DeltaDiscoveryRequest, tonic::status::Status>::Ok>>>::newUnexecuted instantiation: <tonic::codec::encode::EncodedBytes<_, _>>::new |
68 | | } |
69 | | |
70 | | impl<T, U> Stream for EncodedBytes<T, U> |
71 | | where |
72 | | T: Encoder<Error = Status>, |
73 | | U: Stream<Item = Result<T::Item, Status>>, |
74 | | { |
75 | | type Item = Result<Bytes, Status>; |
76 | | |
77 | 0 | fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { |
78 | | let EncodedBytesProj { |
79 | 0 | mut source, |
80 | 0 | encoder, |
81 | 0 | compression_encoding, |
82 | 0 | max_message_size, |
83 | 0 | buf, |
84 | 0 | uncompression_buf, |
85 | 0 | error, |
86 | 0 | } = self.project(); |
87 | 0 | let buffer_settings = encoder.buffer_settings(); |
88 | | |
89 | 0 | if let Some(status) = error.take() { |
90 | 0 | return Poll::Ready(Some(Err(status))); |
91 | 0 | } |
92 | | |
93 | | loop { |
94 | 0 | match source.as_mut().poll_next(cx) { |
95 | 0 | Poll::Pending if buf.is_empty() => { |
96 | 0 | return Poll::Pending; |
97 | | } |
98 | 0 | Poll::Ready(None) if buf.is_empty() => { |
99 | 0 | return Poll::Ready(None); |
100 | | } |
101 | | Poll::Pending | Poll::Ready(None) => { |
102 | 0 | return Poll::Ready(Some(Ok(buf.split_to(buf.len()).freeze()))); |
103 | | } |
104 | 0 | Poll::Ready(Some(Ok(item))) => { |
105 | 0 | if let Err(status) = encode_item( |
106 | 0 | encoder, |
107 | 0 | buf, |
108 | 0 | uncompression_buf, |
109 | 0 | *compression_encoding, |
110 | 0 | *max_message_size, |
111 | 0 | buffer_settings, |
112 | 0 | item, |
113 | 0 | ) { |
114 | 0 | return Poll::Ready(Some(Err(status))); |
115 | 0 | } |
116 | | |
117 | 0 | if buf.len() >= buffer_settings.yield_threshold { |
118 | 0 | return Poll::Ready(Some(Ok(buf.split_to(buf.len()).freeze()))); |
119 | 0 | } |
120 | | } |
121 | 0 | Poll::Ready(Some(Err(status))) => { |
122 | 0 | if buf.is_empty() { |
123 | 0 | return Poll::Ready(Some(Err(status))); |
124 | 0 | } |
125 | 0 | *error = Some(status); |
126 | 0 | return Poll::Ready(Some(Ok(buf.split_to(buf.len()).freeze()))); |
127 | | } |
128 | | } |
129 | | } |
130 | 0 | } Unexecuted instantiation: <tonic::codec::encode::EncodedBytes<tonic_prost::codec::ProstEncoder<ztunnel::xds::types::istio::ca::IstioCertificateRequest>, tokio_stream::stream_ext::map::Map<tokio_stream::once::Once<ztunnel::xds::types::istio::ca::IstioCertificateRequest>, core::result::Result<ztunnel::xds::types::istio::ca::IstioCertificateRequest, tonic::status::Status>::Ok>> as futures_core::stream::Stream>::poll_next Unexecuted instantiation: <tonic::codec::encode::EncodedBytes<tonic_prost::codec::ProstEncoder<ztunnel::xds::types::service::discovery::v3::DeltaDiscoveryRequest>, tokio_stream::stream_ext::map::Map<async_stream::async_stream::AsyncStream<ztunnel::xds::types::service::discovery::v3::DeltaDiscoveryRequest, <ztunnel::xds::client::AdsClient>::run_internal::{closure#0}::{closure#3}>, core::result::Result<ztunnel::xds::types::service::discovery::v3::DeltaDiscoveryRequest, tonic::status::Status>::Ok>> as futures_core::stream::Stream>::poll_nextUnexecuted instantiation: <tonic::codec::encode::EncodedBytes<_, _> as futures_core::stream::Stream>::poll_next |
131 | | } |
132 | | |
133 | 0 | fn encode_item<T>( |
134 | 0 | encoder: &mut T, |
135 | 0 | buf: &mut BytesMut, |
136 | 0 | uncompression_buf: &mut BytesMut, |
137 | 0 | compression_encoding: Option<CompressionEncoding>, |
138 | 0 | max_message_size: Option<usize>, |
139 | 0 | buffer_settings: BufferSettings, |
140 | 0 | item: T::Item, |
141 | 0 | ) -> Result<(), Status> |
142 | 0 | where |
143 | 0 | T: Encoder<Error = Status>, |
144 | | { |
145 | 0 | let offset = buf.len(); |
146 | | |
147 | 0 | buf.reserve(HEADER_SIZE); |
148 | 0 | unsafe { |
149 | 0 | buf.advance_mut(HEADER_SIZE); |
150 | 0 | } |
151 | | |
152 | 0 | if let Some(encoding) = compression_encoding { |
153 | 0 | uncompression_buf.clear(); |
154 | | |
155 | 0 | encoder |
156 | 0 | .encode(item, &mut EncodeBuf::new(uncompression_buf)) |
157 | 0 | .map_err(|err| Status::internal(format!("Error encoding: {err}")))?; |
158 | | |
159 | 0 | let uncompressed_len = uncompression_buf.len(); |
160 | | |
161 | 0 | compress( |
162 | 0 | CompressionSettings { |
163 | 0 | encoding, |
164 | 0 | buffer_growth_interval: buffer_settings.buffer_size, |
165 | 0 | }, |
166 | 0 | uncompression_buf, |
167 | 0 | buf, |
168 | 0 | uncompressed_len, |
169 | | ) |
170 | 0 | .map_err(|err| Status::internal(format!("Error compressing: {err}")))?; |
171 | | } else { |
172 | 0 | encoder |
173 | 0 | .encode(item, &mut EncodeBuf::new(buf)) |
174 | 0 | .map_err(|err| Status::internal(format!("Error encoding: {err}")))?;Unexecuted instantiation: tonic::codec::encode::encode_item::<tonic_prost::codec::ProstEncoder<ztunnel::xds::types::istio::ca::IstioCertificateRequest>>::{closure#2}Unexecuted instantiation: tonic::codec::encode::encode_item::<tonic_prost::codec::ProstEncoder<ztunnel::xds::types::service::discovery::v3::DeltaDiscoveryRequest>>::{closure#2}Unexecuted instantiation: tonic::codec::encode::encode_item::<_>::{closure#2} |
175 | | } |
176 | | |
177 | | // now that we know length, we can write the header |
178 | 0 | finish_encoding(compression_encoding, max_message_size, &mut buf[offset..]) |
179 | 0 | } Unexecuted instantiation: tonic::codec::encode::encode_item::<tonic_prost::codec::ProstEncoder<ztunnel::xds::types::istio::ca::IstioCertificateRequest>> Unexecuted instantiation: tonic::codec::encode::encode_item::<tonic_prost::codec::ProstEncoder<ztunnel::xds::types::service::discovery::v3::DeltaDiscoveryRequest>> Unexecuted instantiation: tonic::codec::encode::encode_item::<_> |
180 | | |
181 | 0 | fn finish_encoding( |
182 | 0 | compression_encoding: Option<CompressionEncoding>, |
183 | 0 | max_message_size: Option<usize>, |
184 | 0 | buf: &mut [u8], |
185 | 0 | ) -> Result<(), Status> { |
186 | 0 | let len = buf.len() - HEADER_SIZE; |
187 | 0 | let limit = max_message_size.unwrap_or(DEFAULT_MAX_SEND_MESSAGE_SIZE); |
188 | 0 | if len > limit { |
189 | 0 | return Err(Status::out_of_range(format!( |
190 | 0 | "Error, encoded message length too large: found {len} bytes, the limit is: {limit} bytes" |
191 | 0 | ))); |
192 | 0 | } |
193 | | |
194 | 0 | if len > u32::MAX as usize { |
195 | 0 | return Err(Status::resource_exhausted(format!( |
196 | 0 | "Cannot return body with more than 4GB of data but got {len} bytes" |
197 | 0 | ))); |
198 | 0 | } |
199 | 0 | { |
200 | 0 | let mut buf = &mut buf[..HEADER_SIZE]; |
201 | 0 | buf.put_u8(compression_encoding.is_some() as u8); |
202 | 0 | buf.put_u32(len as u32); |
203 | 0 | } |
204 | | |
205 | 0 | Ok(()) |
206 | 0 | } |
207 | | |
/// Which side of the connection an [`EncodeBody`] is encoding for.
#[derive(Debug)]
enum Role {
    /// Errors are surfaced as body errors and no trailers are produced.
    Client,
    /// Errors and normal completion are reported through a trailers frame.
    Server,
}
213 | | |
214 | | /// A specialized implementation of [Body] for encoding [Result<Bytes, Status>]. |
215 | | #[pin_project] |
216 | | #[derive(Debug)] |
217 | | pub struct EncodeBody<T, U> { |
218 | | #[pin] |
219 | | inner: EncodedBytes<T, U>, |
220 | | state: EncodeState, |
221 | | } |
222 | | |
223 | | #[derive(Debug)] |
224 | | struct EncodeState { |
225 | | error: Option<Status>, |
226 | | role: Role, |
227 | | is_end_stream: bool, |
228 | | } |
229 | | |
230 | | impl<T: Encoder, U: Stream> EncodeBody<T, U> { |
231 | | /// Turns a stream of grpc messages into [EncodeBody] which is used by grpc clients for |
232 | | /// turning the messages into http frames for sending over the network. |
233 | 0 | pub fn new_client( |
234 | 0 | encoder: T, |
235 | 0 | source: U, |
236 | 0 | compression_encoding: Option<CompressionEncoding>, |
237 | 0 | max_message_size: Option<usize>, |
238 | 0 | ) -> Self { |
239 | 0 | Self { |
240 | 0 | inner: EncodedBytes::new( |
241 | 0 | encoder, |
242 | 0 | source, |
243 | 0 | compression_encoding, |
244 | 0 | SingleMessageCompressionOverride::default(), |
245 | 0 | max_message_size, |
246 | 0 | ), |
247 | 0 | state: EncodeState { |
248 | 0 | error: None, |
249 | 0 | role: Role::Client, |
250 | 0 | is_end_stream: false, |
251 | 0 | }, |
252 | 0 | } |
253 | 0 | } Unexecuted instantiation: <tonic::codec::encode::EncodeBody<tonic_prost::codec::ProstEncoder<ztunnel::xds::types::istio::ca::IstioCertificateRequest>, tokio_stream::stream_ext::map::Map<tokio_stream::once::Once<ztunnel::xds::types::istio::ca::IstioCertificateRequest>, core::result::Result<ztunnel::xds::types::istio::ca::IstioCertificateRequest, tonic::status::Status>::Ok>>>::new_client Unexecuted instantiation: <tonic::codec::encode::EncodeBody<tonic_prost::codec::ProstEncoder<ztunnel::xds::types::service::discovery::v3::DeltaDiscoveryRequest>, tokio_stream::stream_ext::map::Map<async_stream::async_stream::AsyncStream<ztunnel::xds::types::service::discovery::v3::DeltaDiscoveryRequest, <ztunnel::xds::client::AdsClient>::run_internal::{closure#0}::{closure#3}>, core::result::Result<ztunnel::xds::types::service::discovery::v3::DeltaDiscoveryRequest, tonic::status::Status>::Ok>>>::new_clientUnexecuted instantiation: <tonic::codec::encode::EncodeBody<_, _>>::new_client |
254 | | |
255 | | /// Turns a stream of grpc results (message or error status) into [EncodeBody] which is used by grpc |
256 | | /// servers for turning the messages into http frames for sending over the network. |
257 | 0 | pub fn new_server( |
258 | 0 | encoder: T, |
259 | 0 | source: U, |
260 | 0 | compression_encoding: Option<CompressionEncoding>, |
261 | 0 | compression_override: SingleMessageCompressionOverride, |
262 | 0 | max_message_size: Option<usize>, |
263 | 0 | ) -> Self { |
264 | 0 | Self { |
265 | 0 | inner: EncodedBytes::new( |
266 | 0 | encoder, |
267 | 0 | source, |
268 | 0 | compression_encoding, |
269 | 0 | compression_override, |
270 | 0 | max_message_size, |
271 | 0 | ), |
272 | 0 | state: EncodeState { |
273 | 0 | error: None, |
274 | 0 | role: Role::Server, |
275 | 0 | is_end_stream: false, |
276 | 0 | }, |
277 | 0 | } |
278 | 0 | } |
279 | | } |
280 | | |
281 | | impl EncodeState { |
282 | 0 | fn trailers(&mut self) -> Option<Result<HeaderMap, Status>> { |
283 | 0 | match self.role { |
284 | 0 | Role::Client => None, |
285 | | Role::Server => { |
286 | 0 | if self.is_end_stream { |
287 | 0 | return None; |
288 | 0 | } |
289 | | |
290 | 0 | self.is_end_stream = true; |
291 | 0 | let status = if let Some(status) = self.error.take() { |
292 | 0 | status |
293 | | } else { |
294 | 0 | Status::ok("") |
295 | | }; |
296 | 0 | Some(status.to_header_map()) |
297 | | } |
298 | | } |
299 | 0 | } |
300 | | } |
301 | | |
302 | | impl<T, U> Body for EncodeBody<T, U> |
303 | | where |
304 | | T: Encoder<Error = Status>, |
305 | | U: Stream<Item = Result<T::Item, Status>>, |
306 | | { |
307 | | type Data = Bytes; |
308 | | type Error = Status; |
309 | | |
310 | 0 | fn is_end_stream(&self) -> bool { |
311 | 0 | self.state.is_end_stream |
312 | 0 | } Unexecuted instantiation: <tonic::codec::encode::EncodeBody<tonic_prost::codec::ProstEncoder<ztunnel::xds::types::istio::ca::IstioCertificateRequest>, tokio_stream::stream_ext::map::Map<tokio_stream::once::Once<ztunnel::xds::types::istio::ca::IstioCertificateRequest>, core::result::Result<ztunnel::xds::types::istio::ca::IstioCertificateRequest, tonic::status::Status>::Ok>> as http_body::Body>::is_end_stream Unexecuted instantiation: <tonic::codec::encode::EncodeBody<tonic_prost::codec::ProstEncoder<ztunnel::xds::types::service::discovery::v3::DeltaDiscoveryRequest>, tokio_stream::stream_ext::map::Map<async_stream::async_stream::AsyncStream<ztunnel::xds::types::service::discovery::v3::DeltaDiscoveryRequest, <ztunnel::xds::client::AdsClient>::run_internal::{closure#0}::{closure#3}>, core::result::Result<ztunnel::xds::types::service::discovery::v3::DeltaDiscoveryRequest, tonic::status::Status>::Ok>> as http_body::Body>::is_end_streamUnexecuted instantiation: <tonic::codec::encode::EncodeBody<_, _> as http_body::Body>::is_end_stream |
313 | | |
314 | 0 | fn poll_frame( |
315 | 0 | self: Pin<&mut Self>, |
316 | 0 | cx: &mut Context<'_>, |
317 | 0 | ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> { |
318 | 0 | let self_proj = self.project(); |
319 | 0 | match ready!(self_proj.inner.poll_next(cx)) { |
320 | 0 | Some(Ok(d)) => Some(Ok(Frame::data(d))).into(), |
321 | 0 | Some(Err(status)) => match self_proj.state.role { |
322 | 0 | Role::Client => Some(Err(status)).into(), |
323 | | Role::Server => { |
324 | 0 | self_proj.state.is_end_stream = true; |
325 | 0 | Some(Ok(Frame::trailers(status.to_header_map()?))).into() |
326 | | } |
327 | | }, |
328 | 0 | None => self_proj |
329 | 0 | .state |
330 | 0 | .trailers() |
331 | 0 | .map(|t| t.map(Frame::trailers)) Unexecuted instantiation: <tonic::codec::encode::EncodeBody<tonic_prost::codec::ProstEncoder<ztunnel::xds::types::istio::ca::IstioCertificateRequest>, tokio_stream::stream_ext::map::Map<tokio_stream::once::Once<ztunnel::xds::types::istio::ca::IstioCertificateRequest>, core::result::Result<ztunnel::xds::types::istio::ca::IstioCertificateRequest, tonic::status::Status>::Ok>> as http_body::Body>::poll_frame::{closure#0}Unexecuted instantiation: <tonic::codec::encode::EncodeBody<tonic_prost::codec::ProstEncoder<ztunnel::xds::types::service::discovery::v3::DeltaDiscoveryRequest>, tokio_stream::stream_ext::map::Map<async_stream::async_stream::AsyncStream<ztunnel::xds::types::service::discovery::v3::DeltaDiscoveryRequest, <ztunnel::xds::client::AdsClient>::run_internal::{closure#0}::{closure#3}>, core::result::Result<ztunnel::xds::types::service::discovery::v3::DeltaDiscoveryRequest, tonic::status::Status>::Ok>> as http_body::Body>::poll_frame::{closure#0}Unexecuted instantiation: <tonic::codec::encode::EncodeBody<_, _> as http_body::Body>::poll_frame::{closure#0} |
332 | 0 | .into(), |
333 | | } |
334 | 0 | } Unexecuted instantiation: <tonic::codec::encode::EncodeBody<tonic_prost::codec::ProstEncoder<ztunnel::xds::types::istio::ca::IstioCertificateRequest>, tokio_stream::stream_ext::map::Map<tokio_stream::once::Once<ztunnel::xds::types::istio::ca::IstioCertificateRequest>, core::result::Result<ztunnel::xds::types::istio::ca::IstioCertificateRequest, tonic::status::Status>::Ok>> as http_body::Body>::poll_frame Unexecuted instantiation: <tonic::codec::encode::EncodeBody<tonic_prost::codec::ProstEncoder<ztunnel::xds::types::service::discovery::v3::DeltaDiscoveryRequest>, tokio_stream::stream_ext::map::Map<async_stream::async_stream::AsyncStream<ztunnel::xds::types::service::discovery::v3::DeltaDiscoveryRequest, <ztunnel::xds::client::AdsClient>::run_internal::{closure#0}::{closure#3}>, core::result::Result<ztunnel::xds::types::service::discovery::v3::DeltaDiscoveryRequest, tonic::status::Status>::Ok>> as http_body::Body>::poll_frameUnexecuted instantiation: <tonic::codec::encode::EncodeBody<_, _> as http_body::Body>::poll_frame |
335 | | } |