/rust/registry/src/index.crates.io-1949cf8c6b5b557f/tonic-0.14.5/src/codec/decode.rs
Line | Count | Source |
1 | | use super::compression::{decompress, CompressionEncoding, CompressionSettings}; |
2 | | use super::{BufferSettings, DecodeBuf, Decoder, DEFAULT_MAX_RECV_MESSAGE_SIZE, HEADER_SIZE}; |
3 | | use crate::{body::Body, metadata::MetadataMap, Code, Status}; |
4 | | use bytes::{Buf, BufMut, BytesMut}; |
5 | | use http::{HeaderMap, StatusCode}; |
6 | | use http_body::Body as HttpBody; |
7 | | use http_body_util::BodyExt; |
8 | | use std::{ |
9 | | fmt, future, |
10 | | pin::Pin, |
11 | | task::ready, |
12 | | task::{Context, Poll}, |
13 | | }; |
14 | | use sync_wrapper::SyncWrapper; |
15 | | use tokio_stream::Stream; |
16 | | use tracing::{debug, trace}; |
17 | | |
18 | | /// Streaming requests and responses. |
19 | | /// |
20 | | /// This will wrap some inner [`Body`] and [`Decoder`] and provide an interface |
21 | | /// to fetch the message stream and trailing metadata |
22 | | pub struct Streaming<T> { |
23 | | decoder: SyncWrapper<Box<dyn Decoder<Item = T, Error = Status> + Send + 'static>>, |
24 | | inner: StreamingInner, |
25 | | } |
26 | | |
27 | | struct StreamingInner { |
28 | | body: SyncWrapper<Body>, |
29 | | state: State, |
30 | | direction: Direction, |
31 | | buf: BytesMut, |
32 | | trailers: Option<HeaderMap>, |
33 | | decompress_buf: BytesMut, |
34 | | encoding: Option<CompressionEncoding>, |
35 | | max_message_size: Option<usize>, |
36 | | } |
37 | | |
38 | | impl<T> Unpin for Streaming<T> {} |
39 | | |
40 | | #[derive(Debug, Clone)] |
41 | | enum State { |
42 | | ReadHeader, |
43 | | ReadBody { |
44 | | compression: Option<CompressionEncoding>, |
45 | | len: usize, |
46 | | }, |
47 | | Error(Option<Status>), |
48 | | } |
49 | | |
50 | | #[derive(Debug, PartialEq, Eq)] |
51 | | enum Direction { |
52 | | Request, |
53 | | Response(StatusCode), |
54 | | EmptyResponse, |
55 | | } |
56 | | |
57 | | impl<T> Streaming<T> { |
58 | | /// Create a new streaming response in the grpc response format for decoding a response [Body] |
59 | | /// into message of type T |
60 | 0 | pub fn new_response<B, D>( |
61 | 0 | decoder: D, |
62 | 0 | body: B, |
63 | 0 | status_code: StatusCode, |
64 | 0 | encoding: Option<CompressionEncoding>, |
65 | 0 | max_message_size: Option<usize>, |
66 | 0 | ) -> Self |
67 | 0 | where |
68 | 0 | B: HttpBody + Send + 'static, |
69 | 0 | B::Error: Into<crate::BoxError>, |
70 | 0 | D: Decoder<Item = T, Error = Status> + Send + 'static, |
71 | | { |
72 | 0 | Self::new( |
73 | 0 | decoder, |
74 | 0 | body, |
75 | 0 | Direction::Response(status_code), |
76 | 0 | encoding, |
77 | 0 | max_message_size, |
78 | | ) |
79 | 0 | } Unexecuted instantiation: <tonic::codec::decode::Streaming<ztunnel::xds::types::istio::ca::IstioCertificateResponse>>::new_response::<hyper::body::incoming::Incoming, tonic_prost::codec::ProstDecoder<ztunnel::xds::types::istio::ca::IstioCertificateResponse>> Unexecuted instantiation: <tonic::codec::decode::Streaming<ztunnel::xds::types::service::discovery::v3::DeltaDiscoveryResponse>>::new_response::<hyper::body::incoming::Incoming, tonic_prost::codec::ProstDecoder<ztunnel::xds::types::service::discovery::v3::DeltaDiscoveryResponse>> Unexecuted instantiation: <tonic::codec::decode::Streaming<_>>::new_response::<_, _> |
80 | | |
81 | | /// Create empty response. For creating responses that have no content (headers + trailers only) |
82 | 0 | pub fn new_empty<B, D>(decoder: D, body: B) -> Self |
83 | 0 | where |
84 | 0 | B: HttpBody + Send + 'static, |
85 | 0 | B::Error: Into<crate::BoxError>, |
86 | 0 | D: Decoder<Item = T, Error = Status> + Send + 'static, |
87 | | { |
88 | 0 | Self::new(decoder, body, Direction::EmptyResponse, None, None) |
89 | 0 | } Unexecuted instantiation: <tonic::codec::decode::Streaming<ztunnel::xds::types::istio::ca::IstioCertificateResponse>>::new_empty::<hyper::body::incoming::Incoming, tonic_prost::codec::ProstDecoder<ztunnel::xds::types::istio::ca::IstioCertificateResponse>> Unexecuted instantiation: <tonic::codec::decode::Streaming<ztunnel::xds::types::service::discovery::v3::DeltaDiscoveryResponse>>::new_empty::<hyper::body::incoming::Incoming, tonic_prost::codec::ProstDecoder<ztunnel::xds::types::service::discovery::v3::DeltaDiscoveryResponse>> Unexecuted instantiation: <tonic::codec::decode::Streaming<_>>::new_empty::<_, _> |
90 | | |
91 | | /// Create a new streaming request in the grpc response format for decoding a request [Body] |
92 | | /// into message of type T |
93 | 0 | pub fn new_request<B, D>( |
94 | 0 | decoder: D, |
95 | 0 | body: B, |
96 | 0 | encoding: Option<CompressionEncoding>, |
97 | 0 | max_message_size: Option<usize>, |
98 | 0 | ) -> Self |
99 | 0 | where |
100 | 0 | B: HttpBody + Send + 'static, |
101 | 0 | B::Error: Into<crate::BoxError>, |
102 | 0 | D: Decoder<Item = T, Error = Status> + Send + 'static, |
103 | | { |
104 | 0 | Self::new( |
105 | 0 | decoder, |
106 | 0 | body, |
107 | 0 | Direction::Request, |
108 | 0 | encoding, |
109 | 0 | max_message_size, |
110 | | ) |
111 | 0 | } |
112 | | |
113 | 0 | fn new<B, D>( |
114 | 0 | decoder: D, |
115 | 0 | body: B, |
116 | 0 | direction: Direction, |
117 | 0 | encoding: Option<CompressionEncoding>, |
118 | 0 | max_message_size: Option<usize>, |
119 | 0 | ) -> Self |
120 | 0 | where |
121 | 0 | B: HttpBody + Send + 'static, |
122 | 0 | B::Error: Into<crate::BoxError>, |
123 | 0 | D: Decoder<Item = T, Error = Status> + Send + 'static, |
124 | | { |
125 | 0 | let buffer_size = decoder.buffer_settings().buffer_size; |
126 | | Self { |
127 | 0 | decoder: SyncWrapper::new(Box::new(decoder)), |
128 | | inner: StreamingInner { |
129 | 0 | body: SyncWrapper::new(Body::new( |
130 | 0 | body.map_frame(|frame| { |
131 | 0 | frame.map_data(|mut buf| buf.copy_to_bytes(buf.remaining())) Unexecuted instantiation: <tonic::codec::decode::Streaming<ztunnel::xds::types::istio::ca::IstioCertificateResponse>>::new::<hyper::body::incoming::Incoming, tonic_prost::codec::ProstDecoder<ztunnel::xds::types::istio::ca::IstioCertificateResponse>>::{closure#0}::{closure#0}Unexecuted instantiation: <tonic::codec::decode::Streaming<ztunnel::xds::types::service::discovery::v3::DeltaDiscoveryResponse>>::new::<hyper::body::incoming::Incoming, tonic_prost::codec::ProstDecoder<ztunnel::xds::types::service::discovery::v3::DeltaDiscoveryResponse>>::{closure#0}::{closure#0}Unexecuted instantiation: <tonic::codec::decode::Streaming<_>>::new::<_, _>::{closure#0}::{closure#0} |
132 | 0 | }) Unexecuted instantiation: <tonic::codec::decode::Streaming<ztunnel::xds::types::istio::ca::IstioCertificateResponse>>::new::<hyper::body::incoming::Incoming, tonic_prost::codec::ProstDecoder<ztunnel::xds::types::istio::ca::IstioCertificateResponse>>::{closure#0}Unexecuted instantiation: <tonic::codec::decode::Streaming<ztunnel::xds::types::service::discovery::v3::DeltaDiscoveryResponse>>::new::<hyper::body::incoming::Incoming, tonic_prost::codec::ProstDecoder<ztunnel::xds::types::service::discovery::v3::DeltaDiscoveryResponse>>::{closure#0}Unexecuted instantiation: <tonic::codec::decode::Streaming<_>>::new::<_, _>::{closure#0} |
133 | 0 | .map_err(|err| Status::map_error(err.into())), Unexecuted instantiation: <tonic::codec::decode::Streaming<ztunnel::xds::types::istio::ca::IstioCertificateResponse>>::new::<hyper::body::incoming::Incoming, tonic_prost::codec::ProstDecoder<ztunnel::xds::types::istio::ca::IstioCertificateResponse>>::{closure#1}Unexecuted instantiation: <tonic::codec::decode::Streaming<ztunnel::xds::types::service::discovery::v3::DeltaDiscoveryResponse>>::new::<hyper::body::incoming::Incoming, tonic_prost::codec::ProstDecoder<ztunnel::xds::types::service::discovery::v3::DeltaDiscoveryResponse>>::{closure#1}Unexecuted instantiation: <tonic::codec::decode::Streaming<_>>::new::<_, _>::{closure#1} |
134 | | )), |
135 | 0 | state: State::ReadHeader, |
136 | 0 | direction, |
137 | 0 | buf: BytesMut::with_capacity(buffer_size), |
138 | 0 | trailers: None, |
139 | 0 | decompress_buf: BytesMut::new(), |
140 | 0 | encoding, |
141 | 0 | max_message_size, |
142 | | }, |
143 | | } |
144 | 0 | } Unexecuted instantiation: <tonic::codec::decode::Streaming<ztunnel::xds::types::istio::ca::IstioCertificateResponse>>::new::<hyper::body::incoming::Incoming, tonic_prost::codec::ProstDecoder<ztunnel::xds::types::istio::ca::IstioCertificateResponse>> Unexecuted instantiation: <tonic::codec::decode::Streaming<ztunnel::xds::types::service::discovery::v3::DeltaDiscoveryResponse>>::new::<hyper::body::incoming::Incoming, tonic_prost::codec::ProstDecoder<ztunnel::xds::types::service::discovery::v3::DeltaDiscoveryResponse>> Unexecuted instantiation: <tonic::codec::decode::Streaming<_>>::new::<_, _> |
145 | | } |
146 | | |
147 | | impl StreamingInner { |
148 | 0 | fn decode_chunk( |
149 | 0 | &mut self, |
150 | 0 | buffer_settings: BufferSettings, |
151 | 0 | ) -> Result<Option<DecodeBuf<'_>>, Status> { |
152 | 0 | if let State::ReadHeader = self.state { |
153 | 0 | if self.buf.remaining() < HEADER_SIZE { |
154 | 0 | return Ok(None); |
155 | 0 | } |
156 | | |
157 | 0 | let compression_encoding = match self.buf.get_u8() { |
158 | 0 | 0 => None, |
159 | | 1 => { |
160 | | { |
161 | 0 | if self.encoding.is_some() { |
162 | 0 | self.encoding |
163 | | } else { |
164 | | // https://grpc.github.io/grpc/core/md_doc_compression.html |
165 | | // An ill-constructed message with its Compressed-Flag bit set but lacking a grpc-encoding |
166 | | // entry different from identity in its metadata MUST fail with INTERNAL status, |
167 | | // its associated description indicating the invalid Compressed-Flag condition. |
168 | 0 | return Err(Status::internal( "protocol error: received message with compressed-flag but no grpc-encoding was specified")); |
169 | | } |
170 | | } |
171 | | } |
172 | 0 | f => { |
173 | 0 | trace!("unexpected compression flag"); |
174 | 0 | let message = if let Direction::Response(status) = self.direction { |
175 | 0 | format!( |
176 | 0 | "protocol error: received message with invalid compression flag: {f} (valid flags are 0 and 1) while receiving response with status: {status}" |
177 | | ) |
178 | | } else { |
179 | 0 | format!("protocol error: received message with invalid compression flag: {f} (valid flags are 0 and 1), while sending request") |
180 | | }; |
181 | 0 | return Err(Status::internal(message)); |
182 | | } |
183 | | }; |
184 | | |
185 | 0 | let len = self.buf.get_u32() as usize; |
186 | 0 | let limit = self |
187 | 0 | .max_message_size |
188 | 0 | .unwrap_or(DEFAULT_MAX_RECV_MESSAGE_SIZE); |
189 | 0 | if len > limit { |
190 | 0 | return Err(Status::out_of_range( |
191 | 0 | format!( |
192 | 0 | "Error, decoded message length too large: found {len} bytes, the limit is: {limit} bytes" |
193 | 0 | ), |
194 | 0 | )); |
195 | 0 | } |
196 | | |
197 | 0 | self.buf.reserve(len); |
198 | | |
199 | 0 | self.state = State::ReadBody { |
200 | 0 | compression: compression_encoding, |
201 | 0 | len, |
202 | 0 | } |
203 | 0 | } |
204 | | |
205 | 0 | if let State::ReadBody { len, compression } = self.state { |
206 | | // if we haven't read enough of the message then return and keep |
207 | | // reading |
208 | 0 | if self.buf.remaining() < len || self.buf.len() < len { |
209 | 0 | return Ok(None); |
210 | 0 | } |
211 | | |
212 | 0 | let decode_buf = if let Some(encoding) = compression { |
213 | 0 | self.decompress_buf.clear(); |
214 | 0 | let limit = self |
215 | 0 | .max_message_size |
216 | 0 | .unwrap_or(DEFAULT_MAX_RECV_MESSAGE_SIZE); |
217 | 0 | let limited_out_buf = (&mut self.decompress_buf).limit(limit); |
218 | | |
219 | 0 | if let Err(err) = decompress( |
220 | 0 | CompressionSettings { |
221 | 0 | encoding, |
222 | 0 | buffer_growth_interval: buffer_settings.buffer_size, |
223 | 0 | }, |
224 | 0 | &mut self.buf, |
225 | 0 | limited_out_buf, |
226 | 0 | len, |
227 | 0 | ) { |
228 | 0 | if matches!(err.kind(), std::io::ErrorKind::WriteZero) { |
229 | 0 | return Err(Status::resource_exhausted(format!( |
230 | 0 | "Error decompressing: size limit, of {limit} bytes, exceeded while decompressing message" |
231 | 0 | ))); |
232 | 0 | } |
233 | 0 | let message = if let Direction::Response(status) = self.direction { |
234 | 0 | format!( |
235 | 0 | "Error decompressing: {err}, while receiving response with status: {status}" |
236 | | ) |
237 | | } else { |
238 | 0 | format!("Error decompressing: {err}, while sending request") |
239 | | }; |
240 | 0 | return Err(Status::internal(message)); |
241 | 0 | } |
242 | 0 | let decompressed_len = self.decompress_buf.len(); |
243 | 0 | DecodeBuf::new(&mut self.decompress_buf, decompressed_len) |
244 | | } else { |
245 | 0 | DecodeBuf::new(&mut self.buf, len) |
246 | | }; |
247 | | |
248 | 0 | return Ok(Some(decode_buf)); |
249 | 0 | } |
250 | | |
251 | 0 | Ok(None) |
252 | 0 | } |
253 | | |
254 | | // Returns Some(()) if data was found or None if the loop in `poll_next` should break |
255 | 0 | fn poll_frame(&mut self, cx: &mut Context<'_>) -> Poll<Result<Option<()>, Status>> { |
256 | 0 | let frame = match ready!(Pin::new(self.body.get_mut()).poll_frame(cx)) { |
257 | 0 | Some(Ok(frame)) => frame, |
258 | 0 | Some(Err(status)) => { |
259 | 0 | if self.direction == Direction::Request && status.code() == Code::Cancelled { |
260 | 0 | return Poll::Ready(Ok(None)); |
261 | 0 | } |
262 | | |
263 | 0 | let _ = std::mem::replace(&mut self.state, State::Error(Some(status.clone()))); |
264 | 0 | debug!("decoder inner stream error: {:?}", status); |
265 | 0 | return Poll::Ready(Err(status)); |
266 | | } |
267 | | None => { |
268 | | // FIXME: improve buf usage. |
269 | 0 | return Poll::Ready(if self.buf.has_remaining() { |
270 | 0 | trace!("unexpected EOF decoding stream, state: {:?}", self.state); |
271 | 0 | Err(Status::internal("Unexpected EOF decoding stream.")) |
272 | | } else { |
273 | 0 | Ok(None) |
274 | | }); |
275 | | } |
276 | | }; |
277 | | |
278 | 0 | Poll::Ready(if frame.is_data() { |
279 | 0 | self.buf.put(frame.into_data().unwrap()); |
280 | 0 | Ok(Some(())) |
281 | 0 | } else if frame.is_trailers() { |
282 | 0 | if let Some(trailers) = &mut self.trailers { |
283 | 0 | trailers.extend(frame.into_trailers().unwrap()); |
284 | 0 | } else { |
285 | 0 | self.trailers = Some(frame.into_trailers().unwrap()); |
286 | 0 | } |
287 | | |
288 | 0 | Ok(None) |
289 | | } else { |
290 | 0 | panic!("unexpected frame: {frame:?}"); |
291 | | }) |
292 | 0 | } |
293 | | |
294 | 0 | fn response(&mut self) -> Result<(), Status> { |
295 | 0 | if let Direction::Response(status) = self.direction { |
296 | 0 | if let Err(Some(e)) = crate::status::infer_grpc_status(self.trailers.as_ref(), status) { |
297 | | // If the trailers contain a grpc-status, then we should return that as the error |
298 | | // and otherwise stop the stream (by taking the error state) |
299 | 0 | self.trailers.take(); |
300 | 0 | return Err(e); |
301 | 0 | } |
302 | 0 | } |
303 | 0 | Ok(()) |
304 | 0 | } |
305 | | } |
306 | | |
307 | | impl<T> Streaming<T> { |
308 | | /// Fetch the next message from this stream. |
309 | | /// |
310 | | /// # Return value |
311 | | /// |
312 | | /// - `Result::Err(val)` means a gRPC error was sent by the sender instead |
313 | | /// of a valid response message. Refer to [`Status::code`] and |
314 | | /// [`Status::message`] to examine possible error causes. |
315 | | /// |
316 | | /// - `Result::Ok(None)` means the stream was closed by the sender and no |
317 | | /// more messages will be delivered. Further attempts to call |
318 | | /// [`Streaming::message`] will result in the same return value. |
319 | | /// |
320 | | /// - `Result::Ok(Some(val))` means the sender streamed a valid response |
321 | | /// message `val`. |
322 | | /// |
323 | | /// ```rust |
324 | | /// # use tonic::{Streaming, Status, codec::Decoder}; |
325 | | /// # use std::fmt::Debug; |
326 | | /// # async fn next_message_ex<T, D>(mut request: Streaming<T>) -> Result<(), Status> |
327 | | /// # where T: Debug, |
328 | | /// # D: Decoder<Item = T, Error = Status> + Send + 'static, |
329 | | /// # { |
330 | | /// if let Some(next_message) = request.message().await? { |
331 | | /// println!("{:?}", next_message); |
332 | | /// } |
333 | | /// # Ok(()) |
334 | | /// # } |
335 | | /// ``` |
336 | 0 | pub async fn message(&mut self) -> Result<Option<T>, Status> {Unexecuted instantiation: <tonic::codec::decode::Streaming<ztunnel::xds::types::istio::ca::IstioCertificateResponse>>::message Unexecuted instantiation: <tonic::codec::decode::Streaming<ztunnel::xds::types::service::discovery::v3::DeltaDiscoveryResponse>>::message Unexecuted instantiation: <tonic::codec::decode::Streaming<_>>::message |
337 | 0 | match future::poll_fn(|cx| Pin::new(&mut *self).poll_next(cx)).await {Unexecuted instantiation: <tonic::codec::decode::Streaming<ztunnel::xds::types::istio::ca::IstioCertificateResponse>>::message::{closure#0}::{closure#0}Unexecuted instantiation: <tonic::codec::decode::Streaming<ztunnel::xds::types::service::discovery::v3::DeltaDiscoveryResponse>>::message::{closure#0}::{closure#0}Unexecuted instantiation: <tonic::codec::decode::Streaming<_>>::message::{closure#0}::{closure#0} |
338 | 0 | Some(Ok(m)) => Ok(Some(m)), |
339 | 0 | Some(Err(e)) => Err(e), |
340 | 0 | None => Ok(None), |
341 | | } |
342 | 0 | } Unexecuted instantiation: <tonic::codec::decode::Streaming<ztunnel::xds::types::istio::ca::IstioCertificateResponse>>::message::{closure#0}Unexecuted instantiation: <tonic::codec::decode::Streaming<ztunnel::xds::types::service::discovery::v3::DeltaDiscoveryResponse>>::message::{closure#0}Unexecuted instantiation: <tonic::codec::decode::Streaming<_>>::message::{closure#0} |
343 | | |
344 | | /// Fetch the trailing metadata. |
345 | | /// |
346 | | /// This will drain the stream of all its messages to receive the trailing |
347 | | /// metadata. If [`Streaming::message`] returns `None` then this function |
348 | | /// will not need to poll for trailers since the body was totally consumed. |
349 | | /// |
350 | | /// ```rust |
351 | | /// # use tonic::{Streaming, Status}; |
352 | | /// # async fn trailers_ex<T>(mut request: Streaming<T>) -> Result<(), Status> { |
353 | | /// if let Some(metadata) = request.trailers().await? { |
354 | | /// println!("{:?}", metadata); |
355 | | /// } |
356 | | /// # Ok(()) |
357 | | /// # } |
358 | | /// ``` |
359 | 0 | pub async fn trailers(&mut self) -> Result<Option<MetadataMap>, Status> {Unexecuted instantiation: <tonic::codec::decode::Streaming<ztunnel::xds::types::istio::ca::IstioCertificateResponse>>::trailers Unexecuted instantiation: <tonic::codec::decode::Streaming<_>>::trailers |
360 | | // Shortcut to see if we already pulled the trailers in the stream step |
361 | | // we need to do that so that the stream can error on trailing grpc-status |
362 | 0 | if let Some(trailers) = self.inner.trailers.take() { |
363 | 0 | return Ok(Some(MetadataMap::from_headers(trailers))); |
364 | 0 | } |
365 | | |
366 | | // To fetch the trailers we must clear the body and drop it. |
367 | 0 | while self.message().await?.is_some() {} |
368 | | |
369 | | // Since we call poll_trailers internally on poll_next we need to |
370 | | // check if it got cached again. |
371 | 0 | if let Some(trailers) = self.inner.trailers.take() { |
372 | 0 | return Ok(Some(MetadataMap::from_headers(trailers))); |
373 | 0 | } |
374 | | |
375 | | // We've polled through all the frames, and still no trailers, return None |
376 | 0 | Ok(None) |
377 | 0 | } Unexecuted instantiation: <tonic::codec::decode::Streaming<ztunnel::xds::types::istio::ca::IstioCertificateResponse>>::trailers::{closure#0}Unexecuted instantiation: <tonic::codec::decode::Streaming<_>>::trailers::{closure#0} |
378 | | |
379 | 0 | fn decode_chunk(&mut self) -> Result<Option<T>, Status> { |
380 | 0 | match self |
381 | 0 | .inner |
382 | 0 | .decode_chunk(self.decoder.get_mut().buffer_settings())? |
383 | | { |
384 | 0 | Some(mut decode_buf) => match self.decoder.get_mut().decode(&mut decode_buf)? { |
385 | 0 | Some(msg) => { |
386 | 0 | self.inner.state = State::ReadHeader; |
387 | 0 | Ok(Some(msg)) |
388 | | } |
389 | 0 | None => Ok(None), |
390 | | }, |
391 | 0 | None => Ok(None), |
392 | | } |
393 | 0 | } Unexecuted instantiation: <tonic::codec::decode::Streaming<ztunnel::xds::types::istio::ca::IstioCertificateResponse>>::decode_chunk Unexecuted instantiation: <tonic::codec::decode::Streaming<ztunnel::xds::types::service::discovery::v3::DeltaDiscoveryResponse>>::decode_chunk Unexecuted instantiation: <tonic::codec::decode::Streaming<_>>::decode_chunk |
394 | | } |
395 | | |
396 | | impl<T> Stream for Streaming<T> { |
397 | | type Item = Result<T, Status>; |
398 | | |
399 | 0 | fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { |
400 | | loop { |
401 | | // When the stream encounters an error yield that error once and then on subsequent |
402 | | // calls to poll_next return Poll::Ready(None) indicating that the stream has been |
403 | | // fully exhausted. |
404 | 0 | if let State::Error(status) = &mut self.inner.state { |
405 | 0 | return Poll::Ready(status.take().map(Err)); |
406 | 0 | } |
407 | | |
408 | 0 | if let Some(item) = self.decode_chunk()? { |
409 | 0 | return Poll::Ready(Some(Ok(item))); |
410 | 0 | } |
411 | | |
412 | 0 | if ready!(self.inner.poll_frame(cx))?.is_none() { |
413 | 0 | match self.inner.response() { |
414 | 0 | Ok(()) => return Poll::Ready(None), |
415 | 0 | Err(err) => self.inner.state = State::Error(Some(err)), |
416 | | } |
417 | 0 | } |
418 | | } |
419 | 0 | } Unexecuted instantiation: <tonic::codec::decode::Streaming<ztunnel::xds::types::istio::ca::IstioCertificateResponse> as futures_core::stream::Stream>::poll_next Unexecuted instantiation: <tonic::codec::decode::Streaming<ztunnel::xds::types::service::discovery::v3::DeltaDiscoveryResponse> as futures_core::stream::Stream>::poll_next Unexecuted instantiation: <tonic::codec::decode::Streaming<_> as futures_core::stream::Stream>::poll_next |
420 | | } |
421 | | |
422 | | impl<T> fmt::Debug for Streaming<T> { |
423 | 0 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
424 | 0 | f.debug_struct("Streaming").finish() |
425 | 0 | } |
426 | | } |
427 | | |
428 | | #[cfg(test)] |
429 | | static_assertions::assert_impl_all!(Streaming<()>: Send, Sync); |