1# -*- coding: utf-8 -*-
2# Copyright 2025 Google LLC
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16import json
17import logging as std_logging
18import pickle
19import warnings
20from typing import Callable, Dict, Optional, Sequence, Tuple, Union
21
22import google.auth # type: ignore
23import google.protobuf.message
24import grpc # type: ignore
25import proto # type: ignore
26from google.api_core import gapic_v1, grpc_helpers
27from google.auth import credentials as ga_credentials # type: ignore
28from google.auth.transport.grpc import SslCredentials # type: ignore
29from google.protobuf.json_format import MessageToJson
30
31from google.cloud.iam_credentials_v1.types import common
32
33from .base import DEFAULT_CLIENT_INFO, IAMCredentialsTransport
34
# Probe for ``google.api_core.client_logging``. ``CLIENT_LOGGING_SUPPORTED``
# records whether the import succeeded; ``_LoggingClientInterceptor`` only
# emits log records when this flag is True.
try:
    from google.api_core import client_logging  # type: ignore

    CLIENT_LOGGING_SUPPORTED = True  # pragma: NO COVER
except ImportError:  # pragma: NO COVER
    CLIENT_LOGGING_SUPPORTED = False

# Module-level logger used for the request/response DEBUG records.
_LOGGER = std_logging.getLogger(__name__)
43
44
class _LoggingClientInterceptor(grpc.UnaryUnaryClientInterceptor):  # pragma: NO COVER
    """Client interceptor that logs unary-unary requests and responses.

    Records are emitted on the module logger at DEBUG level, and only when
    ``CLIENT_LOGGING_SUPPORTED`` is true and DEBUG is enabled for that logger.
    Building the log records must never be the reason an RPC fails, so the
    helpers below tolerate missing metadata, non-UTF-8 metadata values and
    payloads that cannot be pickled.
    """

    @staticmethod
    def _payload_for_log(message):
        """Return a string representation of ``message`` suitable for logging.

        Args:
            message: The request or response object of the RPC.

        Returns:
            str: JSON for proto-plus and protobuf messages; otherwise the
            type name followed by the repr of the pickled bytes, or by the
            repr of the object itself when it cannot be pickled.
        """
        if isinstance(message, proto.Message):
            return type(message).to_json(message)
        if isinstance(message, google.protobuf.message.Message):
            return MessageToJson(message)
        try:
            pickled = pickle.dumps(message)
        except (pickle.PicklingError, TypeError, AttributeError):
            # Unpicklable payloads must not abort the call just because
            # debug logging is switched on; fall back to a plain repr.
            return f"{type(message).__name__}: {message!r}"
        return f"{type(message).__name__}: {pickled!r}"

    def intercept_unary_unary(self, continuation, client_call_details, request):
        """Invoke the RPC, logging the request and the response at DEBUG level.

        Args:
            continuation: Callable that proceeds with the RPC; called as
                ``continuation(client_call_details, request)``.
            client_call_details: Describes the RPC; ``method`` and
                ``metadata`` are read for logging.
            request: The request message for the RPC.

        Returns:
            The object returned by ``continuation``, unchanged.
        """
        logging_enabled = CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(
            std_logging.DEBUG
        )
        # Normalised once so the request and response records agree.
        rpc_name = str(client_call_details.method)

        if logging_enabled:  # pragma: NO COVER
            # ``metadata`` may be None when the caller supplied none, and
            # byte values are not guaranteed to be valid UTF-8, so decode
            # leniently instead of raising from the logging path.
            request_metadata = {
                key: (
                    value.decode("utf-8", errors="replace")
                    if isinstance(value, bytes)
                    else value
                )
                for key, value in (client_call_details.metadata or ())
            }
            grpc_request = {
                "payload": self._payload_for_log(request),
                "requestMethod": "grpc",
                "metadata": request_metadata,
            }
            _LOGGER.debug(
                f"Sending request for {client_call_details.method}",
                extra={
                    "serviceName": "google.iam.credentials.v1.IAMCredentials",
                    "rpcName": rpc_name,
                    "request": grpc_request,
                    "metadata": grpc_request["metadata"],
                },
            )

        response = continuation(client_call_details, request)

        if logging_enabled:  # pragma: NO COVER
            response_metadata = response.trailing_metadata()
            # Flatten the trailing metadata into a plain dict of strings;
            # None is logged when there is no trailing metadata.
            metadata = (
                {key: str(value) for key, value in response_metadata}
                if response_metadata
                else None
            )
            # NOTE(review): ``result()`` is expected to raise for a failed
            # RPC, in which case no response record is emitted - confirm.
            result = response.result()
            grpc_response = {
                "payload": self._payload_for_log(result),
                "metadata": metadata,
                # Hard-coded: this record is only built after result() returned.
                "status": "OK",
            }
            _LOGGER.debug(
                f"Received response for {client_call_details.method}.",
                extra={
                    "serviceName": "google.iam.credentials.v1.IAMCredentials",
                    "rpcName": rpc_name,
                    "response": grpc_response,
                    "metadata": grpc_response["metadata"],
                },
            )
        return response
