/rust/registry/src/index.crates.io-1949cf8c6b5b557f/tonic-0.13.0/src/client/grpc.rs
Line | Count | Source |
1 | | use crate::codec::compression::{CompressionEncoding, EnabledCompressionEncodings}; |
2 | | use crate::codec::EncodeBody; |
3 | | use crate::metadata::GRPC_CONTENT_TYPE; |
4 | | use crate::{ |
5 | | body::Body, |
6 | | client::GrpcService, |
7 | | codec::{Codec, Decoder, Streaming}, |
8 | | request::SanitizeHeaders, |
9 | | Code, Request, Response, Status, |
10 | | }; |
11 | | use http::{ |
12 | | header::{HeaderValue, CONTENT_TYPE, TE}, |
13 | | uri::{PathAndQuery, Uri}, |
14 | | }; |
15 | | use http_body::Body as HttpBody; |
16 | | use std::{fmt, future, pin::pin}; |
17 | | use tokio_stream::{Stream, StreamExt}; |
18 | | |
19 | | /// A gRPC client dispatcher. |
20 | | /// |
21 | | /// This will wrap some inner [`GrpcService`] and will encode/decode |
22 | | /// messages via the provided codec. |
23 | | /// |
24 | | /// Each request method takes a [`Request`], a [`PathAndQuery`], and a |
25 | | /// [`Codec`]. The request contains the message to send via the |
26 | | /// [`Codec::encoder`]. The path determines the fully qualified path |
27 | | /// that will be append to the outgoing uri. The path must follow |
28 | | /// the conventions explained in the [gRPC protocol definition] under `Path →`. An |
29 | | /// example of this path could look like `/greeter.Greeter/SayHello`. |
30 | | /// |
31 | | /// [gRPC protocol definition]: https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests |
32 | | pub struct Grpc<T> { |
33 | | inner: T, |
34 | | config: GrpcConfig, |
35 | | } |
36 | | |
37 | | struct GrpcConfig { |
38 | | origin: Uri, |
39 | | /// Which compression encodings does the client accept? |
40 | | accept_compression_encodings: EnabledCompressionEncodings, |
41 | | /// The compression encoding that will be applied to requests. |
42 | | send_compression_encodings: Option<CompressionEncoding>, |
43 | | /// Limits the maximum size of a decoded message. |
44 | | max_decoding_message_size: Option<usize>, |
45 | | /// Limits the maximum size of an encoded message. |
46 | | max_encoding_message_size: Option<usize>, |
47 | | } |
48 | | |
49 | | impl<T> Grpc<T> { |
50 | | /// Creates a new gRPC client with the provided [`GrpcService`]. |
51 | 0 | pub fn new(inner: T) -> Self { |
52 | 0 | Self::with_origin(inner, Uri::default()) |
53 | 0 | } Unexecuted instantiation: <tonic::client::grpc::Grpc<ztunnel::tls::control::TlsGrpcChannel>>::new Unexecuted instantiation: <tonic::client::grpc::Grpc<_>>::new |
54 | | |
55 | | /// Creates a new gRPC client with the provided [`GrpcService`] and `Uri`. |
56 | | /// |
57 | | /// The provided Uri will use only the scheme and authority parts as the |
58 | | /// path_and_query portion will be set for each method. |
59 | 0 | pub fn with_origin(inner: T, origin: Uri) -> Self { |
60 | 0 | Self { |
61 | 0 | inner, |
62 | 0 | config: GrpcConfig { |
63 | 0 | origin, |
64 | 0 | send_compression_encodings: None, |
65 | 0 | accept_compression_encodings: EnabledCompressionEncodings::default(), |
66 | 0 | max_decoding_message_size: None, |
67 | 0 | max_encoding_message_size: None, |
68 | 0 | }, |
69 | 0 | } |
70 | 0 | } Unexecuted instantiation: <tonic::client::grpc::Grpc<ztunnel::tls::control::TlsGrpcChannel>>::with_origin Unexecuted instantiation: <tonic::client::grpc::Grpc<_>>::with_origin |
71 | | |
72 | | /// Compress requests with the provided encoding. |
73 | | /// |
74 | | /// Requires the server to accept the specified encoding, otherwise it might return an error. |
75 | | /// |
76 | | /// # Example |
77 | | /// |
78 | | /// The most common way of using this is through a client generated by tonic-build: |
79 | | /// |
80 | | /// ```rust |
81 | | /// use tonic::transport::Channel; |
82 | | /// # enum CompressionEncoding { Gzip } |
83 | | /// # struct TestClient<T>(T); |
84 | | /// # impl<T> TestClient<T> { |
85 | | /// # fn new(channel: T) -> Self { Self(channel) } |
86 | | /// # fn send_compressed(self, _: CompressionEncoding) -> Self { self } |
87 | | /// # } |
88 | | /// |
89 | | /// # async { |
90 | | /// let channel = Channel::builder("127.0.0.1:3000".parse().unwrap()) |
91 | | /// .connect() |
92 | | /// .await |
93 | | /// .unwrap(); |
94 | | /// |
95 | | /// let client = TestClient::new(channel).send_compressed(CompressionEncoding::Gzip); |
96 | | /// # }; |
97 | | /// ``` |
98 | 0 | pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self { |
99 | 0 | self.config.send_compression_encodings = Some(encoding); |
100 | 0 | self |
101 | 0 | } |
102 | | |
103 | | /// Enable accepting compressed responses. |
104 | | /// |
105 | | /// Requires the server to also support sending compressed responses. |
106 | | /// |
107 | | /// # Example |
108 | | /// |
109 | | /// The most common way of using this is through a client generated by tonic-build: |
110 | | /// |
111 | | /// ```rust |
112 | | /// use tonic::transport::Channel; |
113 | | /// # enum CompressionEncoding { Gzip } |
114 | | /// # struct TestClient<T>(T); |
115 | | /// # impl<T> TestClient<T> { |
116 | | /// # fn new(channel: T) -> Self { Self(channel) } |
117 | | /// # fn accept_compressed(self, _: CompressionEncoding) -> Self { self } |
118 | | /// # } |
119 | | /// |
120 | | /// # async { |
121 | | /// let channel = Channel::builder("127.0.0.1:3000".parse().unwrap()) |
122 | | /// .connect() |
123 | | /// .await |
124 | | /// .unwrap(); |
125 | | /// |
126 | | /// let client = TestClient::new(channel).accept_compressed(CompressionEncoding::Gzip); |
127 | | /// # }; |
128 | | /// ``` |
129 | 0 | pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self { |
130 | 0 | self.config.accept_compression_encodings.enable(encoding); |
131 | 0 | self |
132 | 0 | } |
133 | | |
134 | | /// Limits the maximum size of a decoded message. |
135 | | /// |
136 | | /// # Example |
137 | | /// |
138 | | /// The most common way of using this is through a client generated by tonic-build: |
139 | | /// |
140 | | /// ```rust |
141 | | /// use tonic::transport::Channel; |
142 | | /// # struct TestClient<T>(T); |
143 | | /// # impl<T> TestClient<T> { |
144 | | /// # fn new(channel: T) -> Self { Self(channel) } |
145 | | /// # fn max_decoding_message_size(self, _: usize) -> Self { self } |
146 | | /// # } |
147 | | /// |
148 | | /// # async { |
149 | | /// let channel = Channel::builder("127.0.0.1:3000".parse().unwrap()) |
150 | | /// .connect() |
151 | | /// .await |
152 | | /// .unwrap(); |
153 | | /// |
154 | | /// // Set the limit to 2MB, Defaults to 4MB. |
155 | | /// let limit = 2 * 1024 * 1024; |
156 | | /// let client = TestClient::new(channel).max_decoding_message_size(limit); |
157 | | /// # }; |
158 | | /// ``` |
159 | 0 | pub fn max_decoding_message_size(mut self, limit: usize) -> Self { |
160 | 0 | self.config.max_decoding_message_size = Some(limit); |
161 | 0 | self |
162 | 0 | } Unexecuted instantiation: <tonic::client::grpc::Grpc<ztunnel::tls::control::TlsGrpcChannel>>::max_decoding_message_size Unexecuted instantiation: <tonic::client::grpc::Grpc<_>>::max_decoding_message_size |
163 | | |
164 | | /// Limits the maximum size of an encoded message. |
165 | | /// |
166 | | /// # Example |
167 | | /// |
168 | | /// The most common way of using this is through a client generated by tonic-build: |
169 | | /// |
170 | | /// ```rust |
171 | | /// use tonic::transport::Channel; |
172 | | /// # struct TestClient<T>(T); |
173 | | /// # impl<T> TestClient<T> { |
174 | | /// # fn new(channel: T) -> Self { Self(channel) } |
175 | | /// # fn max_encoding_message_size(self, _: usize) -> Self { self } |
176 | | /// # } |
177 | | /// |
178 | | /// # async { |
179 | | /// let channel = Channel::builder("127.0.0.1:3000".parse().unwrap()) |
180 | | /// .connect() |
181 | | /// .await |
182 | | /// .unwrap(); |
183 | | /// |
184 | | /// // Set the limit to 2MB, Defaults to 4MB. |
185 | | /// let limit = 2 * 1024 * 1024; |
186 | | /// let client = TestClient::new(channel).max_encoding_message_size(limit); |
187 | | /// # }; |
188 | | /// ``` |
189 | 0 | pub fn max_encoding_message_size(mut self, limit: usize) -> Self { |
190 | 0 | self.config.max_encoding_message_size = Some(limit); |
191 | 0 | self |
192 | 0 | } |
193 | | |
194 | | /// Check if the inner [`GrpcService`] is able to accept a new request. |
195 | | /// |
196 | | /// This will call [`GrpcService::poll_ready`] until it returns ready or |
197 | | /// an error. If this returns ready the inner [`GrpcService`] is ready to |
198 | | /// accept one more request. |
199 | 0 | pub async fn ready(&mut self) -> Result<(), T::Error> |
200 | 0 | where |
201 | 0 | T: GrpcService<Body>, Unexecuted instantiation: <tonic::client::grpc::Grpc<ztunnel::tls::control::TlsGrpcChannel>>::ready Unexecuted instantiation: <tonic::client::grpc::Grpc<_>>::ready |
202 | 0 | { |
203 | 0 | future::poll_fn(|cx| self.inner.poll_ready(cx)).await Unexecuted instantiation: <tonic::client::grpc::Grpc<ztunnel::tls::control::TlsGrpcChannel>>::ready::{closure#0}::{closure#0}Unexecuted instantiation: <tonic::client::grpc::Grpc<_>>::ready::{closure#0}::{closure#0} |
204 | 0 | } Unexecuted instantiation: <tonic::client::grpc::Grpc<ztunnel::tls::control::TlsGrpcChannel>>::ready::{closure#0}Unexecuted instantiation: <tonic::client::grpc::Grpc<_>>::ready::{closure#0} |
205 | | |
206 | | /// Send a single unary gRPC request. |
207 | 0 | pub async fn unary<M1, M2, C>( |
208 | 0 | &mut self, |
209 | 0 | request: Request<M1>, |
210 | 0 | path: PathAndQuery, |
211 | 0 | codec: C, |
212 | 0 | ) -> Result<Response<M2>, Status> |
213 | 0 | where |
214 | 0 | T: GrpcService<Body>, |
215 | 0 | T::ResponseBody: HttpBody + Send + 'static, |
216 | 0 | <T::ResponseBody as HttpBody>::Error: Into<crate::BoxError>, |
217 | 0 | C: Codec<Encode = M1, Decode = M2>, |
218 | 0 | M1: Send + Sync + 'static, |
219 | 0 | M2: Send + Sync + 'static, Unexecuted instantiation: <tonic::client::grpc::Grpc<ztunnel::tls::control::TlsGrpcChannel>>::unary::<ztunnel::xds::types::istio::ca::IstioCertificateRequest, ztunnel::xds::types::istio::ca::IstioCertificateResponse, tonic::codec::prost::ProstCodec<ztunnel::xds::types::istio::ca::IstioCertificateRequest, ztunnel::xds::types::istio::ca::IstioCertificateResponse>> Unexecuted instantiation: <tonic::client::grpc::Grpc<_>>::unary::<_, _, _> |
220 | 0 | { |
221 | 0 | let request = request.map(|m| tokio_stream::once(m)); Unexecuted instantiation: <tonic::client::grpc::Grpc<ztunnel::tls::control::TlsGrpcChannel>>::unary::<ztunnel::xds::types::istio::ca::IstioCertificateRequest, ztunnel::xds::types::istio::ca::IstioCertificateResponse, tonic::codec::prost::ProstCodec<ztunnel::xds::types::istio::ca::IstioCertificateRequest, ztunnel::xds::types::istio::ca::IstioCertificateResponse>>::{closure#0}::{closure#0}Unexecuted instantiation: <tonic::client::grpc::Grpc<_>>::unary::<_, _, _>::{closure#0}::{closure#0} |
222 | 0 | self.client_streaming(request, path, codec).await |
223 | 0 | } Unexecuted instantiation: <tonic::client::grpc::Grpc<ztunnel::tls::control::TlsGrpcChannel>>::unary::<ztunnel::xds::types::istio::ca::IstioCertificateRequest, ztunnel::xds::types::istio::ca::IstioCertificateResponse, tonic::codec::prost::ProstCodec<ztunnel::xds::types::istio::ca::IstioCertificateRequest, ztunnel::xds::types::istio::ca::IstioCertificateResponse>>::{closure#0}Unexecuted instantiation: <tonic::client::grpc::Grpc<_>>::unary::<_, _, _>::{closure#0} |
224 | | |
225 | | /// Send a client side streaming gRPC request. |
226 | 0 | pub async fn client_streaming<S, M1, M2, C>( |
227 | 0 | &mut self, |
228 | 0 | request: Request<S>, |
229 | 0 | path: PathAndQuery, |
230 | 0 | codec: C, |
231 | 0 | ) -> Result<Response<M2>, Status> |
232 | 0 | where |
233 | 0 | T: GrpcService<Body>, |
234 | 0 | T::ResponseBody: HttpBody + Send + 'static, |
235 | 0 | <T::ResponseBody as HttpBody>::Error: Into<crate::BoxError>, |
236 | 0 | S: Stream<Item = M1> + Send + 'static, |
237 | 0 | C: Codec<Encode = M1, Decode = M2>, |
238 | 0 | M1: Send + Sync + 'static, |
239 | 0 | M2: Send + Sync + 'static, Unexecuted instantiation: <tonic::client::grpc::Grpc<ztunnel::tls::control::TlsGrpcChannel>>::client_streaming::<tokio_stream::once::Once<ztunnel::xds::types::istio::ca::IstioCertificateRequest>, ztunnel::xds::types::istio::ca::IstioCertificateRequest, ztunnel::xds::types::istio::ca::IstioCertificateResponse, tonic::codec::prost::ProstCodec<ztunnel::xds::types::istio::ca::IstioCertificateRequest, ztunnel::xds::types::istio::ca::IstioCertificateResponse>> Unexecuted instantiation: <tonic::client::grpc::Grpc<_>>::client_streaming::<_, _, _, _> |
240 | 0 | { |
241 | 0 | let (mut parts, body, extensions) = |
242 | 0 | self.streaming(request, path, codec).await?.into_parts(); |
243 | | |
244 | 0 | let mut body = pin!(body); |
245 | | |
246 | 0 | let message = body |
247 | 0 | .try_next() |
248 | 0 | .await |
249 | 0 | .map_err(|mut status| { |
250 | 0 | status.metadata_mut().merge(parts.clone()); |
251 | 0 | status |
252 | 0 | })? Unexecuted instantiation: <tonic::client::grpc::Grpc<ztunnel::tls::control::TlsGrpcChannel>>::client_streaming::<tokio_stream::once::Once<ztunnel::xds::types::istio::ca::IstioCertificateRequest>, ztunnel::xds::types::istio::ca::IstioCertificateRequest, ztunnel::xds::types::istio::ca::IstioCertificateResponse, tonic::codec::prost::ProstCodec<ztunnel::xds::types::istio::ca::IstioCertificateRequest, ztunnel::xds::types::istio::ca::IstioCertificateResponse>>::{closure#0}::{closure#0}Unexecuted instantiation: <tonic::client::grpc::Grpc<_>>::client_streaming::<_, _, _, _>::{closure#0}::{closure#0} |
253 | 0 | .ok_or_else(|| Status::internal("Missing response message."))?;Unexecuted instantiation: <tonic::client::grpc::Grpc<ztunnel::tls::control::TlsGrpcChannel>>::client_streaming::<tokio_stream::once::Once<ztunnel::xds::types::istio::ca::IstioCertificateRequest>, ztunnel::xds::types::istio::ca::IstioCertificateRequest, ztunnel::xds::types::istio::ca::IstioCertificateResponse, tonic::codec::prost::ProstCodec<ztunnel::xds::types::istio::ca::IstioCertificateRequest, ztunnel::xds::types::istio::ca::IstioCertificateResponse>>::{closure#0}::{closure#1}Unexecuted instantiation: <tonic::client::grpc::Grpc<_>>::client_streaming::<_, _, _, _>::{closure#0}::{closure#1} |
254 | | |
255 | 0 | if let Some(trailers) = body.trailers().await? { |
256 | 0 | parts.merge(trailers); |
257 | 0 | } |
258 | | |
259 | 0 | Ok(Response::from_parts(parts, message, extensions)) |
260 | 0 | } Unexecuted instantiation: <tonic::client::grpc::Grpc<ztunnel::tls::control::TlsGrpcChannel>>::client_streaming::<tokio_stream::once::Once<ztunnel::xds::types::istio::ca::IstioCertificateRequest>, ztunnel::xds::types::istio::ca::IstioCertificateRequest, ztunnel::xds::types::istio::ca::IstioCertificateResponse, tonic::codec::prost::ProstCodec<ztunnel::xds::types::istio::ca::IstioCertificateRequest, ztunnel::xds::types::istio::ca::IstioCertificateResponse>>::{closure#0}Unexecuted instantiation: <tonic::client::grpc::Grpc<_>>::client_streaming::<_, _, _, _>::{closure#0} |
261 | | |
262 | | /// Send a server side streaming gRPC request. |
263 | 0 | pub async fn server_streaming<M1, M2, C>( |
264 | 0 | &mut self, |
265 | 0 | request: Request<M1>, |
266 | 0 | path: PathAndQuery, |
267 | 0 | codec: C, |
268 | 0 | ) -> Result<Response<Streaming<M2>>, Status> |
269 | 0 | where |
270 | 0 | T: GrpcService<Body>, |
271 | 0 | T::ResponseBody: HttpBody + Send + 'static, |
272 | 0 | <T::ResponseBody as HttpBody>::Error: Into<crate::BoxError>, |
273 | 0 | C: Codec<Encode = M1, Decode = M2>, |
274 | 0 | M1: Send + Sync + 'static, |
275 | 0 | M2: Send + Sync + 'static, |
276 | 0 | { |
277 | 0 | let request = request.map(|m| tokio_stream::once(m)); |
278 | 0 | self.streaming(request, path, codec).await |
279 | 0 | } |
280 | | |
281 | | /// Send a bi-directional streaming gRPC request. |
282 | 0 | pub async fn streaming<S, M1, M2, C>( |
283 | 0 | &mut self, |
284 | 0 | request: Request<S>, |
285 | 0 | path: PathAndQuery, |
286 | 0 | mut codec: C, |
287 | 0 | ) -> Result<Response<Streaming<M2>>, Status> |
288 | 0 | where |
289 | 0 | T: GrpcService<Body>, |
290 | 0 | T::ResponseBody: HttpBody + Send + 'static, |
291 | 0 | <T::ResponseBody as HttpBody>::Error: Into<crate::BoxError>, |
292 | 0 | S: Stream<Item = M1> + Send + 'static, |
293 | 0 | C: Codec<Encode = M1, Decode = M2>, |
294 | 0 | M1: Send + Sync + 'static, |
295 | 0 | M2: Send + Sync + 'static, Unexecuted instantiation: <tonic::client::grpc::Grpc<ztunnel::tls::control::TlsGrpcChannel>>::streaming::<async_stream::async_stream::AsyncStream<ztunnel::xds::types::service::discovery::v3::DeltaDiscoveryRequest, <ztunnel::xds::client::AdsClient>::run_internal::{closure#0}::{closure#3}>, ztunnel::xds::types::service::discovery::v3::DeltaDiscoveryRequest, ztunnel::xds::types::service::discovery::v3::DeltaDiscoveryResponse, tonic::codec::prost::ProstCodec<ztunnel::xds::types::service::discovery::v3::DeltaDiscoveryRequest, ztunnel::xds::types::service::discovery::v3::DeltaDiscoveryResponse>>Unexecuted instantiation: <tonic::client::grpc::Grpc<ztunnel::tls::control::TlsGrpcChannel>>::streaming::<tokio_stream::once::Once<ztunnel::xds::types::istio::ca::IstioCertificateRequest>, ztunnel::xds::types::istio::ca::IstioCertificateRequest, ztunnel::xds::types::istio::ca::IstioCertificateResponse, tonic::codec::prost::ProstCodec<ztunnel::xds::types::istio::ca::IstioCertificateRequest, ztunnel::xds::types::istio::ca::IstioCertificateResponse>> Unexecuted instantiation: <tonic::client::grpc::Grpc<_>>::streaming::<_, _, _, _> |
296 | 0 | { |
297 | 0 | let request = request |
298 | 0 | .map(|s| { |
299 | 0 | EncodeBody::new_client( |
300 | 0 | codec.encoder(), |
301 | 0 | s.map(Ok), |
302 | 0 | self.config.send_compression_encodings, |
303 | 0 | self.config.max_encoding_message_size, |
304 | | ) |
305 | 0 | }) Unexecuted instantiation: <tonic::client::grpc::Grpc<ztunnel::tls::control::TlsGrpcChannel>>::streaming::<async_stream::async_stream::AsyncStream<ztunnel::xds::types::service::discovery::v3::DeltaDiscoveryRequest, <ztunnel::xds::client::AdsClient>::run_internal::{closure#0}::{closure#3}>, ztunnel::xds::types::service::discovery::v3::DeltaDiscoveryRequest, ztunnel::xds::types::service::discovery::v3::DeltaDiscoveryResponse, tonic::codec::prost::ProstCodec<ztunnel::xds::types::service::discovery::v3::DeltaDiscoveryRequest, ztunnel::xds::types::service::discovery::v3::DeltaDiscoveryResponse>>::{closure#0}::{closure#0}Unexecuted instantiation: <tonic::client::grpc::Grpc<ztunnel::tls::control::TlsGrpcChannel>>::streaming::<tokio_stream::once::Once<ztunnel::xds::types::istio::ca::IstioCertificateRequest>, ztunnel::xds::types::istio::ca::IstioCertificateRequest, ztunnel::xds::types::istio::ca::IstioCertificateResponse, tonic::codec::prost::ProstCodec<ztunnel::xds::types::istio::ca::IstioCertificateRequest, ztunnel::xds::types::istio::ca::IstioCertificateResponse>>::{closure#0}::{closure#0}Unexecuted instantiation: <tonic::client::grpc::Grpc<_>>::streaming::<_, _, _, _>::{closure#0}::{closure#0} |
306 | 0 | .map(Body::new); |
307 | | |
308 | 0 | let request = self.config.prepare_request(request, path); |
309 | | |
310 | 0 | let response = self |
311 | 0 | .inner |
312 | 0 | .call(request) |
313 | 0 | .await |
314 | 0 | .map_err(Status::from_error_generic)?; |
315 | | |
316 | 0 | let decoder = codec.decoder(); |
317 | | |
318 | 0 | self.create_response(decoder, response) |
319 | 0 | } Unexecuted instantiation: <tonic::client::grpc::Grpc<ztunnel::tls::control::TlsGrpcChannel>>::streaming::<async_stream::async_stream::AsyncStream<ztunnel::xds::types::service::discovery::v3::DeltaDiscoveryRequest, <ztunnel::xds::client::AdsClient>::run_internal::{closure#0}::{closure#3}>, ztunnel::xds::types::service::discovery::v3::DeltaDiscoveryRequest, ztunnel::xds::types::service::discovery::v3::DeltaDiscoveryResponse, tonic::codec::prost::ProstCodec<ztunnel::xds::types::service::discovery::v3::DeltaDiscoveryRequest, ztunnel::xds::types::service::discovery::v3::DeltaDiscoveryResponse>>::{closure#0}Unexecuted instantiation: <tonic::client::grpc::Grpc<ztunnel::tls::control::TlsGrpcChannel>>::streaming::<tokio_stream::once::Once<ztunnel::xds::types::istio::ca::IstioCertificateRequest>, ztunnel::xds::types::istio::ca::IstioCertificateRequest, ztunnel::xds::types::istio::ca::IstioCertificateResponse, tonic::codec::prost::ProstCodec<ztunnel::xds::types::istio::ca::IstioCertificateRequest, ztunnel::xds::types::istio::ca::IstioCertificateResponse>>::{closure#0}Unexecuted instantiation: <tonic::client::grpc::Grpc<_>>::streaming::<_, _, _, _>::{closure#0} |
320 | | |
321 | | // Keeping this code in a separate function from Self::streaming lets functions that return the |
322 | | // same output share the generated binary code |
323 | 0 | fn create_response<M2>( |
324 | 0 | &self, |
325 | 0 | decoder: impl Decoder<Item = M2, Error = Status> + Send + 'static, |
326 | 0 | response: http::Response<T::ResponseBody>, |
327 | 0 | ) -> Result<Response<Streaming<M2>>, Status> |
328 | 0 | where |
329 | 0 | T: GrpcService<Body>, |
330 | 0 | T::ResponseBody: HttpBody + Send + 'static, |
331 | 0 | <T::ResponseBody as HttpBody>::Error: Into<crate::BoxError>, |
332 | | { |
333 | 0 | let encoding = CompressionEncoding::from_encoding_header( |
334 | 0 | response.headers(), |
335 | 0 | self.config.accept_compression_encodings, |
336 | 0 | )?; |
337 | | |
338 | 0 | let status_code = response.status(); |
339 | 0 | let trailers_only_status = Status::from_header_map(response.headers()); |
340 | | |
341 | | // We do not need to check for trailers if the `grpc-status` header is present |
342 | | // with a valid code. |
343 | 0 | let expect_additional_trailers = if let Some(status) = trailers_only_status { |
344 | 0 | if status.code() != Code::Ok { |
345 | 0 | return Err(status); |
346 | 0 | } |
347 | | |
348 | 0 | false |
349 | | } else { |
350 | 0 | true |
351 | | }; |
352 | | |
353 | 0 | let response = response.map(|body| { |
354 | 0 | if expect_additional_trailers { |
355 | 0 | Streaming::new_response( |
356 | 0 | decoder, |
357 | 0 | body, |
358 | 0 | status_code, |
359 | 0 | encoding, |
360 | 0 | self.config.max_decoding_message_size, |
361 | | ) |
362 | | } else { |
363 | 0 | Streaming::new_empty(decoder, body) |
364 | | } |
365 | 0 | }); Unexecuted instantiation: <tonic::client::grpc::Grpc<ztunnel::tls::control::TlsGrpcChannel>>::create_response::<ztunnel::xds::types::istio::ca::IstioCertificateResponse, tonic::codec::prost::ProstDecoder<ztunnel::xds::types::istio::ca::IstioCertificateResponse>>::{closure#0}Unexecuted instantiation: <tonic::client::grpc::Grpc<ztunnel::tls::control::TlsGrpcChannel>>::create_response::<ztunnel::xds::types::service::discovery::v3::DeltaDiscoveryResponse, tonic::codec::prost::ProstDecoder<ztunnel::xds::types::service::discovery::v3::DeltaDiscoveryResponse>>::{closure#0}Unexecuted instantiation: <tonic::client::grpc::Grpc<_>>::create_response::<_, _>::{closure#0} |
366 | | |
367 | 0 | Ok(Response::from_http(response)) |
368 | 0 | } Unexecuted instantiation: <tonic::client::grpc::Grpc<ztunnel::tls::control::TlsGrpcChannel>>::create_response::<ztunnel::xds::types::istio::ca::IstioCertificateResponse, tonic::codec::prost::ProstDecoder<ztunnel::xds::types::istio::ca::IstioCertificateResponse>> Unexecuted instantiation: <tonic::client::grpc::Grpc<ztunnel::tls::control::TlsGrpcChannel>>::create_response::<ztunnel::xds::types::service::discovery::v3::DeltaDiscoveryResponse, tonic::codec::prost::ProstDecoder<ztunnel::xds::types::service::discovery::v3::DeltaDiscoveryResponse>> Unexecuted instantiation: <tonic::client::grpc::Grpc<_>>::create_response::<_, _> |
369 | | } |
370 | | |
371 | | impl GrpcConfig { |
372 | 0 | fn prepare_request(&self, request: Request<Body>, path: PathAndQuery) -> http::Request<Body> { |
373 | 0 | let mut parts = self.origin.clone().into_parts(); |
374 | | |
375 | 0 | match &parts.path_and_query { |
376 | 0 | Some(pnq) if pnq != "/" => { |
377 | 0 | parts.path_and_query = Some( |
378 | 0 | format!("{}{}", pnq.path(), path) |
379 | 0 | .parse() |
380 | 0 | .expect("must form valid path_and_query"), |
381 | 0 | ) |
382 | | } |
383 | 0 | _ => { |
384 | 0 | parts.path_and_query = Some(path); |
385 | 0 | } |
386 | | } |
387 | | |
388 | 0 | let uri = Uri::from_parts(parts).expect("path_and_query only is valid Uri"); |
389 | | |
390 | 0 | let mut request = request.into_http( |
391 | 0 | uri, |
392 | 0 | http::Method::POST, |
393 | | http::Version::HTTP_2, |
394 | 0 | SanitizeHeaders::Yes, |
395 | | ); |
396 | | |
397 | | // Add the gRPC related HTTP headers |
398 | 0 | request |
399 | 0 | .headers_mut() |
400 | 0 | .insert(TE, HeaderValue::from_static("trailers")); |
401 | | |
402 | | // Set the content type |
403 | 0 | request |
404 | 0 | .headers_mut() |
405 | 0 | .insert(CONTENT_TYPE, GRPC_CONTENT_TYPE); |
406 | | |
407 | | #[cfg(any(feature = "gzip", feature = "deflate", feature = "zstd"))] |
408 | | if let Some(encoding) = self.send_compression_encodings { |
409 | | request.headers_mut().insert( |
410 | | crate::codec::compression::ENCODING_HEADER, |
411 | | encoding.into_header_value(), |
412 | | ); |
413 | | } |
414 | | |
415 | 0 | if let Some(header_value) = self |
416 | 0 | .accept_compression_encodings |
417 | 0 | .into_accept_encoding_header_value() |
418 | 0 | { |
419 | 0 | request.headers_mut().insert( |
420 | 0 | crate::codec::compression::ACCEPT_ENCODING_HEADER, |
421 | 0 | header_value, |
422 | 0 | ); |
423 | 0 | } |
424 | | |
425 | 0 | request |
426 | 0 | } |
427 | | } |
428 | | |
429 | | impl<T: Clone> Clone for Grpc<T> { |
430 | 0 | fn clone(&self) -> Self { |
431 | 0 | Self { |
432 | 0 | inner: self.inner.clone(), |
433 | 0 | config: GrpcConfig { |
434 | 0 | origin: self.config.origin.clone(), |
435 | 0 | send_compression_encodings: self.config.send_compression_encodings, |
436 | 0 | accept_compression_encodings: self.config.accept_compression_encodings, |
437 | 0 | max_encoding_message_size: self.config.max_encoding_message_size, |
438 | 0 | max_decoding_message_size: self.config.max_decoding_message_size, |
439 | 0 | }, |
440 | 0 | } |
441 | 0 | } Unexecuted instantiation: <tonic::client::grpc::Grpc<ztunnel::tls::control::TlsGrpcChannel> as core::clone::Clone>::clone Unexecuted instantiation: <tonic::client::grpc::Grpc<_> as core::clone::Clone>::clone |
442 | | } |
443 | | |
444 | | impl<T: fmt::Debug> fmt::Debug for Grpc<T> { |
445 | 0 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
446 | 0 | f.debug_struct("Grpc") |
447 | 0 | .field("inner", &self.inner) |
448 | 0 | .field("origin", &self.config.origin) |
449 | 0 | .field( |
450 | 0 | "compression_encoding", |
451 | 0 | &self.config.send_compression_encodings, |
452 | 0 | ) |
453 | 0 | .field( |
454 | 0 | "accept_compression_encodings", |
455 | 0 | &self.config.accept_compression_encodings, |
456 | 0 | ) |
457 | 0 | .field( |
458 | 0 | "max_decoding_message_size", |
459 | 0 | &self.config.max_decoding_message_size, |
460 | 0 | ) |
461 | 0 | .field( |
462 | 0 | "max_encoding_message_size", |
463 | 0 | &self.config.max_encoding_message_size, |
464 | 0 | ) |
465 | 0 | .finish() |
466 | 0 | } |
467 | | } |