Coverage Report

Created: 2025-11-16 06:37

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/rust/registry/src/index.crates.io-1949cf8c6b5b557f/tonic-0.13.0/src/client/grpc.rs
Line
Count
Source
1
use crate::codec::compression::{CompressionEncoding, EnabledCompressionEncodings};
2
use crate::codec::EncodeBody;
3
use crate::metadata::GRPC_CONTENT_TYPE;
4
use crate::{
5
    body::Body,
6
    client::GrpcService,
7
    codec::{Codec, Decoder, Streaming},
8
    request::SanitizeHeaders,
9
    Code, Request, Response, Status,
10
};
11
use http::{
12
    header::{HeaderValue, CONTENT_TYPE, TE},
13
    uri::{PathAndQuery, Uri},
14
};
15
use http_body::Body as HttpBody;
16
use std::{fmt, future, pin::pin};
17
use tokio_stream::{Stream, StreamExt};
18
19
/// A gRPC client dispatcher.
20
///
21
/// This will wrap some inner [`GrpcService`] and will encode/decode
22
/// messages via the provided codec.
23
///
24
/// Each request method takes a [`Request`], a [`PathAndQuery`], and a
25
/// [`Codec`]. The request contains the message to send via the
26
/// [`Codec::encoder`]. The path determines the fully qualified path
27
/// that will be append to the outgoing uri. The path must follow
28
/// the conventions explained in the [gRPC protocol definition] under `Path →`. An
29
/// example of this path could look like `/greeter.Greeter/SayHello`.
30
///
31
/// [gRPC protocol definition]: https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests
32
pub struct Grpc<T> {
33
    inner: T,
34
    config: GrpcConfig,
35
}
36
37
struct GrpcConfig {
38
    origin: Uri,
39
    /// Which compression encodings does the client accept?
40
    accept_compression_encodings: EnabledCompressionEncodings,
41
    /// The compression encoding that will be applied to requests.
42
    send_compression_encodings: Option<CompressionEncoding>,
43
    /// Limits the maximum size of a decoded message.
44
    max_decoding_message_size: Option<usize>,
45
    /// Limits the maximum size of an encoded message.
46
    max_encoding_message_size: Option<usize>,
47
}
48
49
impl<T> Grpc<T> {
50
    /// Creates a new gRPC client with the provided [`GrpcService`].
51
0
    pub fn new(inner: T) -> Self {
52
0
        Self::with_origin(inner, Uri::default())
53
0
    }
Unexecuted instantiation: <tonic::client::grpc::Grpc<ztunnel::tls::control::TlsGrpcChannel>>::new
Unexecuted instantiation: <tonic::client::grpc::Grpc<_>>::new
54
55
    /// Creates a new gRPC client with the provided [`GrpcService`] and `Uri`.
56
    ///
57
    /// The provided Uri will use only the scheme and authority parts as the
58
    /// path_and_query portion will be set for each method.
59
0
    pub fn with_origin(inner: T, origin: Uri) -> Self {
60
0
        Self {
61
0
            inner,
62
0
            config: GrpcConfig {
63
0
                origin,
64
0
                send_compression_encodings: None,
65
0
                accept_compression_encodings: EnabledCompressionEncodings::default(),
66
0
                max_decoding_message_size: None,
67
0
                max_encoding_message_size: None,
68
0
            },
69
0
        }
70
0
    }
Unexecuted instantiation: <tonic::client::grpc::Grpc<ztunnel::tls::control::TlsGrpcChannel>>::with_origin
Unexecuted instantiation: <tonic::client::grpc::Grpc<_>>::with_origin
71
72
    /// Compress requests with the provided encoding.
73
    ///
74
    /// Requires the server to accept the specified encoding, otherwise it might return an error.
75
    ///
76
    /// # Example
77
    ///
78
    /// The most common way of using this is through a client generated by tonic-build:
79
    ///
80
    /// ```rust
81
    /// use tonic::transport::Channel;
82
    /// # enum CompressionEncoding { Gzip }
83
    /// # struct TestClient<T>(T);
84
    /// # impl<T> TestClient<T> {
85
    /// #     fn new(channel: T) -> Self { Self(channel) }
86
    /// #     fn send_compressed(self, _: CompressionEncoding) -> Self { self }
87
    /// # }
88
    ///
89
    /// # async {
90
    /// let channel = Channel::builder("127.0.0.1:3000".parse().unwrap())
91
    ///     .connect()
92
    ///     .await
93
    ///     .unwrap();
94
    ///
95
    /// let client = TestClient::new(channel).send_compressed(CompressionEncoding::Gzip);
96
    /// # };
97
    /// ```
98
0
    pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self {
99
0
        self.config.send_compression_encodings = Some(encoding);
100
0
        self
101
0
    }
102
103
    /// Enable accepting compressed responses.
104
    ///
105
    /// Requires the server to also support sending compressed responses.
106
    ///
107
    /// # Example
108
    ///
109
    /// The most common way of using this is through a client generated by tonic-build:
110
    ///
111
    /// ```rust
112
    /// use tonic::transport::Channel;
113
    /// # enum CompressionEncoding { Gzip }
114
    /// # struct TestClient<T>(T);
115
    /// # impl<T> TestClient<T> {
116
    /// #     fn new(channel: T) -> Self { Self(channel) }
117
    /// #     fn accept_compressed(self, _: CompressionEncoding) -> Self { self }
118
    /// # }
119
    ///
120
    /// # async {
121
    /// let channel = Channel::builder("127.0.0.1:3000".parse().unwrap())
122
    ///     .connect()
123
    ///     .await
124
    ///     .unwrap();
125
    ///
126
    /// let client = TestClient::new(channel).accept_compressed(CompressionEncoding::Gzip);
127
    /// # };
128
    /// ```
129
0
    pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self {
130
0
        self.config.accept_compression_encodings.enable(encoding);
131
0
        self
132
0
    }
133
134
    /// Limits the maximum size of a decoded message.
135
    ///
136
    /// # Example
137
    ///
138
    /// The most common way of using this is through a client generated by tonic-build:
139
    ///
140
    /// ```rust
141
    /// use tonic::transport::Channel;
142
    /// # struct TestClient<T>(T);
143
    /// # impl<T> TestClient<T> {
144
    /// #     fn new(channel: T) -> Self { Self(channel) }
145
    /// #     fn max_decoding_message_size(self, _: usize) -> Self { self }
146
    /// # }
147
    ///
148
    /// # async {
149
    /// let channel = Channel::builder("127.0.0.1:3000".parse().unwrap())
150
    ///     .connect()
151
    ///     .await
152
    ///     .unwrap();
153
    ///
154
    /// // Set the limit to 2MB, Defaults to 4MB.
155
    /// let limit = 2 * 1024 * 1024;
156
    /// let client = TestClient::new(channel).max_decoding_message_size(limit);
157
    /// # };
158
    /// ```
159
0
    pub fn max_decoding_message_size(mut self, limit: usize) -> Self {
160
0
        self.config.max_decoding_message_size = Some(limit);
161
0
        self
162
0
    }
Unexecuted instantiation: <tonic::client::grpc::Grpc<ztunnel::tls::control::TlsGrpcChannel>>::max_decoding_message_size
Unexecuted instantiation: <tonic::client::grpc::Grpc<_>>::max_decoding_message_size
163
164
    /// Limits the maximum size of an encoded message.
165
    ///
166
    /// # Example
167
    ///
168
    /// The most common way of using this is through a client generated by tonic-build:
169
    ///
170
    /// ```rust
171
    /// use tonic::transport::Channel;
172
    /// # struct TestClient<T>(T);
173
    /// # impl<T> TestClient<T> {
174
    /// #     fn new(channel: T) -> Self { Self(channel) }
175
    /// #     fn max_encoding_message_size(self, _: usize) -> Self { self }
176
    /// # }
177
    ///
178
    /// # async {
179
    /// let channel = Channel::builder("127.0.0.1:3000".parse().unwrap())
180
    ///     .connect()
181
    ///     .await
182
    ///     .unwrap();
183
    ///
184
    /// // Set the limit to 2MB, Defaults to 4MB.
185
    /// let limit = 2 * 1024 * 1024;
186
    /// let client = TestClient::new(channel).max_encoding_message_size(limit);
187
    /// # };
188
    /// ```
189
0
    pub fn max_encoding_message_size(mut self, limit: usize) -> Self {
190
0
        self.config.max_encoding_message_size = Some(limit);
191
0
        self
192
0
    }
193
194
    /// Check if the inner [`GrpcService`] is able to accept a  new request.
195
    ///
196
    /// This will call [`GrpcService::poll_ready`] until it returns ready or
197
    /// an error. If this returns ready the inner [`GrpcService`] is ready to
198
    /// accept one more request.
199
0
    pub async fn ready(&mut self) -> Result<(), T::Error>
200
0
    where
201
0
        T: GrpcService<Body>,
Unexecuted instantiation: <tonic::client::grpc::Grpc<ztunnel::tls::control::TlsGrpcChannel>>::ready
Unexecuted instantiation: <tonic::client::grpc::Grpc<_>>::ready
202
0
    {
203
0
        future::poll_fn(|cx| self.inner.poll_ready(cx)).await
Unexecuted instantiation: <tonic::client::grpc::Grpc<ztunnel::tls::control::TlsGrpcChannel>>::ready::{closure#0}::{closure#0}
Unexecuted instantiation: <tonic::client::grpc::Grpc<_>>::ready::{closure#0}::{closure#0}
204
0
    }
Unexecuted instantiation: <tonic::client::grpc::Grpc<ztunnel::tls::control::TlsGrpcChannel>>::ready::{closure#0}
Unexecuted instantiation: <tonic::client::grpc::Grpc<_>>::ready::{closure#0}
205
206
    /// Send a single unary gRPC request.
207
0
    pub async fn unary<M1, M2, C>(
208
0
        &mut self,
209
0
        request: Request<M1>,
210
0
        path: PathAndQuery,
211
0
        codec: C,
212
0
    ) -> Result<Response<M2>, Status>
213
0
    where
214
0
        T: GrpcService<Body>,
215
0
        T::ResponseBody: HttpBody + Send + 'static,
216
0
        <T::ResponseBody as HttpBody>::Error: Into<crate::BoxError>,
217
0
        C: Codec<Encode = M1, Decode = M2>,
218
0
        M1: Send + Sync + 'static,
219
0
        M2: Send + Sync + 'static,
Unexecuted instantiation: <tonic::client::grpc::Grpc<ztunnel::tls::control::TlsGrpcChannel>>::unary::<ztunnel::xds::types::istio::ca::IstioCertificateRequest, ztunnel::xds::types::istio::ca::IstioCertificateResponse, tonic::codec::prost::ProstCodec<ztunnel::xds::types::istio::ca::IstioCertificateRequest, ztunnel::xds::types::istio::ca::IstioCertificateResponse>>
Unexecuted instantiation: <tonic::client::grpc::Grpc<_>>::unary::<_, _, _>
220
0
    {
221
0
        let request = request.map(|m| tokio_stream::once(m));
Unexecuted instantiation: <tonic::client::grpc::Grpc<ztunnel::tls::control::TlsGrpcChannel>>::unary::<ztunnel::xds::types::istio::ca::IstioCertificateRequest, ztunnel::xds::types::istio::ca::IstioCertificateResponse, tonic::codec::prost::ProstCodec<ztunnel::xds::types::istio::ca::IstioCertificateRequest, ztunnel::xds::types::istio::ca::IstioCertificateResponse>>::{closure#0}::{closure#0}
Unexecuted instantiation: <tonic::client::grpc::Grpc<_>>::unary::<_, _, _>::{closure#0}::{closure#0}
222
0
        self.client_streaming(request, path, codec).await
223
0
    }
Unexecuted instantiation: <tonic::client::grpc::Grpc<ztunnel::tls::control::TlsGrpcChannel>>::unary::<ztunnel::xds::types::istio::ca::IstioCertificateRequest, ztunnel::xds::types::istio::ca::IstioCertificateResponse, tonic::codec::prost::ProstCodec<ztunnel::xds::types::istio::ca::IstioCertificateRequest, ztunnel::xds::types::istio::ca::IstioCertificateResponse>>::{closure#0}
Unexecuted instantiation: <tonic::client::grpc::Grpc<_>>::unary::<_, _, _>::{closure#0}
224
225
    /// Send a client side streaming gRPC request.
226
0
    pub async fn client_streaming<S, M1, M2, C>(
227
0
        &mut self,
228
0
        request: Request<S>,
229
0
        path: PathAndQuery,
230
0
        codec: C,
231
0
    ) -> Result<Response<M2>, Status>
232
0
    where
233
0
        T: GrpcService<Body>,
234
0
        T::ResponseBody: HttpBody + Send + 'static,
235
0
        <T::ResponseBody as HttpBody>::Error: Into<crate::BoxError>,
236
0
        S: Stream<Item = M1> + Send + 'static,
237
0
        C: Codec<Encode = M1, Decode = M2>,
238
0
        M1: Send + Sync + 'static,
239
0
        M2: Send + Sync + 'static,
Unexecuted instantiation: <tonic::client::grpc::Grpc<ztunnel::tls::control::TlsGrpcChannel>>::client_streaming::<tokio_stream::once::Once<ztunnel::xds::types::istio::ca::IstioCertificateRequest>, ztunnel::xds::types::istio::ca::IstioCertificateRequest, ztunnel::xds::types::istio::ca::IstioCertificateResponse, tonic::codec::prost::ProstCodec<ztunnel::xds::types::istio::ca::IstioCertificateRequest, ztunnel::xds::types::istio::ca::IstioCertificateResponse>>
Unexecuted instantiation: <tonic::client::grpc::Grpc<_>>::client_streaming::<_, _, _, _>
240
0
    {
241
0
        let (mut parts, body, extensions) =
242
0
            self.streaming(request, path, codec).await?.into_parts();
243
244
0
        let mut body = pin!(body);
245
246
0
        let message = body
247
0
            .try_next()
248
0
            .await
249
0
            .map_err(|mut status| {
250
0
                status.metadata_mut().merge(parts.clone());
251
0
                status
252
0
            })?
Unexecuted instantiation: <tonic::client::grpc::Grpc<ztunnel::tls::control::TlsGrpcChannel>>::client_streaming::<tokio_stream::once::Once<ztunnel::xds::types::istio::ca::IstioCertificateRequest>, ztunnel::xds::types::istio::ca::IstioCertificateRequest, ztunnel::xds::types::istio::ca::IstioCertificateResponse, tonic::codec::prost::ProstCodec<ztunnel::xds::types::istio::ca::IstioCertificateRequest, ztunnel::xds::types::istio::ca::IstioCertificateResponse>>::{closure#0}::{closure#0}
Unexecuted instantiation: <tonic::client::grpc::Grpc<_>>::client_streaming::<_, _, _, _>::{closure#0}::{closure#0}
253
0
            .ok_or_else(|| Status::internal("Missing response message."))?;
Unexecuted instantiation: <tonic::client::grpc::Grpc<ztunnel::tls::control::TlsGrpcChannel>>::client_streaming::<tokio_stream::once::Once<ztunnel::xds::types::istio::ca::IstioCertificateRequest>, ztunnel::xds::types::istio::ca::IstioCertificateRequest, ztunnel::xds::types::istio::ca::IstioCertificateResponse, tonic::codec::prost::ProstCodec<ztunnel::xds::types::istio::ca::IstioCertificateRequest, ztunnel::xds::types::istio::ca::IstioCertificateResponse>>::{closure#0}::{closure#1}
Unexecuted instantiation: <tonic::client::grpc::Grpc<_>>::client_streaming::<_, _, _, _>::{closure#0}::{closure#1}
254
255
0
        if let Some(trailers) = body.trailers().await? {
256
0
            parts.merge(trailers);
257
0
        }
258
259
0
        Ok(Response::from_parts(parts, message, extensions))
260
0
    }
Unexecuted instantiation: <tonic::client::grpc::Grpc<ztunnel::tls::control::TlsGrpcChannel>>::client_streaming::<tokio_stream::once::Once<ztunnel::xds::types::istio::ca::IstioCertificateRequest>, ztunnel::xds::types::istio::ca::IstioCertificateRequest, ztunnel::xds::types::istio::ca::IstioCertificateResponse, tonic::codec::prost::ProstCodec<ztunnel::xds::types::istio::ca::IstioCertificateRequest, ztunnel::xds::types::istio::ca::IstioCertificateResponse>>::{closure#0}
Unexecuted instantiation: <tonic::client::grpc::Grpc<_>>::client_streaming::<_, _, _, _>::{closure#0}
261
262
    /// Send a server side streaming gRPC request.
263
0
    pub async fn server_streaming<M1, M2, C>(
264
0
        &mut self,
265
0
        request: Request<M1>,
266
0
        path: PathAndQuery,
267
0
        codec: C,
268
0
    ) -> Result<Response<Streaming<M2>>, Status>
269
0
    where
270
0
        T: GrpcService<Body>,
271
0
        T::ResponseBody: HttpBody + Send + 'static,
272
0
        <T::ResponseBody as HttpBody>::Error: Into<crate::BoxError>,
273
0
        C: Codec<Encode = M1, Decode = M2>,
274
0
        M1: Send + Sync + 'static,
275
0
        M2: Send + Sync + 'static,
276
0
    {
277
0
        let request = request.map(|m| tokio_stream::once(m));
278
0
        self.streaming(request, path, codec).await
279
0
    }
280
281
    /// Send a bi-directional streaming gRPC request.
282
0
    pub async fn streaming<S, M1, M2, C>(
283
0
        &mut self,
284
0
        request: Request<S>,
285
0
        path: PathAndQuery,
286
0
        mut codec: C,
287
0
    ) -> Result<Response<Streaming<M2>>, Status>
288
0
    where
289
0
        T: GrpcService<Body>,
290
0
        T::ResponseBody: HttpBody + Send + 'static,
291
0
        <T::ResponseBody as HttpBody>::Error: Into<crate::BoxError>,
292
0
        S: Stream<Item = M1> + Send + 'static,
293
0
        C: Codec<Encode = M1, Decode = M2>,
294
0
        M1: Send + Sync + 'static,
295
0
        M2: Send + Sync + 'static,
Unexecuted instantiation: <tonic::client::grpc::Grpc<ztunnel::tls::control::TlsGrpcChannel>>::streaming::<async_stream::async_stream::AsyncStream<ztunnel::xds::types::service::discovery::v3::DeltaDiscoveryRequest, <ztunnel::xds::client::AdsClient>::run_internal::{closure#0}::{closure#3}>, ztunnel::xds::types::service::discovery::v3::DeltaDiscoveryRequest, ztunnel::xds::types::service::discovery::v3::DeltaDiscoveryResponse, tonic::codec::prost::ProstCodec<ztunnel::xds::types::service::discovery::v3::DeltaDiscoveryRequest, ztunnel::xds::types::service::discovery::v3::DeltaDiscoveryResponse>>
Unexecuted instantiation: <tonic::client::grpc::Grpc<ztunnel::tls::control::TlsGrpcChannel>>::streaming::<tokio_stream::once::Once<ztunnel::xds::types::istio::ca::IstioCertificateRequest>, ztunnel::xds::types::istio::ca::IstioCertificateRequest, ztunnel::xds::types::istio::ca::IstioCertificateResponse, tonic::codec::prost::ProstCodec<ztunnel::xds::types::istio::ca::IstioCertificateRequest, ztunnel::xds::types::istio::ca::IstioCertificateResponse>>
Unexecuted instantiation: <tonic::client::grpc::Grpc<_>>::streaming::<_, _, _, _>
296
0
    {
297
0
        let request = request
298
0
            .map(|s| {
299
0
                EncodeBody::new_client(
300
0
                    codec.encoder(),
301
0
                    s.map(Ok),
302
0
                    self.config.send_compression_encodings,
303
0
                    self.config.max_encoding_message_size,
304
                )
305
0
            })
Unexecuted instantiation: <tonic::client::grpc::Grpc<ztunnel::tls::control::TlsGrpcChannel>>::streaming::<async_stream::async_stream::AsyncStream<ztunnel::xds::types::service::discovery::v3::DeltaDiscoveryRequest, <ztunnel::xds::client::AdsClient>::run_internal::{closure#0}::{closure#3}>, ztunnel::xds::types::service::discovery::v3::DeltaDiscoveryRequest, ztunnel::xds::types::service::discovery::v3::DeltaDiscoveryResponse, tonic::codec::prost::ProstCodec<ztunnel::xds::types::service::discovery::v3::DeltaDiscoveryRequest, ztunnel::xds::types::service::discovery::v3::DeltaDiscoveryResponse>>::{closure#0}::{closure#0}
Unexecuted instantiation: <tonic::client::grpc::Grpc<ztunnel::tls::control::TlsGrpcChannel>>::streaming::<tokio_stream::once::Once<ztunnel::xds::types::istio::ca::IstioCertificateRequest>, ztunnel::xds::types::istio::ca::IstioCertificateRequest, ztunnel::xds::types::istio::ca::IstioCertificateResponse, tonic::codec::prost::ProstCodec<ztunnel::xds::types::istio::ca::IstioCertificateRequest, ztunnel::xds::types::istio::ca::IstioCertificateResponse>>::{closure#0}::{closure#0}
Unexecuted instantiation: <tonic::client::grpc::Grpc<_>>::streaming::<_, _, _, _>::{closure#0}::{closure#0}
306
0
            .map(Body::new);
307
308
0
        let request = self.config.prepare_request(request, path);
309
310
0
        let response = self
311
0
            .inner
312
0
            .call(request)
313
0
            .await
314
0
            .map_err(Status::from_error_generic)?;
315
316
0
        let decoder = codec.decoder();
317
318
0
        self.create_response(decoder, response)
319
0
    }
Unexecuted instantiation: <tonic::client::grpc::Grpc<ztunnel::tls::control::TlsGrpcChannel>>::streaming::<async_stream::async_stream::AsyncStream<ztunnel::xds::types::service::discovery::v3::DeltaDiscoveryRequest, <ztunnel::xds::client::AdsClient>::run_internal::{closure#0}::{closure#3}>, ztunnel::xds::types::service::discovery::v3::DeltaDiscoveryRequest, ztunnel::xds::types::service::discovery::v3::DeltaDiscoveryResponse, tonic::codec::prost::ProstCodec<ztunnel::xds::types::service::discovery::v3::DeltaDiscoveryRequest, ztunnel::xds::types::service::discovery::v3::DeltaDiscoveryResponse>>::{closure#0}
Unexecuted instantiation: <tonic::client::grpc::Grpc<ztunnel::tls::control::TlsGrpcChannel>>::streaming::<tokio_stream::once::Once<ztunnel::xds::types::istio::ca::IstioCertificateRequest>, ztunnel::xds::types::istio::ca::IstioCertificateRequest, ztunnel::xds::types::istio::ca::IstioCertificateResponse, tonic::codec::prost::ProstCodec<ztunnel::xds::types::istio::ca::IstioCertificateRequest, ztunnel::xds::types::istio::ca::IstioCertificateResponse>>::{closure#0}
Unexecuted instantiation: <tonic::client::grpc::Grpc<_>>::streaming::<_, _, _, _>::{closure#0}
320
321
    // Keeping this code in a separate function from Self::streaming lets functions that return the
322
    // same output share the generated binary code
323
0
    fn create_response<M2>(
324
0
        &self,
325
0
        decoder: impl Decoder<Item = M2, Error = Status> + Send + 'static,
326
0
        response: http::Response<T::ResponseBody>,
327
0
    ) -> Result<Response<Streaming<M2>>, Status>
328
0
    where
329
0
        T: GrpcService<Body>,
330
0
        T::ResponseBody: HttpBody + Send + 'static,
331
0
        <T::ResponseBody as HttpBody>::Error: Into<crate::BoxError>,
332
    {
333
0
        let encoding = CompressionEncoding::from_encoding_header(
334
0
            response.headers(),
335
0
            self.config.accept_compression_encodings,
336
0
        )?;
337
338
0
        let status_code = response.status();
339
0
        let trailers_only_status = Status::from_header_map(response.headers());
340
341
        // We do not need to check for trailers if the `grpc-status` header is present
342
        // with a valid code.
343
0
        let expect_additional_trailers = if let Some(status) = trailers_only_status {
344
0
            if status.code() != Code::Ok {
345
0
                return Err(status);
346
0
            }
347
348
0
            false
349
        } else {
350
0
            true
351
        };
352
353
0
        let response = response.map(|body| {
354
0
            if expect_additional_trailers {
355
0
                Streaming::new_response(
356
0
                    decoder,
357
0
                    body,
358
0
                    status_code,
359
0
                    encoding,
360
0
                    self.config.max_decoding_message_size,
361
                )
362
            } else {
363
0
                Streaming::new_empty(decoder, body)
364
            }
365
0
        });
Unexecuted instantiation: <tonic::client::grpc::Grpc<ztunnel::tls::control::TlsGrpcChannel>>::create_response::<ztunnel::xds::types::istio::ca::IstioCertificateResponse, tonic::codec::prost::ProstDecoder<ztunnel::xds::types::istio::ca::IstioCertificateResponse>>::{closure#0}
Unexecuted instantiation: <tonic::client::grpc::Grpc<ztunnel::tls::control::TlsGrpcChannel>>::create_response::<ztunnel::xds::types::service::discovery::v3::DeltaDiscoveryResponse, tonic::codec::prost::ProstDecoder<ztunnel::xds::types::service::discovery::v3::DeltaDiscoveryResponse>>::{closure#0}
Unexecuted instantiation: <tonic::client::grpc::Grpc<_>>::create_response::<_, _>::{closure#0}
366
367
0
        Ok(Response::from_http(response))
368
0
    }
Unexecuted instantiation: <tonic::client::grpc::Grpc<ztunnel::tls::control::TlsGrpcChannel>>::create_response::<ztunnel::xds::types::istio::ca::IstioCertificateResponse, tonic::codec::prost::ProstDecoder<ztunnel::xds::types::istio::ca::IstioCertificateResponse>>
Unexecuted instantiation: <tonic::client::grpc::Grpc<ztunnel::tls::control::TlsGrpcChannel>>::create_response::<ztunnel::xds::types::service::discovery::v3::DeltaDiscoveryResponse, tonic::codec::prost::ProstDecoder<ztunnel::xds::types::service::discovery::v3::DeltaDiscoveryResponse>>
Unexecuted instantiation: <tonic::client::grpc::Grpc<_>>::create_response::<_, _>
369
}
370
371
impl GrpcConfig {
372
0
    fn prepare_request(&self, request: Request<Body>, path: PathAndQuery) -> http::Request<Body> {
373
0
        let mut parts = self.origin.clone().into_parts();
374
375
0
        match &parts.path_and_query {
376
0
            Some(pnq) if pnq != "/" => {
377
0
                parts.path_and_query = Some(
378
0
                    format!("{}{}", pnq.path(), path)
379
0
                        .parse()
380
0
                        .expect("must form valid path_and_query"),
381
0
                )
382
            }
383
0
            _ => {
384
0
                parts.path_and_query = Some(path);
385
0
            }
386
        }
387
388
0
        let uri = Uri::from_parts(parts).expect("path_and_query only is valid Uri");
389
390
0
        let mut request = request.into_http(
391
0
            uri,
392
0
            http::Method::POST,
393
            http::Version::HTTP_2,
394
0
            SanitizeHeaders::Yes,
395
        );
396
397
        // Add the gRPC related HTTP headers
398
0
        request
399
0
            .headers_mut()
400
0
            .insert(TE, HeaderValue::from_static("trailers"));
401
402
        // Set the content type
403
0
        request
404
0
            .headers_mut()
405
0
            .insert(CONTENT_TYPE, GRPC_CONTENT_TYPE);
406
407
        #[cfg(any(feature = "gzip", feature = "deflate", feature = "zstd"))]
408
        if let Some(encoding) = self.send_compression_encodings {
409
            request.headers_mut().insert(
410
                crate::codec::compression::ENCODING_HEADER,
411
                encoding.into_header_value(),
412
            );
413
        }
414
415
0
        if let Some(header_value) = self
416
0
            .accept_compression_encodings
417
0
            .into_accept_encoding_header_value()
418
0
        {
419
0
            request.headers_mut().insert(
420
0
                crate::codec::compression::ACCEPT_ENCODING_HEADER,
421
0
                header_value,
422
0
            );
423
0
        }
424
425
0
        request
426
0
    }
427
}
428
429
impl<T: Clone> Clone for Grpc<T> {
430
0
    fn clone(&self) -> Self {
431
0
        Self {
432
0
            inner: self.inner.clone(),
433
0
            config: GrpcConfig {
434
0
                origin: self.config.origin.clone(),
435
0
                send_compression_encodings: self.config.send_compression_encodings,
436
0
                accept_compression_encodings: self.config.accept_compression_encodings,
437
0
                max_encoding_message_size: self.config.max_encoding_message_size,
438
0
                max_decoding_message_size: self.config.max_decoding_message_size,
439
0
            },
440
0
        }
441
0
    }
Unexecuted instantiation: <tonic::client::grpc::Grpc<ztunnel::tls::control::TlsGrpcChannel> as core::clone::Clone>::clone
Unexecuted instantiation: <tonic::client::grpc::Grpc<_> as core::clone::Clone>::clone
442
}
443
444
impl<T: fmt::Debug> fmt::Debug for Grpc<T> {
445
0
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
446
0
        f.debug_struct("Grpc")
447
0
            .field("inner", &self.inner)
448
0
            .field("origin", &self.config.origin)
449
0
            .field(
450
0
                "compression_encoding",
451
0
                &self.config.send_compression_encodings,
452
0
            )
453
0
            .field(
454
0
                "accept_compression_encodings",
455
0
                &self.config.accept_compression_encodings,
456
0
            )
457
0
            .field(
458
0
                "max_decoding_message_size",
459
0
                &self.config.max_decoding_message_size,
460
0
            )
461
0
            .field(
462
0
                "max_encoding_message_size",
463
0
                &self.config.max_encoding_message_size,
464
0
            )
465
0
            .finish()
466
0
    }
467
}