1# -*- coding: utf-8 -*-
2# Copyright 2025 Google LLC
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16import dataclasses
17import json # type: ignore
18import logging
19import warnings
20from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple, Union
21
22import google.protobuf
23from google.api_core import exceptions as core_exceptions
24from google.api_core import gapic_v1, rest_helpers, rest_streaming
25from google.api_core import retry as retries
26from google.auth import credentials as ga_credentials # type: ignore
27from google.auth.transport.requests import AuthorizedSession # type: ignore
28from google.protobuf import json_format
29from requests import __version__ as requests_version
30
31from google.cloud.iam_credentials_v1.types import common
32
33from .base import DEFAULT_CLIENT_INFO as BASE_DEFAULT_CLIENT_INFO
34from .rest_base import _BaseIAMCredentialsRestTransport
35
36try:
37 OptionalRetry = Union[retries.Retry, gapic_v1.method._MethodDefault, None]
38except AttributeError: # pragma: NO COVER
39 OptionalRetry = Union[retries.Retry, object, None] # type: ignore
40
41try:
42 from google.api_core import client_logging # type: ignore
43
44 CLIENT_LOGGING_SUPPORTED = True # pragma: NO COVER
45except ImportError: # pragma: NO COVER
46 CLIENT_LOGGING_SUPPORTED = False
47
48_LOGGER = logging.getLogger(__name__)
49
50DEFAULT_CLIENT_INFO = gapic_v1.client_info.ClientInfo(
51 gapic_version=BASE_DEFAULT_CLIENT_INFO.gapic_version,
52 grpc_version=None,
53 rest_version=f"requests@{requests_version}",
54)
55
56if hasattr(DEFAULT_CLIENT_INFO, "protobuf_runtime_version"): # pragma: NO COVER
57 DEFAULT_CLIENT_INFO.protobuf_runtime_version = google.protobuf.__version__
58
59
60class IAMCredentialsRestInterceptor:
61 """Interceptor for IAMCredentials.
62
63 Interceptors are used to manipulate requests, request metadata, and responses
64 in arbitrary ways.
65 Example use cases include:
66 * Logging
67 * Verifying requests according to service or custom semantics
68 * Stripping extraneous information from responses
69
70 These use cases and more can be enabled by injecting an
71 instance of a custom subclass when constructing the IAMCredentialsRestTransport.
72
73 .. code-block:: python
74 class MyCustomIAMCredentialsInterceptor(IAMCredentialsRestInterceptor):
75 def pre_generate_access_token(self, request, metadata):
76 logging.log(f"Received request: {request}")
77 return request, metadata
78
79 def post_generate_access_token(self, response):
80 logging.log(f"Received response: {response}")
81 return response
82
83 def pre_generate_id_token(self, request, metadata):
84 logging.log(f"Received request: {request}")
85 return request, metadata
86
87 def post_generate_id_token(self, response):
88 logging.log(f"Received response: {response}")
89 return response
90
91 def pre_sign_blob(self, request, metadata):
92 logging.log(f"Received request: {request}")
93 return request, metadata
94
95 def post_sign_blob(self, response):
96 logging.log(f"Received response: {response}")
97 return response
98
99 def pre_sign_jwt(self, request, metadata):
100 logging.log(f"Received request: {request}")
101 return request, metadata
102
103 def post_sign_jwt(self, response):
104 logging.log(f"Received response: {response}")
105 return response
106
107 transport = IAMCredentialsRestTransport(interceptor=MyCustomIAMCredentialsInterceptor())
108 client = IAMCredentialsClient(transport=transport)
109
110
111 """
112
113 def pre_generate_access_token(
114 self,
115 request: common.GenerateAccessTokenRequest,
116 metadata: Sequence[Tuple[str, Union[str, bytes]]],
117 ) -> Tuple[
118 common.GenerateAccessTokenRequest, Sequence[Tuple[str, Union[str, bytes]]]
119 ]:
120 """Pre-rpc interceptor for generate_access_token
121
122 Override in a subclass to manipulate the request or metadata
123 before they are sent to the IAMCredentials server.
124 """
125 return request, metadata
126
127 def post_generate_access_token(
128 self, response: common.GenerateAccessTokenResponse
129 ) -> common.GenerateAccessTokenResponse:
130 """Post-rpc interceptor for generate_access_token
131
132 DEPRECATED. Please use the `post_generate_access_token_with_metadata`
133 interceptor instead.
134
135 Override in a subclass to read or manipulate the response
136 after it is returned by the IAMCredentials server but before
137 it is returned to user code. This `post_generate_access_token` interceptor runs
138 before the `post_generate_access_token_with_metadata` interceptor.
139 """
140 return response
141
142 def post_generate_access_token_with_metadata(
143 self,
144 response: common.GenerateAccessTokenResponse,
145 metadata: Sequence[Tuple[str, Union[str, bytes]]],
146 ) -> Tuple[
147 common.GenerateAccessTokenResponse, Sequence[Tuple[str, Union[str, bytes]]]
148 ]:
149 """Post-rpc interceptor for generate_access_token
150
151 Override in a subclass to read or manipulate the response or metadata after it
152 is returned by the IAMCredentials server but before it is returned to user code.
153
154 We recommend only using this `post_generate_access_token_with_metadata`
155 interceptor in new development instead of the `post_generate_access_token` interceptor.
156 When both interceptors are used, this `post_generate_access_token_with_metadata` interceptor runs after the
157 `post_generate_access_token` interceptor. The (possibly modified) response returned by
158 `post_generate_access_token` will be passed to
159 `post_generate_access_token_with_metadata`.
160 """
161 return response, metadata
162
163 def pre_generate_id_token(
164 self,
165 request: common.GenerateIdTokenRequest,
166 metadata: Sequence[Tuple[str, Union[str, bytes]]],
167 ) -> Tuple[common.GenerateIdTokenRequest, Sequence[Tuple[str, Union[str, bytes]]]]:
168 """Pre-rpc interceptor for generate_id_token
169
170 Override in a subclass to manipulate the request or metadata
171 before they are sent to the IAMCredentials server.
172 """
173 return request, metadata
174
175 def post_generate_id_token(
176 self, response: common.GenerateIdTokenResponse
177 ) -> common.GenerateIdTokenResponse:
178 """Post-rpc interceptor for generate_id_token
179
180 DEPRECATED. Please use the `post_generate_id_token_with_metadata`
181 interceptor instead.
182
183 Override in a subclass to read or manipulate the response
184 after it is returned by the IAMCredentials server but before
185 it is returned to user code. This `post_generate_id_token` interceptor runs
186 before the `post_generate_id_token_with_metadata` interceptor.
187 """
188 return response
189
190 def post_generate_id_token_with_metadata(
191 self,
192 response: common.GenerateIdTokenResponse,
193 metadata: Sequence[Tuple[str, Union[str, bytes]]],
194 ) -> Tuple[common.GenerateIdTokenResponse, Sequence[Tuple[str, Union[str, bytes]]]]:
195 """Post-rpc interceptor for generate_id_token
196
197 Override in a subclass to read or manipulate the response or metadata after it
198 is returned by the IAMCredentials server but before it is returned to user code.
199
200 We recommend only using this `post_generate_id_token_with_metadata`
201 interceptor in new development instead of the `post_generate_id_token` interceptor.
202 When both interceptors are used, this `post_generate_id_token_with_metadata` interceptor runs after the
203 `post_generate_id_token` interceptor. The (possibly modified) response returned by
204 `post_generate_id_token` will be passed to
205 `post_generate_id_token_with_metadata`.
206 """
207 return response, metadata
208
209 def pre_sign_blob(
210 self,
211 request: common.SignBlobRequest,
212 metadata: Sequence[Tuple[str, Union[str, bytes]]],
213 ) -> Tuple[common.SignBlobRequest, Sequence[Tuple[str, Union[str, bytes]]]]:
214 """Pre-rpc interceptor for sign_blob
215
216 Override in a subclass to manipulate the request or metadata
217 before they are sent to the IAMCredentials server.
218 """
219 return request, metadata
220
221 def post_sign_blob(
222 self, response: common.SignBlobResponse
223 ) -> common.SignBlobResponse:
224 """Post-rpc interceptor for sign_blob
225
226 DEPRECATED. Please use the `post_sign_blob_with_metadata`
227 interceptor instead.
228
229 Override in a subclass to read or manipulate the response
230 after it is returned by the IAMCredentials server but before
231 it is returned to user code. This `post_sign_blob` interceptor runs
232 before the `post_sign_blob_with_metadata` interceptor.
233 """
234 return response
235
236 def post_sign_blob_with_metadata(
237 self,
238 response: common.SignBlobResponse,
239 metadata: Sequence[Tuple[str, Union[str, bytes]]],
240 ) -> Tuple[common.SignBlobResponse, Sequence[Tuple[str, Union[str, bytes]]]]:
241 """Post-rpc interceptor for sign_blob
242
243 Override in a subclass to read or manipulate the response or metadata after it
244 is returned by the IAMCredentials server but before it is returned to user code.
245
246 We recommend only using this `post_sign_blob_with_metadata`
247 interceptor in new development instead of the `post_sign_blob` interceptor.
248 When both interceptors are used, this `post_sign_blob_with_metadata` interceptor runs after the
249 `post_sign_blob` interceptor. The (possibly modified) response returned by
250 `post_sign_blob` will be passed to
251 `post_sign_blob_with_metadata`.
252 """
253 return response, metadata
254
255 def pre_sign_jwt(
256 self,
257 request: common.SignJwtRequest,
258 metadata: Sequence[Tuple[str, Union[str, bytes]]],
259 ) -> Tuple[common.SignJwtRequest, Sequence[Tuple[str, Union[str, bytes]]]]:
260 """Pre-rpc interceptor for sign_jwt
261
262 Override in a subclass to manipulate the request or metadata
263 before they are sent to the IAMCredentials server.
264 """
265 return request, metadata
266
267 def post_sign_jwt(self, response: common.SignJwtResponse) -> common.SignJwtResponse:
268 """Post-rpc interceptor for sign_jwt
269
270 DEPRECATED. Please use the `post_sign_jwt_with_metadata`
271 interceptor instead.
272
273 Override in a subclass to read or manipulate the response
274 after it is returned by the IAMCredentials server but before
275 it is returned to user code. This `post_sign_jwt` interceptor runs
276 before the `post_sign_jwt_with_metadata` interceptor.
277 """
278 return response
279
280 def post_sign_jwt_with_metadata(
281 self,
282 response: common.SignJwtResponse,
283 metadata: Sequence[Tuple[str, Union[str, bytes]]],
284 ) -> Tuple[common.SignJwtResponse, Sequence[Tuple[str, Union[str, bytes]]]]:
285 """Post-rpc interceptor for sign_jwt
286
287 Override in a subclass to read or manipulate the response or metadata after it
288 is returned by the IAMCredentials server but before it is returned to user code.
289
290 We recommend only using this `post_sign_jwt_with_metadata`
291 interceptor in new development instead of the `post_sign_jwt` interceptor.
292 When both interceptors are used, this `post_sign_jwt_with_metadata` interceptor runs after the
293 `post_sign_jwt` interceptor. The (possibly modified) response returned by
294 `post_sign_jwt` will be passed to
295 `post_sign_jwt_with_metadata`.
296 """
297 return response, metadata
298
299
300@dataclasses.dataclass
301class IAMCredentialsRestStub:
302 _session: AuthorizedSession
303 _host: str
304 _interceptor: IAMCredentialsRestInterceptor
305
306
307class IAMCredentialsRestTransport(_BaseIAMCredentialsRestTransport):
308 """REST backend synchronous transport for IAMCredentials.
309
310 A service account is a special type of Google account that
311 belongs to your application or a virtual machine (VM), instead
312 of to an individual end user. Your application assumes the
313 identity of the service account to call Google APIs, so that the
314 users aren't directly involved.
315
316 Service account credentials are used to temporarily assume the
317 identity of the service account. Supported credential types
318 include OAuth 2.0 access tokens, OpenID Connect ID tokens,
319 self-signed JSON Web Tokens (JWTs), and more.
320
321 This class defines the same methods as the primary client, so the
322 primary client can load the underlying transport implementation
323 and call it.
324
325 It sends JSON representations of protocol buffers over HTTP/1.1
326 """
327
328 def __init__(
329 self,
330 *,
331 host: str = "iamcredentials.googleapis.com",
332 credentials: Optional[ga_credentials.Credentials] = None,
333 credentials_file: Optional[str] = None,
334 scopes: Optional[Sequence[str]] = None,
335 client_cert_source_for_mtls: Optional[Callable[[], Tuple[bytes, bytes]]] = None,
336 quota_project_id: Optional[str] = None,
337 client_info: gapic_v1.client_info.ClientInfo = DEFAULT_CLIENT_INFO,
338 always_use_jwt_access: Optional[bool] = False,
339 url_scheme: str = "https",
340 interceptor: Optional[IAMCredentialsRestInterceptor] = None,
341 api_audience: Optional[str] = None,
342 ) -> None:
343 """Instantiate the transport.
344
345 Args:
346 host (Optional[str]):
347 The hostname to connect to (default: 'iamcredentials.googleapis.com').
348 credentials (Optional[google.auth.credentials.Credentials]): The
349 authorization credentials to attach to requests. These
350 credentials identify the application to the service; if none
351 are specified, the client will attempt to ascertain the
352 credentials from the environment.
353
354 credentials_file (Optional[str]): Deprecated. A file with credentials that can
355 be loaded with :func:`google.auth.load_credentials_from_file`.
356 This argument is ignored if ``channel`` is provided. This argument will be
357 removed in the next major version of this library.
358 scopes (Optional(Sequence[str])): A list of scopes. This argument is
359 ignored if ``channel`` is provided.
360 client_cert_source_for_mtls (Callable[[], Tuple[bytes, bytes]]): Client
361 certificate to configure mutual TLS HTTP channel. It is ignored
362 if ``channel`` is provided.
363 quota_project_id (Optional[str]): An optional project to use for billing
364 and quota.
365 client_info (google.api_core.gapic_v1.client_info.ClientInfo):
366 The client info used to send a user-agent string along with
367 API requests. If ``None``, then default info will be used.
368 Generally, you only need to set this if you are developing
369 your own client library.
370 always_use_jwt_access (Optional[bool]): Whether self signed JWT should
371 be used for service account credentials.
372 url_scheme: the protocol scheme for the API endpoint. Normally
373 "https", but for testing or local servers,
374 "http" can be specified.
375 interceptor (Optional[IAMCredentialsRestInterceptor]): Interceptor used
376 to manipulate requests, request metadata, and responses.
377 api_audience (Optional[str]): The intended audience for the API calls
378 to the service that will be set when using certain 3rd party
379 authentication flows. Audience is typically a resource identifier.
380 If not set, the host value will be used as a default.
381 """
382 # Run the base constructor
383 # TODO(yon-mg): resolve other ctor params i.e. scopes, quota, etc.
384 # TODO: When custom host (api_endpoint) is set, `scopes` must *also* be set on the
385 # credentials object
386 super().__init__(
387 host=host,
388 credentials=credentials,
389 client_info=client_info,
390 always_use_jwt_access=always_use_jwt_access,
391 url_scheme=url_scheme,
392 api_audience=api_audience,
393 )
394 self._session = AuthorizedSession(
395 self._credentials, default_host=self.DEFAULT_HOST
396 )
397 if client_cert_source_for_mtls:
398 self._session.configure_mtls_channel(client_cert_source_for_mtls)
399 self._interceptor = interceptor or IAMCredentialsRestInterceptor()
400 self._prep_wrapped_messages(client_info)
401
402 class _GenerateAccessToken(
403 _BaseIAMCredentialsRestTransport._BaseGenerateAccessToken,
404 IAMCredentialsRestStub,
405 ):
406 def __hash__(self):
407 return hash("IAMCredentialsRestTransport.GenerateAccessToken")
408
409 @staticmethod
410 def _get_response(
411 host,
412 metadata,
413 query_params,
414 session,
415 timeout,
416 transcoded_request,
417 body=None,
418 ):
419 uri = transcoded_request["uri"]
420 method = transcoded_request["method"]
421 headers = dict(metadata)
422 headers["Content-Type"] = "application/json"
423 response = getattr(session, method)(
424 "{host}{uri}".format(host=host, uri=uri),
425 timeout=timeout,
426 headers=headers,
427 params=rest_helpers.flatten_query_params(query_params, strict=True),
428 data=body,
429 )
430 return response
431
432 def __call__(
433 self,
434 request: common.GenerateAccessTokenRequest,
435 *,
436 retry: OptionalRetry = gapic_v1.method.DEFAULT,
437 timeout: Optional[float] = None,
438 metadata: Sequence[Tuple[str, Union[str, bytes]]] = (),
439 ) -> common.GenerateAccessTokenResponse:
440 r"""Call the generate access token method over HTTP.
441
442 Args:
443 request (~.common.GenerateAccessTokenRequest):
444 The request object.
445 retry (google.api_core.retry.Retry): Designation of what errors, if any,
446 should be retried.
447 timeout (float): The timeout for this request.
448 metadata (Sequence[Tuple[str, Union[str, bytes]]]): Key/value pairs which should be
449 sent along with the request as metadata. Normally, each value must be of type `str`,
450 but for metadata keys ending with the suffix `-bin`, the corresponding values must
451 be of type `bytes`.
452
453 Returns:
454 ~.common.GenerateAccessTokenResponse:
455
456 """
457
458 http_options = _BaseIAMCredentialsRestTransport._BaseGenerateAccessToken._get_http_options()
459
460 request, metadata = self._interceptor.pre_generate_access_token(
461 request, metadata
462 )
463 transcoded_request = _BaseIAMCredentialsRestTransport._BaseGenerateAccessToken._get_transcoded_request(
464 http_options, request
465 )
466
467 body = _BaseIAMCredentialsRestTransport._BaseGenerateAccessToken._get_request_body_json(
468 transcoded_request
469 )
470
471 # Jsonify the query params
472 query_params = _BaseIAMCredentialsRestTransport._BaseGenerateAccessToken._get_query_params_json(
473 transcoded_request
474 )
475
476 if CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(
477 logging.DEBUG
478 ): # pragma: NO COVER
479 request_url = "{host}{uri}".format(
480 host=self._host, uri=transcoded_request["uri"]
481 )
482 method = transcoded_request["method"]
483 try:
484 request_payload = type(request).to_json(request)
485 except:
486 request_payload = None
487 http_request = {
488 "payload": request_payload,
489 "requestMethod": method,
490 "requestUrl": request_url,
491 "headers": dict(metadata),
492 }
493 _LOGGER.debug(
494 f"Sending request for google.iam.credentials_v1.IAMCredentialsClient.GenerateAccessToken",
495 extra={
496 "serviceName": "google.iam.credentials.v1.IAMCredentials",
497 "rpcName": "GenerateAccessToken",
498 "httpRequest": http_request,
499 "metadata": http_request["headers"],
500 },
501 )
502
503 # Send the request
504 response = IAMCredentialsRestTransport._GenerateAccessToken._get_response(
505 self._host,
506 metadata,
507 query_params,
508 self._session,
509 timeout,
510 transcoded_request,
511 body,
512 )
513
514 # In case of error, raise the appropriate core_exceptions.GoogleAPICallError exception
515 # subclass.
516 if response.status_code >= 400:
517 raise core_exceptions.from_http_response(response)
518
519 # Return the response
520 resp = common.GenerateAccessTokenResponse()
521 pb_resp = common.GenerateAccessTokenResponse.pb(resp)
522
523 json_format.Parse(response.content, pb_resp, ignore_unknown_fields=True)
524
525 resp = self._interceptor.post_generate_access_token(resp)
526 response_metadata = [(k, str(v)) for k, v in response.headers.items()]
527 resp, _ = self._interceptor.post_generate_access_token_with_metadata(
528 resp, response_metadata
529 )
530 if CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(
531 logging.DEBUG
532 ): # pragma: NO COVER
533 try:
534 response_payload = common.GenerateAccessTokenResponse.to_json(
535 response
536 )
537 except:
538 response_payload = None
539 http_response = {
540 "payload": response_payload,
541 "headers": dict(response.headers),
542 "status": response.status_code,
543 }
544 _LOGGER.debug(
545 "Received response for google.iam.credentials_v1.IAMCredentialsClient.generate_access_token",
546 extra={
547 "serviceName": "google.iam.credentials.v1.IAMCredentials",
548 "rpcName": "GenerateAccessToken",
549 "metadata": http_response["headers"],
550 "httpResponse": http_response,
551 },
552 )
553 return resp
554
555 class _GenerateIdToken(
556 _BaseIAMCredentialsRestTransport._BaseGenerateIdToken, IAMCredentialsRestStub
557 ):
558 def __hash__(self):
559 return hash("IAMCredentialsRestTransport.GenerateIdToken")
560
561 @staticmethod
562 def _get_response(
563 host,
564 metadata,
565 query_params,
566 session,
567 timeout,
568 transcoded_request,
569 body=None,
570 ):
571 uri = transcoded_request["uri"]
572 method = transcoded_request["method"]
573 headers = dict(metadata)
574 headers["Content-Type"] = "application/json"
575 response = getattr(session, method)(
576 "{host}{uri}".format(host=host, uri=uri),
577 timeout=timeout,
578 headers=headers,
579 params=rest_helpers.flatten_query_params(query_params, strict=True),
580 data=body,
581 )
582 return response
583
584 def __call__(
585 self,
586 request: common.GenerateIdTokenRequest,
587 *,
588 retry: OptionalRetry = gapic_v1.method.DEFAULT,
589 timeout: Optional[float] = None,
590 metadata: Sequence[Tuple[str, Union[str, bytes]]] = (),
591 ) -> common.GenerateIdTokenResponse:
592 r"""Call the generate id token method over HTTP.
593
594 Args:
595 request (~.common.GenerateIdTokenRequest):
596 The request object.
597 retry (google.api_core.retry.Retry): Designation of what errors, if any,
598 should be retried.
599 timeout (float): The timeout for this request.
600 metadata (Sequence[Tuple[str, Union[str, bytes]]]): Key/value pairs which should be
601 sent along with the request as metadata. Normally, each value must be of type `str`,
602 but for metadata keys ending with the suffix `-bin`, the corresponding values must
603 be of type `bytes`.
604
605 Returns:
606 ~.common.GenerateIdTokenResponse:
607
608 """
609
610 http_options = _BaseIAMCredentialsRestTransport._BaseGenerateIdToken._get_http_options()
611
612 request, metadata = self._interceptor.pre_generate_id_token(
613 request, metadata
614 )
615 transcoded_request = _BaseIAMCredentialsRestTransport._BaseGenerateIdToken._get_transcoded_request(
616 http_options, request
617 )
618
619 body = _BaseIAMCredentialsRestTransport._BaseGenerateIdToken._get_request_body_json(
620 transcoded_request
621 )
622
623 # Jsonify the query params
624 query_params = _BaseIAMCredentialsRestTransport._BaseGenerateIdToken._get_query_params_json(
625 transcoded_request
626 )
627
628 if CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(
629 logging.DEBUG
630 ): # pragma: NO COVER
631 request_url = "{host}{uri}".format(
632 host=self._host, uri=transcoded_request["uri"]
633 )
634 method = transcoded_request["method"]
635 try:
636 request_payload = type(request).to_json(request)
637 except:
638 request_payload = None
639 http_request = {
640 "payload": request_payload,
641 "requestMethod": method,
642 "requestUrl": request_url,
643 "headers": dict(metadata),
644 }
645 _LOGGER.debug(
646 f"Sending request for google.iam.credentials_v1.IAMCredentialsClient.GenerateIdToken",
647 extra={
648 "serviceName": "google.iam.credentials.v1.IAMCredentials",
649 "rpcName": "GenerateIdToken",
650 "httpRequest": http_request,
651 "metadata": http_request["headers"],
652 },
653 )
654
655 # Send the request
656 response = IAMCredentialsRestTransport._GenerateIdToken._get_response(
657 self._host,
658 metadata,
659 query_params,
660 self._session,
661 timeout,
662 transcoded_request,
663 body,
664 )
665
666 # In case of error, raise the appropriate core_exceptions.GoogleAPICallError exception
667 # subclass.
668 if response.status_code >= 400:
669 raise core_exceptions.from_http_response(response)
670
671 # Return the response
672 resp = common.GenerateIdTokenResponse()
673 pb_resp = common.GenerateIdTokenResponse.pb(resp)
674
675 json_format.Parse(response.content, pb_resp, ignore_unknown_fields=True)
676
677 resp = self._interceptor.post_generate_id_token(resp)
678 response_metadata = [(k, str(v)) for k, v in response.headers.items()]
679 resp, _ = self._interceptor.post_generate_id_token_with_metadata(
680 resp, response_metadata
681 )
682 if CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(
683 logging.DEBUG
684 ): # pragma: NO COVER
685 try:
686 response_payload = common.GenerateIdTokenResponse.to_json(response)
687 except:
688 response_payload = None
689 http_response = {
690 "payload": response_payload,
691 "headers": dict(response.headers),
692 "status": response.status_code,
693 }
694 _LOGGER.debug(
695 "Received response for google.iam.credentials_v1.IAMCredentialsClient.generate_id_token",
696 extra={
697 "serviceName": "google.iam.credentials.v1.IAMCredentials",
698 "rpcName": "GenerateIdToken",
699 "metadata": http_response["headers"],
700 "httpResponse": http_response,
701 },
702 )
703 return resp
704
705 class _SignBlob(
706 _BaseIAMCredentialsRestTransport._BaseSignBlob, IAMCredentialsRestStub
707 ):
708 def __hash__(self):
709 return hash("IAMCredentialsRestTransport.SignBlob")
710
711 @staticmethod
712 def _get_response(
713 host,
714 metadata,
715 query_params,
716 session,
717 timeout,
718 transcoded_request,
719 body=None,
720 ):
721 uri = transcoded_request["uri"]
722 method = transcoded_request["method"]
723 headers = dict(metadata)
724 headers["Content-Type"] = "application/json"
725 response = getattr(session, method)(
726 "{host}{uri}".format(host=host, uri=uri),
727 timeout=timeout,
728 headers=headers,
729 params=rest_helpers.flatten_query_params(query_params, strict=True),
730 data=body,
731 )
732 return response
733
734 def __call__(
735 self,
736 request: common.SignBlobRequest,
737 *,
738 retry: OptionalRetry = gapic_v1.method.DEFAULT,
739 timeout: Optional[float] = None,
740 metadata: Sequence[Tuple[str, Union[str, bytes]]] = (),
741 ) -> common.SignBlobResponse:
742 r"""Call the sign blob method over HTTP.
743
744 Args:
745 request (~.common.SignBlobRequest):
746 The request object.
747 retry (google.api_core.retry.Retry): Designation of what errors, if any,
748 should be retried.
749 timeout (float): The timeout for this request.
750 metadata (Sequence[Tuple[str, Union[str, bytes]]]): Key/value pairs which should be
751 sent along with the request as metadata. Normally, each value must be of type `str`,
752 but for metadata keys ending with the suffix `-bin`, the corresponding values must
753 be of type `bytes`.
754
755 Returns:
756 ~.common.SignBlobResponse:
757
758 """
759
760 http_options = (
761 _BaseIAMCredentialsRestTransport._BaseSignBlob._get_http_options()
762 )
763
764 request, metadata = self._interceptor.pre_sign_blob(request, metadata)
765 transcoded_request = (
766 _BaseIAMCredentialsRestTransport._BaseSignBlob._get_transcoded_request(
767 http_options, request
768 )
769 )
770
771 body = (
772 _BaseIAMCredentialsRestTransport._BaseSignBlob._get_request_body_json(
773 transcoded_request
774 )
775 )
776
777 # Jsonify the query params
778 query_params = (
779 _BaseIAMCredentialsRestTransport._BaseSignBlob._get_query_params_json(
780 transcoded_request
781 )
782 )
783
784 if CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(
785 logging.DEBUG
786 ): # pragma: NO COVER
787 request_url = "{host}{uri}".format(
788 host=self._host, uri=transcoded_request["uri"]
789 )
790 method = transcoded_request["method"]
791 try:
792 request_payload = type(request).to_json(request)
793 except:
794 request_payload = None
795 http_request = {
796 "payload": request_payload,
797 "requestMethod": method,
798 "requestUrl": request_url,
799 "headers": dict(metadata),
800 }
801 _LOGGER.debug(
802 f"Sending request for google.iam.credentials_v1.IAMCredentialsClient.SignBlob",
803 extra={
804 "serviceName": "google.iam.credentials.v1.IAMCredentials",
805 "rpcName": "SignBlob",
806 "httpRequest": http_request,
807 "metadata": http_request["headers"],
808 },
809 )
810
811 # Send the request
812 response = IAMCredentialsRestTransport._SignBlob._get_response(
813 self._host,
814 metadata,
815 query_params,
816 self._session,
817 timeout,
818 transcoded_request,
819 body,
820 )
821
822 # In case of error, raise the appropriate core_exceptions.GoogleAPICallError exception
823 # subclass.
824 if response.status_code >= 400:
825 raise core_exceptions.from_http_response(response)
826
827 # Return the response
828 resp = common.SignBlobResponse()
829 pb_resp = common.SignBlobResponse.pb(resp)
830
831 json_format.Parse(response.content, pb_resp, ignore_unknown_fields=True)
832
833 resp = self._interceptor.post_sign_blob(resp)
834 response_metadata = [(k, str(v)) for k, v in response.headers.items()]
835 resp, _ = self._interceptor.post_sign_blob_with_metadata(
836 resp, response_metadata
837 )
838 if CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(
839 logging.DEBUG
840 ): # pragma: NO COVER
841 try:
842 response_payload = common.SignBlobResponse.to_json(response)
843 except:
844 response_payload = None
845 http_response = {
846 "payload": response_payload,
847 "headers": dict(response.headers),
848 "status": response.status_code,
849 }
850 _LOGGER.debug(
851 "Received response for google.iam.credentials_v1.IAMCredentialsClient.sign_blob",
852 extra={
853 "serviceName": "google.iam.credentials.v1.IAMCredentials",
854 "rpcName": "SignBlob",
855 "metadata": http_response["headers"],
856 "httpResponse": http_response,
857 },
858 )
859 return resp
860
861 class _SignJwt(
862 _BaseIAMCredentialsRestTransport._BaseSignJwt, IAMCredentialsRestStub
863 ):
864 def __hash__(self):
865 return hash("IAMCredentialsRestTransport.SignJwt")
866
867 @staticmethod
868 def _get_response(
869 host,
870 metadata,
871 query_params,
872 session,
873 timeout,
874 transcoded_request,
875 body=None,
876 ):
877 uri = transcoded_request["uri"]
878 method = transcoded_request["method"]
879 headers = dict(metadata)
880 headers["Content-Type"] = "application/json"
881 response = getattr(session, method)(
882 "{host}{uri}".format(host=host, uri=uri),
883 timeout=timeout,
884 headers=headers,
885 params=rest_helpers.flatten_query_params(query_params, strict=True),
886 data=body,
887 )
888 return response
889
890 def __call__(
891 self,
892 request: common.SignJwtRequest,
893 *,
894 retry: OptionalRetry = gapic_v1.method.DEFAULT,
895 timeout: Optional[float] = None,
896 metadata: Sequence[Tuple[str, Union[str, bytes]]] = (),
897 ) -> common.SignJwtResponse:
898 r"""Call the sign jwt method over HTTP.
899
900 Args:
901 request (~.common.SignJwtRequest):
902 The request object.
903 retry (google.api_core.retry.Retry): Designation of what errors, if any,
904 should be retried.
905 timeout (float): The timeout for this request.
906 metadata (Sequence[Tuple[str, Union[str, bytes]]]): Key/value pairs which should be
907 sent along with the request as metadata. Normally, each value must be of type `str`,
908 but for metadata keys ending with the suffix `-bin`, the corresponding values must
909 be of type `bytes`.
910
911 Returns:
912 ~.common.SignJwtResponse:
913
914 """
915
916 http_options = (
917 _BaseIAMCredentialsRestTransport._BaseSignJwt._get_http_options()
918 )
919
920 request, metadata = self._interceptor.pre_sign_jwt(request, metadata)
921 transcoded_request = (
922 _BaseIAMCredentialsRestTransport._BaseSignJwt._get_transcoded_request(
923 http_options, request
924 )
925 )
926
927 body = _BaseIAMCredentialsRestTransport._BaseSignJwt._get_request_body_json(
928 transcoded_request
929 )
930
931 # Jsonify the query params
932 query_params = (
933 _BaseIAMCredentialsRestTransport._BaseSignJwt._get_query_params_json(
934 transcoded_request
935 )
936 )
937
938 if CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(
939 logging.DEBUG
940 ): # pragma: NO COVER
941 request_url = "{host}{uri}".format(
942 host=self._host, uri=transcoded_request["uri"]
943 )
944 method = transcoded_request["method"]
945 try:
946 request_payload = type(request).to_json(request)
947 except:
948 request_payload = None
949 http_request = {
950 "payload": request_payload,
951 "requestMethod": method,
952 "requestUrl": request_url,
953 "headers": dict(metadata),
954 }
955 _LOGGER.debug(
956 f"Sending request for google.iam.credentials_v1.IAMCredentialsClient.SignJwt",
957 extra={
958 "serviceName": "google.iam.credentials.v1.IAMCredentials",
959 "rpcName": "SignJwt",
960 "httpRequest": http_request,
961 "metadata": http_request["headers"],
962 },
963 )
964
965 # Send the request
966 response = IAMCredentialsRestTransport._SignJwt._get_response(
967 self._host,
968 metadata,
969 query_params,
970 self._session,
971 timeout,
972 transcoded_request,
973 body,
974 )
975
976 # In case of error, raise the appropriate core_exceptions.GoogleAPICallError exception
977 # subclass.
978 if response.status_code >= 400:
979 raise core_exceptions.from_http_response(response)
980
981 # Return the response
982 resp = common.SignJwtResponse()
983 pb_resp = common.SignJwtResponse.pb(resp)
984
985 json_format.Parse(response.content, pb_resp, ignore_unknown_fields=True)
986
987 resp = self._interceptor.post_sign_jwt(resp)
988 response_metadata = [(k, str(v)) for k, v in response.headers.items()]
989 resp, _ = self._interceptor.post_sign_jwt_with_metadata(
990 resp, response_metadata
991 )
992 if CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(
993 logging.DEBUG
994 ): # pragma: NO COVER
995 try:
996 response_payload = common.SignJwtResponse.to_json(response)
997 except:
998 response_payload = None
999 http_response = {
1000 "payload": response_payload,
1001 "headers": dict(response.headers),
1002 "status": response.status_code,
1003 }
1004 _LOGGER.debug(
1005 "Received response for google.iam.credentials_v1.IAMCredentialsClient.sign_jwt",
1006 extra={
1007 "serviceName": "google.iam.credentials.v1.IAMCredentials",
1008 "rpcName": "SignJwt",
1009 "metadata": http_response["headers"],
1010 "httpResponse": http_response,
1011 },
1012 )
1013 return resp
1014
1015 @property
1016 def generate_access_token(
1017 self,
1018 ) -> Callable[
1019 [common.GenerateAccessTokenRequest], common.GenerateAccessTokenResponse
1020 ]:
1021 # The return type is fine, but mypy isn't sophisticated enough to determine what's going on here.
1022 # In C++ this would require a dynamic_cast
1023 return self._GenerateAccessToken(self._session, self._host, self._interceptor) # type: ignore
1024
1025 @property
1026 def generate_id_token(
1027 self,
1028 ) -> Callable[[common.GenerateIdTokenRequest], common.GenerateIdTokenResponse]:
1029 # The return type is fine, but mypy isn't sophisticated enough to determine what's going on here.
1030 # In C++ this would require a dynamic_cast
1031 return self._GenerateIdToken(self._session, self._host, self._interceptor) # type: ignore
1032
1033 @property
1034 def sign_blob(self) -> Callable[[common.SignBlobRequest], common.SignBlobResponse]:
1035 # The return type is fine, but mypy isn't sophisticated enough to determine what's going on here.
1036 # In C++ this would require a dynamic_cast
1037 return self._SignBlob(self._session, self._host, self._interceptor) # type: ignore
1038
1039 @property
1040 def sign_jwt(self) -> Callable[[common.SignJwtRequest], common.SignJwtResponse]:
1041 # The return type is fine, but mypy isn't sophisticated enough to determine what's going on here.
1042 # In C++ this would require a dynamic_cast
1043 return self._SignJwt(self._session, self._host, self._interceptor) # type: ignore
1044
1045 @property
1046 def kind(self) -> str:
1047 return "rest"
1048
1049 def close(self):
1050 self._session.close()
1051
1052
1053__all__ = ("IAMCredentialsRestTransport",)