108
109
class IAMCredentialsGrpcTransport(IAMCredentialsTransport):
    """Synchronous gRPC transport for the IAMCredentials service.

    Service accounts are Google accounts owned by an application or a
    virtual machine (VM) rather than by an individual end user. The
    application takes on the service account's identity when calling
    Google APIs, so end users are not directly involved.

    Service account credentials let a caller temporarily assume that
    identity. Supported credential types include OAuth 2.0 access tokens,
    OpenID Connect ID tokens, self-signed JSON Web Tokens (JWTs), and more.

    The transport exposes the same methods as the primary client so that
    the client can load this implementation and delegate to it.

    Protocol buffers are sent over gRPC (HTTP/2 underneath); the ``grpcio``
    package must be installed.
    """

    _stubs: Dict[str, Callable]

    def __init__(
        self,
        *,
        host: str = "iamcredentials.googleapis.com",
        credentials: Optional[ga_credentials.Credentials] = None,
        credentials_file: Optional[str] = None,
        scopes: Optional[Sequence[str]] = None,
        channel: Optional[Union[grpc.Channel, Callable[..., grpc.Channel]]] = None,
        api_mtls_endpoint: Optional[str] = None,
        client_cert_source: Optional[Callable[[], Tuple[bytes, bytes]]] = None,
        ssl_channel_credentials: Optional[grpc.ChannelCredentials] = None,
        client_cert_source_for_mtls: Optional[Callable[[], Tuple[bytes, bytes]]] = None,
        quota_project_id: Optional[str] = None,
        client_info: gapic_v1.client_info.ClientInfo = DEFAULT_CLIENT_INFO,
        always_use_jwt_access: Optional[bool] = False,
        api_audience: Optional[str] = None,
    ) -> None:
        """Set up the transport and its underlying gRPC channel.

        Args:
            host (Optional[str]):
                Hostname to connect to
                (default: 'iamcredentials.googleapis.com').
            credentials (Optional[google.auth.credentials.Credentials]):
                Authorization credentials attached to requests; they
                identify the application to the service. When omitted,
                the client tries to determine credentials from the
                environment. Ignored if a ``channel`` instance is provided.
            credentials_file (Optional[str]): Deprecated. Path to a file
                loadable with
                :func:`google.auth.load_credentials_from_file`. Ignored if
                a ``channel`` instance is provided. Will be removed in the
                next major version of this library.
            scopes (Optional(Sequence[str])): List of scopes. Ignored if a
                ``channel`` instance is provided.
            channel (Optional[Union[grpc.Channel, Callable[..., grpc.Channel]]]):
                Either a ready ``Channel`` to send calls through, or a
                callable that builds one. With ``None``,
                ``self.create_channel`` builds the channel. A callable is
                invoked with the same arguments ``self.create_channel``
                would receive.
            api_mtls_endpoint (Optional[str]): Deprecated. Mutual TLS
                endpoint. When given it replaces ``host`` and a mutual TLS
                channel is attempted using client SSL credentials from
                ``client_cert_source`` or the application default SSL
                credentials.
            client_cert_source (Optional[Callable[[], Tuple[bytes, bytes]]]):
                Deprecated. Callback returning the client SSL certificate
                bytes and private key bytes, both PEM encoded. Ignored if
                ``api_mtls_endpoint`` is None.
            ssl_channel_credentials (grpc.ChannelCredentials): SSL
                credentials for the gRPC channel. Ignored if a ``channel``
                instance is provided.
            client_cert_source_for_mtls (Optional[Callable[[], Tuple[bytes, bytes]]]):
                Callback returning client certificate bytes and private
                key bytes, both PEM encoded, used to configure a mutual
                TLS channel. Ignored if a ``channel`` instance or
                ``ssl_channel_credentials`` is provided.
            quota_project_id (Optional[str]): Optional project used for
                billing and quota.
            client_info (google.api_core.gapic_v1.client_info.ClientInfo):
                Client info used to send a user-agent string with API
                requests. If ``None``, default info is used. Usually only
                needed when developing your own client library.
            always_use_jwt_access (Optional[bool]): Whether a self-signed
                JWT should be used for service account credentials.
            api_audience (Optional[str]): Intended audience for the API
                calls, set when using certain 3rd party authentication
                flows. Typically a resource identifier. Defaults to the
                host value when not set.

        Raises:
            google.auth.exceptions.MutualTLSChannelError: If creating the
                mutual TLS transport fails for any reason.
            google.api_core.exceptions.DuplicateCredentialArgs: If both
                ``credentials`` and ``credentials_file`` are passed.
        """
        self._stubs: Dict[str, Callable] = {}
        self._grpc_channel = None
        self._ssl_channel_credentials = ssl_channel_credentials

        if api_mtls_endpoint:
            warnings.warn("api_mtls_endpoint is deprecated", DeprecationWarning)
        if client_cert_source:
            warnings.warn("client_cert_source is deprecated", DeprecationWarning)

        if isinstance(channel, grpc.Channel):
            # A ready-made channel wins: adopt it and drop any credentials
            # (regular and SSL) that were passed alongside it.
            credentials = None
            self._ignore_credentials = True
            self._grpc_channel = channel
            self._ssl_channel_credentials = None
        elif api_mtls_endpoint:
            # Deprecated mTLS path: the endpoint replaces the host and the
            # SSL credentials come from the callback when one is given,
            # otherwise from the application default SSL credentials.
            host = api_mtls_endpoint
            if client_cert_source:
                cert_pem, key_pem = client_cert_source()
                self._ssl_channel_credentials = grpc.ssl_channel_credentials(
                    certificate_chain=cert_pem, private_key=key_pem
                )
            else:
                self._ssl_channel_credentials = SslCredentials().ssl_credentials
        elif client_cert_source_for_mtls and not ssl_channel_credentials:
            # Explicit ``ssl_channel_credentials`` take precedence over the
            # mTLS certificate callback.
            cert_pem, key_pem = client_cert_source_for_mtls()
            self._ssl_channel_credentials = grpc.ssl_channel_credentials(
                certificate_chain=cert_pem, private_key=key_pem
            )

        # Host, credentials and scopes are resolved by the base transport.
        super().__init__(
            host=host,
            credentials=credentials,
            credentials_file=credentials_file,
            scopes=scopes,
            quota_project_id=quota_project_id,
            client_info=client_info,
            always_use_jwt_access=always_use_jwt_access,
            api_audience=api_audience,
        )

        if not self._grpc_channel:
            # Build the channel with the caller's factory when one was
            # supplied, else with this class's ``create_channel``.
            channel_factory = channel or type(self).create_channel
            channel_options = [
                ("grpc.max_send_message_length", -1),
                ("grpc.max_receive_message_length", -1),
            ]
            self._grpc_channel = channel_factory(
                self._host,
                # The credentials saved by the base transport are reused,
                # which is why ``credentials_file`` is forced to None here.
                credentials=self._credentials,
                credentials_file=None,
                scopes=self._scopes,
                ssl_credentials=self._ssl_channel_credentials,
                quota_project_id=quota_project_id,
                options=channel_options,
            )

        # All stubs are created on a channel wrapped with the logging
        # interceptor.
        self._interceptor = _LoggingClientInterceptor()
        self._logged_channel = grpc.intercept_channel(
            self._grpc_channel, self._interceptor
        )

        # Message wrapping has to happen once self._logged_channel exists.
        self._prep_wrapped_messages(client_info)

    @classmethod
    def create_channel(
        cls,
        host: str = "iamcredentials.googleapis.com",
        credentials: Optional[ga_credentials.Credentials] = None,
        credentials_file: Optional[str] = None,
        scopes: Optional[Sequence[str]] = None,
        quota_project_id: Optional[str] = None,
        **kwargs,
    ) -> grpc.Channel:
        """Build a gRPC channel for this service.

        Args:
            host (Optional[str]): Host the channel should use.
            credentials (Optional[~.Credentials]): Authorization
                credentials attached to requests; they identify this
                application to the service. When omitted, the client tries
                to determine credentials from the environment.
            credentials_file (Optional[str]): Deprecated. Path to a file
                loadable with
                :func:`google.auth.load_credentials_from_file`. Mutually
                exclusive with ``credentials``. Will be removed in the
                next major version of this library.
            scopes (Optional[Sequence[str]]): Optional list of scopes
                needed for this service. Only used when credentials are
                not specified; passed to :func:`google.auth.default`.
            quota_project_id (Optional[str]): Optional project used for
                billing and quota.
            kwargs (Optional[dict]): Extra keyword arguments forwarded to
                the channel creation.

        Returns:
            grpc.Channel: A gRPC channel object.

        Raises:
            google.api_core.exceptions.DuplicateCredentialArgs: If both
                ``credentials`` and ``credentials_file`` are passed.
        """
        new_channel = grpc_helpers.create_channel(
            host,
            credentials=credentials,
            credentials_file=credentials_file,
            scopes=scopes,
            default_scopes=cls.AUTH_SCOPES,
            default_host=cls.DEFAULT_HOST,
            quota_project_id=quota_project_id,
            **kwargs,
        )
        return new_channel

    @property
    def grpc_channel(self) -> grpc.Channel:
        """The channel used to connect to this service."""
        return self._grpc_channel

    @property
    def generate_access_token(
        self,
    ) -> Callable[
        [common.GenerateAccessTokenRequest], common.GenerateAccessTokenResponse
    ]:
        r"""Callable for the generate access token RPC over gRPC.

        Generates an OAuth 2.0 access token for a service account.

        Returns:
            Callable[[~.GenerateAccessTokenRequest],
                    ~.GenerateAccessTokenResponse]:
                A function that, when called, will call the underlying
                RPC on the server.
        """
        # The stub is built on first access and cached in ``_stubs``.
        # gRPC does the (de)serialization; only the functions for it are
        # supplied here.
        stub_key = "generate_access_token"
        cache = self._stubs
        if stub_key in cache:
            return cache[stub_key]
        cache[stub_key] = self._logged_channel.unary_unary(
            "/google.iam.credentials.v1.IAMCredentials/GenerateAccessToken",
            request_serializer=common.GenerateAccessTokenRequest.serialize,
            response_deserializer=common.GenerateAccessTokenResponse.deserialize,
        )
        return cache[stub_key]

    @property
    def generate_id_token(
        self,
    ) -> Callable[[common.GenerateIdTokenRequest], common.GenerateIdTokenResponse]:
        r"""Callable for the generate id token RPC over gRPC.

        Generates an OpenID Connect ID token for a service account.

        Returns:
            Callable[[~.GenerateIdTokenRequest],
                    ~.GenerateIdTokenResponse]:
                A function that, when called, will call the underlying
                RPC on the server.
        """
        # The stub is built on first access and cached in ``_stubs``.
        # gRPC does the (de)serialization; only the functions for it are
        # supplied here.
        stub_key = "generate_id_token"
        cache = self._stubs
        if stub_key in cache:
            return cache[stub_key]
        cache[stub_key] = self._logged_channel.unary_unary(
            "/google.iam.credentials.v1.IAMCredentials/GenerateIdToken",
            request_serializer=common.GenerateIdTokenRequest.serialize,
            response_deserializer=common.GenerateIdTokenResponse.deserialize,
        )
        return cache[stub_key]

    @property
    def sign_blob(self) -> Callable[[common.SignBlobRequest], common.SignBlobResponse]:
        r"""Callable for the sign blob RPC over gRPC.

        Signs a blob using a service account's system-managed private key.

        Returns:
            Callable[[~.SignBlobRequest],
                    ~.SignBlobResponse]:
                A function that, when called, will call the underlying
                RPC on the server.
        """
        # The stub is built on first access and cached in ``_stubs``.
        # gRPC does the (de)serialization; only the functions for it are
        # supplied here.
        stub_key = "sign_blob"
        cache = self._stubs
        if stub_key in cache:
            return cache[stub_key]
        cache[stub_key] = self._logged_channel.unary_unary(
            "/google.iam.credentials.v1.IAMCredentials/SignBlob",
            request_serializer=common.SignBlobRequest.serialize,
            response_deserializer=common.SignBlobResponse.deserialize,
        )
        return cache[stub_key]

    @property
    def sign_jwt(self) -> Callable[[common.SignJwtRequest], common.SignJwtResponse]:
        r"""Callable for the sign jwt RPC over gRPC.

        Signs a JWT using a service account's system-managed private key.

        Returns:
            Callable[[~.SignJwtRequest],
                    ~.SignJwtResponse]:
                A function that, when called, will call the underlying
                RPC on the server.
        """
        # The stub is built on first access and cached in ``_stubs``.
        # gRPC does the (de)serialization; only the functions for it are
        # supplied here.
        stub_key = "sign_jwt"
        cache = self._stubs
        if stub_key in cache:
            return cache[stub_key]
        cache[stub_key] = self._logged_channel.unary_unary(
            "/google.iam.credentials.v1.IAMCredentials/SignJwt",
            request_serializer=common.SignJwtRequest.serialize,
            response_deserializer=common.SignJwtResponse.deserialize,
        )
        return cache[stub_key]

    def close(self):
        # Closes the interceptor-wrapped channel created in ``__init__``.
        self._logged_channel.close()

    @property
    def kind(self) -> str:
        # Identifier for this transport flavour.
        return "grpc"
449
450
# Public API of this module: only the transport class is exported.
__all__ = ("IAMCredentialsGrpcTransport",)