/rust/registry/src/index.crates.io-1949cf8c6b5b557f/tonic-0.13.0/src/codec/decode.rs
Line | Count | Source |
1 | | use super::compression::{decompress, CompressionEncoding, CompressionSettings}; |
2 | | use super::{BufferSettings, DecodeBuf, Decoder, DEFAULT_MAX_RECV_MESSAGE_SIZE, HEADER_SIZE}; |
3 | | use crate::{body::Body, metadata::MetadataMap, Code, Status}; |
4 | | use bytes::{Buf, BufMut, BytesMut}; |
5 | | use http::{HeaderMap, StatusCode}; |
6 | | use http_body::Body as HttpBody; |
7 | | use http_body_util::BodyExt; |
8 | | use std::{ |
9 | | fmt, future, |
10 | | pin::Pin, |
11 | | task::ready, |
12 | | task::{Context, Poll}, |
13 | | }; |
14 | | use tokio_stream::Stream; |
15 | | use tracing::{debug, trace}; |
16 | | |
17 | | /// Streaming requests and responses. |
18 | | /// |
19 | | /// This will wrap some inner [`Body`] and [`Decoder`] and provide an interface |
20 | | /// to fetch the message stream and trailing metadata |
21 | | pub struct Streaming<T> { |
22 | | decoder: Box<dyn Decoder<Item = T, Error = Status> + Send + 'static>, |
23 | | inner: StreamingInner, |
24 | | } |
25 | | |
26 | | struct StreamingInner { |
27 | | body: Body, |
28 | | state: State, |
29 | | direction: Direction, |
30 | | buf: BytesMut, |
31 | | trailers: Option<HeaderMap>, |
32 | | decompress_buf: BytesMut, |
33 | | encoding: Option<CompressionEncoding>, |
34 | | max_message_size: Option<usize>, |
35 | | } |
36 | | |
37 | | impl<T> Unpin for Streaming<T> {} |
38 | | |
39 | | #[derive(Debug, Clone)] |
40 | | enum State { |
41 | | ReadHeader, |
42 | | ReadBody { |
43 | | compression: Option<CompressionEncoding>, |
44 | | len: usize, |
45 | | }, |
46 | | Error(Option<Status>), |
47 | | } |
48 | | |
49 | | #[derive(Debug, PartialEq, Eq)] |
50 | | enum Direction { |
51 | | Request, |
52 | | Response(StatusCode), |
53 | | EmptyResponse, |
54 | | } |
55 | | |
56 | | impl<T> Streaming<T> { |
57 | | /// Create a new streaming response in the grpc response format for decoding a response [Body] |
58 | | /// into message of type T |
59 | 0 | pub fn new_response<B, D>( |
60 | 0 | decoder: D, |
61 | 0 | body: B, |
62 | 0 | status_code: StatusCode, |
63 | 0 | encoding: Option<CompressionEncoding>, |
64 | 0 | max_message_size: Option<usize>, |
65 | 0 | ) -> Self |
66 | 0 | where |
67 | 0 | B: HttpBody + Send + 'static, |
68 | 0 | B::Error: Into<crate::BoxError>, |
69 | 0 | D: Decoder<Item = T, Error = Status> + Send + 'static, |
70 | | { |
71 | 0 | Self::new( |
72 | 0 | decoder, |
73 | 0 | body, |
74 | 0 | Direction::Response(status_code), |
75 | 0 | encoding, |
76 | 0 | max_message_size, |
77 | | ) |
78 | 0 | } Unexecuted instantiation: <tonic::codec::decode::Streaming<ztunnel::xds::types::istio::ca::IstioCertificateResponse>>::new_response::<hyper::body::incoming::Incoming, tonic::codec::prost::ProstDecoder<ztunnel::xds::types::istio::ca::IstioCertificateResponse>> Unexecuted instantiation: <tonic::codec::decode::Streaming<ztunnel::xds::types::service::discovery::v3::DeltaDiscoveryResponse>>::new_response::<hyper::body::incoming::Incoming, tonic::codec::prost::ProstDecoder<ztunnel::xds::types::service::discovery::v3::DeltaDiscoveryResponse>> Unexecuted instantiation: <tonic::codec::decode::Streaming<_>>::new_response::<_, _> |
79 | | |
80 | | /// Create empty response. For creating responses that have no content (headers + trailers only) |
81 | 0 | pub fn new_empty<B, D>(decoder: D, body: B) -> Self |
82 | 0 | where |
83 | 0 | B: HttpBody + Send + 'static, |
84 | 0 | B::Error: Into<crate::BoxError>, |
85 | 0 | D: Decoder<Item = T, Error = Status> + Send + 'static, |
86 | | { |
87 | 0 | Self::new(decoder, body, Direction::EmptyResponse, None, None) |
88 | 0 | } Unexecuted instantiation: <tonic::codec::decode::Streaming<ztunnel::xds::types::istio::ca::IstioCertificateResponse>>::new_empty::<hyper::body::incoming::Incoming, tonic::codec::prost::ProstDecoder<ztunnel::xds::types::istio::ca::IstioCertificateResponse>> Unexecuted instantiation: <tonic::codec::decode::Streaming<ztunnel::xds::types::service::discovery::v3::DeltaDiscoveryResponse>>::new_empty::<hyper::body::incoming::Incoming, tonic::codec::prost::ProstDecoder<ztunnel::xds::types::service::discovery::v3::DeltaDiscoveryResponse>> Unexecuted instantiation: <tonic::codec::decode::Streaming<_>>::new_empty::<_, _> |
89 | | |
90 | | /// Create a new streaming request in the grpc response format for decoding a request [Body] |
91 | | /// into message of type T |
92 | 0 | pub fn new_request<B, D>( |
93 | 0 | decoder: D, |
94 | 0 | body: B, |
95 | 0 | encoding: Option<CompressionEncoding>, |
96 | 0 | max_message_size: Option<usize>, |
97 | 0 | ) -> Self |
98 | 0 | where |
99 | 0 | B: HttpBody + Send + 'static, |
100 | 0 | B::Error: Into<crate::BoxError>, |
101 | 0 | D: Decoder<Item = T, Error = Status> + Send + 'static, |
102 | | { |
103 | 0 | Self::new( |
104 | 0 | decoder, |
105 | 0 | body, |
106 | 0 | Direction::Request, |
107 | 0 | encoding, |
108 | 0 | max_message_size, |
109 | | ) |
110 | 0 | } |
111 | | |
112 | 0 | fn new<B, D>( |
113 | 0 | decoder: D, |
114 | 0 | body: B, |
115 | 0 | direction: Direction, |
116 | 0 | encoding: Option<CompressionEncoding>, |
117 | 0 | max_message_size: Option<usize>, |
118 | 0 | ) -> Self |
119 | 0 | where |
120 | 0 | B: HttpBody + Send + 'static, |
121 | 0 | B::Error: Into<crate::BoxError>, |
122 | 0 | D: Decoder<Item = T, Error = Status> + Send + 'static, |
123 | | { |
124 | 0 | let buffer_size = decoder.buffer_settings().buffer_size; |
125 | | Self { |
126 | 0 | decoder: Box::new(decoder), |
127 | | inner: StreamingInner { |
128 | 0 | body: Body::new( |
129 | 0 | body.map_frame(|frame| { |
130 | 0 | frame.map_data(|mut buf| buf.copy_to_bytes(buf.remaining())) Unexecuted instantiation: <tonic::codec::decode::Streaming<ztunnel::xds::types::istio::ca::IstioCertificateResponse>>::new::<hyper::body::incoming::Incoming, tonic::codec::prost::ProstDecoder<ztunnel::xds::types::istio::ca::IstioCertificateResponse>>::{closure#0}::{closure#0}Unexecuted instantiation: <tonic::codec::decode::Streaming<ztunnel::xds::types::service::discovery::v3::DeltaDiscoveryResponse>>::new::<hyper::body::incoming::Incoming, tonic::codec::prost::ProstDecoder<ztunnel::xds::types::service::discovery::v3::DeltaDiscoveryResponse>>::{closure#0}::{closure#0}Unexecuted instantiation: <tonic::codec::decode::Streaming<_>>::new::<_, _>::{closure#0}::{closure#0} |
131 | 0 | }) Unexecuted instantiation: <tonic::codec::decode::Streaming<ztunnel::xds::types::istio::ca::IstioCertificateResponse>>::new::<hyper::body::incoming::Incoming, tonic::codec::prost::ProstDecoder<ztunnel::xds::types::istio::ca::IstioCertificateResponse>>::{closure#0}Unexecuted instantiation: <tonic::codec::decode::Streaming<ztunnel::xds::types::service::discovery::v3::DeltaDiscoveryResponse>>::new::<hyper::body::incoming::Incoming, tonic::codec::prost::ProstDecoder<ztunnel::xds::types::service::discovery::v3::DeltaDiscoveryResponse>>::{closure#0}Unexecuted instantiation: <tonic::codec::decode::Streaming<_>>::new::<_, _>::{closure#0} |
132 | 0 | .map_err(|err| Status::map_error(err.into())), Unexecuted instantiation: <tonic::codec::decode::Streaming<ztunnel::xds::types::istio::ca::IstioCertificateResponse>>::new::<hyper::body::incoming::Incoming, tonic::codec::prost::ProstDecoder<ztunnel::xds::types::istio::ca::IstioCertificateResponse>>::{closure#1}Unexecuted instantiation: <tonic::codec::decode::Streaming<ztunnel::xds::types::service::discovery::v3::DeltaDiscoveryResponse>>::new::<hyper::body::incoming::Incoming, tonic::codec::prost::ProstDecoder<ztunnel::xds::types::service::discovery::v3::DeltaDiscoveryResponse>>::{closure#1}Unexecuted instantiation: <tonic::codec::decode::Streaming<_>>::new::<_, _>::{closure#1} |
133 | | ), |
134 | 0 | state: State::ReadHeader, |
135 | 0 | direction, |
136 | 0 | buf: BytesMut::with_capacity(buffer_size), |
137 | 0 | trailers: None, |
138 | 0 | decompress_buf: BytesMut::new(), |
139 | 0 | encoding, |
140 | 0 | max_message_size, |
141 | | }, |
142 | | } |
143 | 0 | } Unexecuted instantiation: <tonic::codec::decode::Streaming<ztunnel::xds::types::istio::ca::IstioCertificateResponse>>::new::<hyper::body::incoming::Incoming, tonic::codec::prost::ProstDecoder<ztunnel::xds::types::istio::ca::IstioCertificateResponse>> Unexecuted instantiation: <tonic::codec::decode::Streaming<ztunnel::xds::types::service::discovery::v3::DeltaDiscoveryResponse>>::new::<hyper::body::incoming::Incoming, tonic::codec::prost::ProstDecoder<ztunnel::xds::types::service::discovery::v3::DeltaDiscoveryResponse>> Unexecuted instantiation: <tonic::codec::decode::Streaming<_>>::new::<_, _> |
144 | | } |
145 | | |
146 | | impl StreamingInner { |
147 | 0 | fn decode_chunk( |
148 | 0 | &mut self, |
149 | 0 | buffer_settings: BufferSettings, |
150 | 0 | ) -> Result<Option<DecodeBuf<'_>>, Status> { |
151 | 0 | if let State::ReadHeader = self.state { |
152 | 0 | if self.buf.remaining() < HEADER_SIZE { |
153 | 0 | return Ok(None); |
154 | 0 | } |
155 | | |
156 | 0 | let compression_encoding = match self.buf.get_u8() { |
157 | 0 | 0 => None, |
158 | | 1 => { |
159 | | { |
160 | 0 | if self.encoding.is_some() { |
161 | 0 | self.encoding |
162 | | } else { |
163 | | // https://grpc.github.io/grpc/core/md_doc_compression.html |
164 | | // An ill-constructed message with its Compressed-Flag bit set but lacking a grpc-encoding |
165 | | // entry different from identity in its metadata MUST fail with INTERNAL status, |
166 | | // its associated description indicating the invalid Compressed-Flag condition. |
167 | 0 | return Err(Status::internal( "protocol error: received message with compressed-flag but no grpc-encoding was specified")); |
168 | | } |
169 | | } |
170 | | } |
171 | 0 | f => { |
172 | 0 | trace!("unexpected compression flag"); |
173 | 0 | let message = if let Direction::Response(status) = self.direction { |
174 | 0 | format!( |
175 | 0 | "protocol error: received message with invalid compression flag: {} (valid flags are 0 and 1) while receiving response with status: {}", |
176 | | f, status |
177 | | ) |
178 | | } else { |
179 | 0 | format!("protocol error: received message with invalid compression flag: {} (valid flags are 0 and 1), while sending request", f) |
180 | | }; |
181 | 0 | return Err(Status::internal(message)); |
182 | | } |
183 | | }; |
184 | | |
185 | 0 | let len = self.buf.get_u32() as usize; |
186 | 0 | let limit = self |
187 | 0 | .max_message_size |
188 | 0 | .unwrap_or(DEFAULT_MAX_RECV_MESSAGE_SIZE); |
189 | 0 | if len > limit { |
190 | 0 | return Err(Status::out_of_range( |
191 | 0 | format!( |
192 | 0 | "Error, decoded message length too large: found {} bytes, the limit is: {} bytes", |
193 | 0 | len, limit |
194 | 0 | ), |
195 | 0 | )); |
196 | 0 | } |
197 | | |
198 | 0 | self.buf.reserve(len); |
199 | | |
200 | 0 | self.state = State::ReadBody { |
201 | 0 | compression: compression_encoding, |
202 | 0 | len, |
203 | 0 | } |
204 | 0 | } |
205 | | |
206 | 0 | if let State::ReadBody { len, compression } = self.state { |
207 | | // if we haven't read enough of the message then return and keep |
208 | | // reading |
209 | 0 | if self.buf.remaining() < len || self.buf.len() < len { |
210 | 0 | return Ok(None); |
211 | 0 | } |
212 | | |
213 | 0 | let decode_buf = if let Some(encoding) = compression { |
214 | 0 | self.decompress_buf.clear(); |
215 | | |
216 | 0 | if let Err(err) = decompress( |
217 | 0 | CompressionSettings { |
218 | 0 | encoding, |
219 | 0 | buffer_growth_interval: buffer_settings.buffer_size, |
220 | 0 | }, |
221 | 0 | &mut self.buf, |
222 | 0 | &mut self.decompress_buf, |
223 | 0 | len, |
224 | 0 | ) { |
225 | 0 | let message = if let Direction::Response(status) = self.direction { |
226 | 0 | format!( |
227 | 0 | "Error decompressing: {}, while receiving response with status: {}", |
228 | | err, status |
229 | | ) |
230 | | } else { |
231 | 0 | format!("Error decompressing: {}, while sending request", err) |
232 | | }; |
233 | 0 | return Err(Status::internal(message)); |
234 | 0 | } |
235 | 0 | let decompressed_len = self.decompress_buf.len(); |
236 | 0 | DecodeBuf::new(&mut self.decompress_buf, decompressed_len) |
237 | | } else { |
238 | 0 | DecodeBuf::new(&mut self.buf, len) |
239 | | }; |
240 | | |
241 | 0 | return Ok(Some(decode_buf)); |
242 | 0 | } |
243 | | |
244 | 0 | Ok(None) |
245 | 0 | } |
246 | | |
247 | | // Returns Some(()) if data was found or None if the loop in `poll_next` should break |
248 | 0 | fn poll_frame(&mut self, cx: &mut Context<'_>) -> Poll<Result<Option<()>, Status>> { |
249 | 0 | let frame = match ready!(Pin::new(&mut self.body).poll_frame(cx)) { |
250 | 0 | Some(Ok(frame)) => frame, |
251 | 0 | Some(Err(status)) => { |
252 | 0 | if self.direction == Direction::Request && status.code() == Code::Cancelled { |
253 | 0 | return Poll::Ready(Ok(None)); |
254 | 0 | } |
255 | | |
256 | 0 | let _ = std::mem::replace(&mut self.state, State::Error(Some(status.clone()))); |
257 | 0 | debug!("decoder inner stream error: {:?}", status); |
258 | 0 | return Poll::Ready(Err(status)); |
259 | | } |
260 | | None => { |
261 | | // FIXME: improve buf usage. |
262 | 0 | return Poll::Ready(if self.buf.has_remaining() { |
263 | 0 | trace!("unexpected EOF decoding stream, state: {:?}", self.state); |
264 | 0 | Err(Status::internal("Unexpected EOF decoding stream.")) |
265 | | } else { |
266 | 0 | Ok(None) |
267 | | }); |
268 | | } |
269 | | }; |
270 | | |
271 | 0 | Poll::Ready(if frame.is_data() { |
272 | 0 | self.buf.put(frame.into_data().unwrap()); |
273 | 0 | Ok(Some(())) |
274 | 0 | } else if frame.is_trailers() { |
275 | 0 | if let Some(trailers) = &mut self.trailers { |
276 | 0 | trailers.extend(frame.into_trailers().unwrap()); |
277 | 0 | } else { |
278 | 0 | self.trailers = Some(frame.into_trailers().unwrap()); |
279 | 0 | } |
280 | | |
281 | 0 | Ok(None) |
282 | | } else { |
283 | 0 | panic!("unexpected frame: {:?}", frame); |
284 | | }) |
285 | 0 | } |
286 | | |
287 | 0 | fn response(&mut self) -> Result<(), Status> { |
288 | 0 | if let Direction::Response(status) = self.direction { |
289 | 0 | if let Err(Some(e)) = crate::status::infer_grpc_status(self.trailers.as_ref(), status) { |
290 | | // If the trailers contain a grpc-status, then we should return that as the error |
291 | | // and otherwise stop the stream (by taking the error state) |
292 | 0 | self.trailers.take(); |
293 | 0 | return Err(e); |
294 | 0 | } |
295 | 0 | } |
296 | 0 | Ok(()) |
297 | 0 | } |
298 | | } |
299 | | |
300 | | impl<T> Streaming<T> { |
301 | | /// Fetch the next message from this stream. |
302 | | /// |
303 | | /// # Return value |
304 | | /// |
305 | | /// - `Result::Err(val)` means a gRPC error was sent by the sender instead |
306 | | /// of a valid response message. Refer to [`Status::code`] and |
307 | | /// [`Status::message`] to examine possible error causes. |
308 | | /// |
309 | | /// - `Result::Ok(None)` means the stream was closed by the sender and no |
310 | | /// more messages will be delivered. Further attempts to call |
311 | | /// [`Streaming::message`] will result in the same return value. |
312 | | /// |
313 | | /// - `Result::Ok(Some(val))` means the sender streamed a valid response |
314 | | /// message `val`. |
315 | | /// |
316 | | /// ```rust |
317 | | /// # use tonic::{Streaming, Status, codec::Decoder}; |
318 | | /// # use std::fmt::Debug; |
319 | | /// # async fn next_message_ex<T, D>(mut request: Streaming<T>) -> Result<(), Status> |
320 | | /// # where T: Debug, |
321 | | /// # D: Decoder<Item = T, Error = Status> + Send + 'static, |
322 | | /// # { |
323 | | /// if let Some(next_message) = request.message().await? { |
324 | | /// println!("{:?}", next_message); |
325 | | /// } |
326 | | /// # Ok(()) |
327 | | /// # } |
328 | | /// ``` |
329 | 0 | pub async fn message(&mut self) -> Result<Option<T>, Status> {Unexecuted instantiation: <tonic::codec::decode::Streaming<ztunnel::xds::types::istio::ca::IstioCertificateResponse>>::message Unexecuted instantiation: <tonic::codec::decode::Streaming<ztunnel::xds::types::service::discovery::v3::DeltaDiscoveryResponse>>::message Unexecuted instantiation: <tonic::codec::decode::Streaming<_>>::message |
330 | 0 | match future::poll_fn(|cx| Pin::new(&mut *self).poll_next(cx)).await {Unexecuted instantiation: <tonic::codec::decode::Streaming<ztunnel::xds::types::istio::ca::IstioCertificateResponse>>::message::{closure#0}::{closure#0}Unexecuted instantiation: <tonic::codec::decode::Streaming<ztunnel::xds::types::service::discovery::v3::DeltaDiscoveryResponse>>::message::{closure#0}::{closure#0}Unexecuted instantiation: <tonic::codec::decode::Streaming<_>>::message::{closure#0}::{closure#0} |
331 | 0 | Some(Ok(m)) => Ok(Some(m)), |
332 | 0 | Some(Err(e)) => Err(e), |
333 | 0 | None => Ok(None), |
334 | | } |
335 | 0 | } Unexecuted instantiation: <tonic::codec::decode::Streaming<ztunnel::xds::types::istio::ca::IstioCertificateResponse>>::message::{closure#0}Unexecuted instantiation: <tonic::codec::decode::Streaming<ztunnel::xds::types::service::discovery::v3::DeltaDiscoveryResponse>>::message::{closure#0}Unexecuted instantiation: <tonic::codec::decode::Streaming<_>>::message::{closure#0} |
336 | | |
337 | | /// Fetch the trailing metadata. |
338 | | /// |
339 | | /// This will drain the stream of all its messages to receive the trailing |
340 | | /// metadata. If [`Streaming::message`] returns `None` then this function |
341 | | /// will not need to poll for trailers since the body was totally consumed. |
342 | | /// |
343 | | /// ```rust |
344 | | /// # use tonic::{Streaming, Status}; |
345 | | /// # async fn trailers_ex<T>(mut request: Streaming<T>) -> Result<(), Status> { |
346 | | /// if let Some(metadata) = request.trailers().await? { |
347 | | /// println!("{:?}", metadata); |
348 | | /// } |
349 | | /// # Ok(()) |
350 | | /// # } |
351 | | /// ``` |
352 | 0 | pub async fn trailers(&mut self) -> Result<Option<MetadataMap>, Status> {Unexecuted instantiation: <tonic::codec::decode::Streaming<ztunnel::xds::types::istio::ca::IstioCertificateResponse>>::trailers Unexecuted instantiation: <tonic::codec::decode::Streaming<_>>::trailers |
353 | | // Shortcut to see if we already pulled the trailers in the stream step |
354 | | // we need to do that so that the stream can error on trailing grpc-status |
355 | 0 | if let Some(trailers) = self.inner.trailers.take() { |
356 | 0 | return Ok(Some(MetadataMap::from_headers(trailers))); |
357 | 0 | } |
358 | | |
359 | | // To fetch the trailers we must clear the body and drop it. |
360 | 0 | while self.message().await?.is_some() {} |
361 | | |
362 | | // Since we call poll_trailers internally on poll_next we need to |
363 | | // check if it got cached again. |
364 | 0 | if let Some(trailers) = self.inner.trailers.take() { |
365 | 0 | return Ok(Some(MetadataMap::from_headers(trailers))); |
366 | 0 | } |
367 | | |
368 | | // We've polled through all the frames, and still no trailers, return None |
369 | 0 | Ok(None) |
370 | 0 | } Unexecuted instantiation: <tonic::codec::decode::Streaming<ztunnel::xds::types::istio::ca::IstioCertificateResponse>>::trailers::{closure#0}Unexecuted instantiation: <tonic::codec::decode::Streaming<_>>::trailers::{closure#0} |
371 | | |
372 | 0 | fn decode_chunk(&mut self) -> Result<Option<T>, Status> { |
373 | 0 | match self.inner.decode_chunk(self.decoder.buffer_settings())? { |
374 | 0 | Some(mut decode_buf) => match self.decoder.decode(&mut decode_buf)? { |
375 | 0 | Some(msg) => { |
376 | 0 | self.inner.state = State::ReadHeader; |
377 | 0 | Ok(Some(msg)) |
378 | | } |
379 | 0 | None => Ok(None), |
380 | | }, |
381 | 0 | None => Ok(None), |
382 | | } |
383 | 0 | } Unexecuted instantiation: <tonic::codec::decode::Streaming<ztunnel::xds::types::istio::ca::IstioCertificateResponse>>::decode_chunk Unexecuted instantiation: <tonic::codec::decode::Streaming<ztunnel::xds::types::service::discovery::v3::DeltaDiscoveryResponse>>::decode_chunk Unexecuted instantiation: <tonic::codec::decode::Streaming<_>>::decode_chunk |
384 | | } |
385 | | |
386 | | impl<T> Stream for Streaming<T> { |
387 | | type Item = Result<T, Status>; |
388 | | |
389 | 0 | fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { |
390 | | loop { |
391 | | // When the stream encounters an error yield that error once and then on subsequent |
392 | | // calls to poll_next return Poll::Ready(None) indicating that the stream has been |
393 | | // fully exhausted. |
394 | 0 | if let State::Error(status) = &mut self.inner.state { |
395 | 0 | return Poll::Ready(status.take().map(Err)); |
396 | 0 | } |
397 | | |
398 | 0 | if let Some(item) = self.decode_chunk()? { |
399 | 0 | return Poll::Ready(Some(Ok(item))); |
400 | 0 | } |
401 | | |
402 | 0 | if ready!(self.inner.poll_frame(cx))?.is_none() { |
403 | 0 | match self.inner.response() { |
404 | 0 | Ok(()) => return Poll::Ready(None), |
405 | 0 | Err(err) => self.inner.state = State::Error(Some(err)), |
406 | | } |
407 | 0 | } |
408 | | } |
409 | 0 | } Unexecuted instantiation: <tonic::codec::decode::Streaming<ztunnel::xds::types::istio::ca::IstioCertificateResponse> as futures_core::stream::Stream>::poll_next Unexecuted instantiation: <tonic::codec::decode::Streaming<ztunnel::xds::types::service::discovery::v3::DeltaDiscoveryResponse> as futures_core::stream::Stream>::poll_next Unexecuted instantiation: <tonic::codec::decode::Streaming<_> as futures_core::stream::Stream>::poll_next |
410 | | } |
411 | | |
412 | | impl<T> fmt::Debug for Streaming<T> { |
413 | 0 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
414 | 0 | f.debug_struct("Streaming").finish() |
415 | 0 | } |
416 | | } |
417 | | |
418 | | #[cfg(test)] |
419 | | static_assertions::assert_impl_all!(Streaming<()>: Send); |