1# -*- coding: utf-8 -*-
2# Copyright 2025 Google LLC
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16import dataclasses
17import json # type: ignore
18import logging
19from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple, Union
20import warnings
21
22from google.api_core import exceptions as core_exceptions
23from google.api_core import gapic_v1, rest_helpers, rest_streaming
24from google.api_core import retry as retries
25from google.auth import credentials as ga_credentials # type: ignore
26from google.auth.transport.requests import AuthorizedSession # type: ignore
27import google.protobuf
28from google.protobuf import json_format
29from requests import __version__ as requests_version
30
31from google.cloud.iam_credentials_v1.types import common
32
33from .base import DEFAULT_CLIENT_INFO as BASE_DEFAULT_CLIENT_INFO
34from .rest_base import _BaseIAMCredentialsRestTransport
35
# Type alias for the ``retry`` argument accepted by the RPC stubs in this
# module.  When ``gapic_v1.method`` has no ``_MethodDefault`` attribute the
# alias is built with plain ``object`` in its place.
try:
    OptionalRetry = Union[retries.Retry, gapic_v1.method._MethodDefault, None]
except AttributeError:  # pragma: NO COVER
    OptionalRetry = Union[retries.Retry, object, None]  # type: ignore

# Request/response debug logging is only attempted when
# ``google.api_core.client_logging`` can be imported.
try:
    from google.api_core import client_logging  # type: ignore
except ImportError:  # pragma: NO COVER
    CLIENT_LOGGING_SUPPORTED = False
else:  # pragma: NO COVER
    CLIENT_LOGGING_SUPPORTED = True

_LOGGER = logging.getLogger(__name__)

# Client info used as the default for this transport: it reuses the GAPIC
# version from the base transport, reports no gRPC version, and tags the REST
# layer with the installed ``requests`` version.
DEFAULT_CLIENT_INFO = gapic_v1.client_info.ClientInfo(
    gapic_version=BASE_DEFAULT_CLIENT_INFO.gapic_version,
    grpc_version=None,
    rest_version="requests@{}".format(requests_version),
)

# Only set the protobuf runtime version when the ClientInfo object has the
# corresponding attribute.
if hasattr(DEFAULT_CLIENT_INFO, "protobuf_runtime_version"):  # pragma: NO COVER
    DEFAULT_CLIENT_INFO.protobuf_runtime_version = google.protobuf.__version__
59
class IAMCredentialsRestInterceptor:
    """Interceptor for IAMCredentials.

    Interceptors are used to manipulate requests, request metadata, and responses
    in arbitrary ways.
    Example use cases include:

    * Logging
    * Verifying requests according to service or custom semantics
    * Stripping extraneous information from responses

    These use cases and more can be enabled by injecting an
    instance of a custom subclass when constructing the IAMCredentialsRestTransport.

    Every hook defined on this base class is a pass-through that returns its
    arguments unchanged, so a subclass only needs to override the hooks it
    cares about.

    .. code-block:: python

        class MyCustomIAMCredentialsInterceptor(IAMCredentialsRestInterceptor):
            def pre_generate_access_token(self, request, metadata):
                logging.info("Received request: %s", request)
                return request, metadata

            def post_generate_access_token_with_metadata(self, response, metadata):
                logging.info("Received response: %s", response)
                return response, metadata

            def pre_generate_id_token(self, request, metadata):
                logging.info("Received request: %s", request)
                return request, metadata

            def post_generate_id_token_with_metadata(self, response, metadata):
                logging.info("Received response: %s", response)
                return response, metadata

            def pre_sign_blob(self, request, metadata):
                logging.info("Received request: %s", request)
                return request, metadata

            def post_sign_blob_with_metadata(self, response, metadata):
                logging.info("Received response: %s", response)
                return response, metadata

            def pre_sign_jwt(self, request, metadata):
                logging.info("Received request: %s", request)
                return request, metadata

            def post_sign_jwt_with_metadata(self, response, metadata):
                logging.info("Received response: %s", response)
                return response, metadata

        transport = IAMCredentialsRestTransport(interceptor=MyCustomIAMCredentialsInterceptor())
        client = IAMCredentialsClient(transport=transport)

    """

    def pre_generate_access_token(
        self,
        request: common.GenerateAccessTokenRequest,
        metadata: Sequence[Tuple[str, Union[str, bytes]]],
    ) -> Tuple[
        common.GenerateAccessTokenRequest, Sequence[Tuple[str, Union[str, bytes]]]
    ]:
        """Pre-rpc interceptor for generate_access_token

        Override in a subclass to manipulate the request or metadata
        before they are sent to the IAMCredentials server.

        Args:
            request: The request about to be sent.
            metadata: Key/value pairs to be sent along with the request.

        Returns:
            The ``(request, metadata)`` pair to use for the call; this base
            implementation returns both arguments unchanged.
        """
        return request, metadata

    def post_generate_access_token(
        self, response: common.GenerateAccessTokenResponse
    ) -> common.GenerateAccessTokenResponse:
        """Post-rpc interceptor for generate_access_token

        DEPRECATED. Please use the `post_generate_access_token_with_metadata`
        interceptor instead.

        Override in a subclass to read or manipulate the response
        after it is returned by the IAMCredentials server but before
        it is returned to user code. This `post_generate_access_token` interceptor runs
        before the `post_generate_access_token_with_metadata` interceptor.

        Args:
            response: The parsed response message.

        Returns:
            The response to hand on; this base implementation returns it
            unchanged.
        """
        return response

    def post_generate_access_token_with_metadata(
        self,
        response: common.GenerateAccessTokenResponse,
        metadata: Sequence[Tuple[str, Union[str, bytes]]],
    ) -> Tuple[
        common.GenerateAccessTokenResponse, Sequence[Tuple[str, Union[str, bytes]]]
    ]:
        """Post-rpc interceptor for generate_access_token

        Override in a subclass to read or manipulate the response or metadata after it
        is returned by the IAMCredentials server but before it is returned to user code.

        We recommend only using this `post_generate_access_token_with_metadata`
        interceptor in new development instead of the `post_generate_access_token` interceptor.
        When both interceptors are used, this `post_generate_access_token_with_metadata` interceptor runs after the
        `post_generate_access_token` interceptor. The (possibly modified) response returned by
        `post_generate_access_token` will be passed to
        `post_generate_access_token_with_metadata`.

        Args:
            response: The response, as returned by `post_generate_access_token`.
            metadata: Response metadata as ``(key, value)`` pairs.

        Returns:
            The ``(response, metadata)`` pair; this base implementation
            returns both arguments unchanged.
        """
        return response, metadata

    def pre_generate_id_token(
        self,
        request: common.GenerateIdTokenRequest,
        metadata: Sequence[Tuple[str, Union[str, bytes]]],
    ) -> Tuple[common.GenerateIdTokenRequest, Sequence[Tuple[str, Union[str, bytes]]]]:
        """Pre-rpc interceptor for generate_id_token

        Override in a subclass to manipulate the request or metadata
        before they are sent to the IAMCredentials server.

        Args:
            request: The request about to be sent.
            metadata: Key/value pairs to be sent along with the request.

        Returns:
            The ``(request, metadata)`` pair to use for the call; this base
            implementation returns both arguments unchanged.
        """
        return request, metadata

    def post_generate_id_token(
        self, response: common.GenerateIdTokenResponse
    ) -> common.GenerateIdTokenResponse:
        """Post-rpc interceptor for generate_id_token

        DEPRECATED. Please use the `post_generate_id_token_with_metadata`
        interceptor instead.

        Override in a subclass to read or manipulate the response
        after it is returned by the IAMCredentials server but before
        it is returned to user code. This `post_generate_id_token` interceptor runs
        before the `post_generate_id_token_with_metadata` interceptor.

        Args:
            response: The parsed response message.

        Returns:
            The response to hand on; this base implementation returns it
            unchanged.
        """
        return response

    def post_generate_id_token_with_metadata(
        self,
        response: common.GenerateIdTokenResponse,
        metadata: Sequence[Tuple[str, Union[str, bytes]]],
    ) -> Tuple[common.GenerateIdTokenResponse, Sequence[Tuple[str, Union[str, bytes]]]]:
        """Post-rpc interceptor for generate_id_token

        Override in a subclass to read or manipulate the response or metadata after it
        is returned by the IAMCredentials server but before it is returned to user code.

        We recommend only using this `post_generate_id_token_with_metadata`
        interceptor in new development instead of the `post_generate_id_token` interceptor.
        When both interceptors are used, this `post_generate_id_token_with_metadata` interceptor runs after the
        `post_generate_id_token` interceptor. The (possibly modified) response returned by
        `post_generate_id_token` will be passed to
        `post_generate_id_token_with_metadata`.

        Args:
            response: The response, as returned by `post_generate_id_token`.
            metadata: Response metadata as ``(key, value)`` pairs.

        Returns:
            The ``(response, metadata)`` pair; this base implementation
            returns both arguments unchanged.
        """
        return response, metadata

    def pre_sign_blob(
        self,
        request: common.SignBlobRequest,
        metadata: Sequence[Tuple[str, Union[str, bytes]]],
    ) -> Tuple[common.SignBlobRequest, Sequence[Tuple[str, Union[str, bytes]]]]:
        """Pre-rpc interceptor for sign_blob

        Override in a subclass to manipulate the request or metadata
        before they are sent to the IAMCredentials server.

        Args:
            request: The request about to be sent.
            metadata: Key/value pairs to be sent along with the request.

        Returns:
            The ``(request, metadata)`` pair to use for the call; this base
            implementation returns both arguments unchanged.
        """
        return request, metadata

    def post_sign_blob(
        self, response: common.SignBlobResponse
    ) -> common.SignBlobResponse:
        """Post-rpc interceptor for sign_blob

        DEPRECATED. Please use the `post_sign_blob_with_metadata`
        interceptor instead.

        Override in a subclass to read or manipulate the response
        after it is returned by the IAMCredentials server but before
        it is returned to user code. This `post_sign_blob` interceptor runs
        before the `post_sign_blob_with_metadata` interceptor.

        Args:
            response: The parsed response message.

        Returns:
            The response to hand on; this base implementation returns it
            unchanged.
        """
        return response

    def post_sign_blob_with_metadata(
        self,
        response: common.SignBlobResponse,
        metadata: Sequence[Tuple[str, Union[str, bytes]]],
    ) -> Tuple[common.SignBlobResponse, Sequence[Tuple[str, Union[str, bytes]]]]:
        """Post-rpc interceptor for sign_blob

        Override in a subclass to read or manipulate the response or metadata after it
        is returned by the IAMCredentials server but before it is returned to user code.

        We recommend only using this `post_sign_blob_with_metadata`
        interceptor in new development instead of the `post_sign_blob` interceptor.
        When both interceptors are used, this `post_sign_blob_with_metadata` interceptor runs after the
        `post_sign_blob` interceptor. The (possibly modified) response returned by
        `post_sign_blob` will be passed to
        `post_sign_blob_with_metadata`.

        Args:
            response: The response, as returned by `post_sign_blob`.
            metadata: Response metadata as ``(key, value)`` pairs.

        Returns:
            The ``(response, metadata)`` pair; this base implementation
            returns both arguments unchanged.
        """
        return response, metadata

    def pre_sign_jwt(
        self,
        request: common.SignJwtRequest,
        metadata: Sequence[Tuple[str, Union[str, bytes]]],
    ) -> Tuple[common.SignJwtRequest, Sequence[Tuple[str, Union[str, bytes]]]]:
        """Pre-rpc interceptor for sign_jwt

        Override in a subclass to manipulate the request or metadata
        before they are sent to the IAMCredentials server.

        Args:
            request: The request about to be sent.
            metadata: Key/value pairs to be sent along with the request.

        Returns:
            The ``(request, metadata)`` pair to use for the call; this base
            implementation returns both arguments unchanged.
        """
        return request, metadata

    def post_sign_jwt(self, response: common.SignJwtResponse) -> common.SignJwtResponse:
        """Post-rpc interceptor for sign_jwt

        DEPRECATED. Please use the `post_sign_jwt_with_metadata`
        interceptor instead.

        Override in a subclass to read or manipulate the response
        after it is returned by the IAMCredentials server but before
        it is returned to user code. This `post_sign_jwt` interceptor runs
        before the `post_sign_jwt_with_metadata` interceptor.

        Args:
            response: The parsed response message.

        Returns:
            The response to hand on; this base implementation returns it
            unchanged.
        """
        return response

    def post_sign_jwt_with_metadata(
        self,
        response: common.SignJwtResponse,
        metadata: Sequence[Tuple[str, Union[str, bytes]]],
    ) -> Tuple[common.SignJwtResponse, Sequence[Tuple[str, Union[str, bytes]]]]:
        """Post-rpc interceptor for sign_jwt

        Override in a subclass to read or manipulate the response or metadata after it
        is returned by the IAMCredentials server but before it is returned to user code.

        We recommend only using this `post_sign_jwt_with_metadata`
        interceptor in new development instead of the `post_sign_jwt` interceptor.
        When both interceptors are used, this `post_sign_jwt_with_metadata` interceptor runs after the
        `post_sign_jwt` interceptor. The (possibly modified) response returned by
        `post_sign_jwt` will be passed to
        `post_sign_jwt_with_metadata`.

        Args:
            response: The response, as returned by `post_sign_jwt`.
            metadata: Response metadata as ``(key, value)`` pairs.

        Returns:
            The ``(response, metadata)`` pair; this base implementation
            returns both arguments unchanged.
        """
        return response, metadata
298
299
@dataclasses.dataclass
class IAMCredentialsRestStub:
    """State carried by each per-RPC callable of the REST transport."""

    # Session object whose HTTP-verb methods are used to issue the calls.
    _session: AuthorizedSession
    # Host string that is prepended to every transcoded request URI.
    _host: str
    # Interceptor whose pre/post hooks are run around each RPC.
    _interceptor: IAMCredentialsRestInterceptor
305
306
class IAMCredentialsRestTransport(_BaseIAMCredentialsRestTransport):
    """REST backend synchronous transport for IAMCredentials.

    A service account is a special type of Google account that
    belongs to your application or a virtual machine (VM), instead
    of to an individual end user. Your application assumes the
    identity of the service account to call Google APIs, so that the
    users aren't directly involved.

    Service account credentials are used to temporarily assume the
    identity of the service account. Supported credential types
    include OAuth 2.0 access tokens, OpenID Connect ID tokens,
    self-signed JSON Web Tokens (JWTs), and more.

    This class defines the same methods as the primary client, so the
    primary client can load the underlying transport implementation
    and call it.

    It sends JSON representations of protocol buffers over HTTP/1.1
    """

    def __init__(
        self,
        *,
        host: str = "iamcredentials.googleapis.com",
        credentials: Optional[ga_credentials.Credentials] = None,
        credentials_file: Optional[str] = None,
        scopes: Optional[Sequence[str]] = None,
        client_cert_source_for_mtls: Optional[Callable[[], Tuple[bytes, bytes]]] = None,
        quota_project_id: Optional[str] = None,
        client_info: gapic_v1.client_info.ClientInfo = DEFAULT_CLIENT_INFO,
        always_use_jwt_access: Optional[bool] = False,
        url_scheme: str = "https",
        interceptor: Optional[IAMCredentialsRestInterceptor] = None,
        api_audience: Optional[str] = None,
    ) -> None:
        """Instantiate the transport.

        Args:
            host (Optional[str]):
                 The hostname to connect to (default: 'iamcredentials.googleapis.com').
            credentials (Optional[google.auth.credentials.Credentials]): The
                authorization credentials to attach to requests. These
                credentials identify the application to the service; if none
                are specified, the client will attempt to ascertain the
                credentials from the environment.
            credentials_file (Optional[str]): Deprecated. A file with credentials that can
                be loaded with :func:`google.auth.load_credentials_from_file`.
                This constructor accepts the argument but does not read it.
                This argument will be removed in the next major version of
                this library.
            scopes (Optional(Sequence[str])): A list of scopes. This
                constructor accepts the argument but does not read it.
            client_cert_source_for_mtls (Callable[[], Tuple[bytes, bytes]]): Client
                certificate to configure mutual TLS HTTP channel. When
                provided, it is passed to the session's
                ``configure_mtls_channel``.
            quota_project_id (Optional[str]): An optional project to use for billing
                and quota. This constructor accepts the argument but does
                not read it.
            client_info (google.api_core.gapic_v1.client_info.ClientInfo):
                The client info used to send a user-agent string along with
                API requests. If ``None``, then default info will be used.
                Generally, you only need to set this if you are developing
                your own client library.
            always_use_jwt_access (Optional[bool]): Whether self signed JWT should
                be used for service account credentials.
            url_scheme: the protocol scheme for the API endpoint.  Normally
                "https", but for testing or local servers,
                "http" can be specified.
            interceptor (Optional[IAMCredentialsRestInterceptor]): Hooks run
                before and after each RPC. When omitted (or falsy), a
                default ``IAMCredentialsRestInterceptor`` is used.
            api_audience (Optional[str]): Forwarded unchanged to the base
                transport constructor.
        """
        # Run the base constructor
        # TODO(yon-mg): resolve other ctor params i.e. scopes, quota, etc.
        # TODO: When custom host (api_endpoint) is set, `scopes` must *also* be set on the
        # credentials object
        super().__init__(
            host=host,
            credentials=credentials,
            client_info=client_info,
            always_use_jwt_access=always_use_jwt_access,
            url_scheme=url_scheme,
            api_audience=api_audience,
        )
        self._session = AuthorizedSession(
            self._credentials, default_host=self.DEFAULT_HOST
        )
        if client_cert_source_for_mtls:
            self._session.configure_mtls_channel(client_cert_source_for_mtls)
        self._interceptor = interceptor or IAMCredentialsRestInterceptor()
        self._prep_wrapped_messages(client_info)

    class _GenerateAccessToken(
        _BaseIAMCredentialsRestTransport._BaseGenerateAccessToken,
        IAMCredentialsRestStub,
    ):
        """Callable that performs the GenerateAccessToken RPC over HTTP."""

        def __hash__(self):
            # The stub base is a dataclass, whose generated ``__eq__`` leaves
            # the class unhashable by default; hash on a fixed per-RPC name.
            return hash("IAMCredentialsRestTransport.GenerateAccessToken")

        @staticmethod
        def _get_response(
            host,
            metadata,
            query_params,
            session,
            timeout,
            transcoded_request,
            body=None,
        ):
            """Send the transcoded request through ``session`` and return the raw HTTP response.

            The session method named by ``transcoded_request["method"]`` is
            called with the URL ``host + uri``, the metadata as headers (plus
            a JSON ``Content-Type``), the flattened query parameters and the
            optional ``body``.
            """
            uri = transcoded_request["uri"]
            method = transcoded_request["method"]
            headers = dict(metadata)
            headers["Content-Type"] = "application/json"
            response = getattr(session, method)(
                "{host}{uri}".format(host=host, uri=uri),
                timeout=timeout,
                headers=headers,
                params=rest_helpers.flatten_query_params(query_params, strict=True),
                data=body,
            )
            return response

        def __call__(
            self,
            request: common.GenerateAccessTokenRequest,
            *,
            retry: OptionalRetry = gapic_v1.method.DEFAULT,
            timeout: Optional[float] = None,
            metadata: Sequence[Tuple[str, Union[str, bytes]]] = (),
        ) -> common.GenerateAccessTokenResponse:
            r"""Call the generate access token method over HTTP.

            Args:
                request (~.common.GenerateAccessTokenRequest):
                    The request object.
                retry (google.api_core.retry.Retry): Designation of what errors, if any,
                    should be retried. Not referenced inside this method.
                timeout (float): The timeout for this request.
                metadata (Sequence[Tuple[str, Union[str, bytes]]]): Key/value pairs which should be
                    sent along with the request as metadata. Normally, each value must be of type `str`,
                    but for metadata keys ending with the suffix `-bin`, the corresponding values must
                    be of type `bytes`.

            Returns:
                ~.common.GenerateAccessTokenResponse:
                    The parsed response after both post-rpc interceptors ran.

            Raises:
                google.api_core.exceptions.GoogleAPICallError: If the HTTP
                    status code of the response is 400 or greater.
            """

            http_options = (
                _BaseIAMCredentialsRestTransport._BaseGenerateAccessToken._get_http_options()
            )

            request, metadata = self._interceptor.pre_generate_access_token(
                request, metadata
            )
            transcoded_request = _BaseIAMCredentialsRestTransport._BaseGenerateAccessToken._get_transcoded_request(
                http_options, request
            )

            body = _BaseIAMCredentialsRestTransport._BaseGenerateAccessToken._get_request_body_json(
                transcoded_request
            )

            # Jsonify the query params
            query_params = _BaseIAMCredentialsRestTransport._BaseGenerateAccessToken._get_query_params_json(
                transcoded_request
            )

            if CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(
                logging.DEBUG
            ):  # pragma: NO COVER
                request_url = "{host}{uri}".format(
                    host=self._host, uri=transcoded_request["uri"]
                )
                method = transcoded_request["method"]
                # Logging is best-effort: a payload that cannot be serialized
                # is logged as None instead of failing the call.
                try:
                    request_payload = type(request).to_json(request)
                except Exception:
                    request_payload = None
                http_request = {
                    "payload": request_payload,
                    "requestMethod": method,
                    "requestUrl": request_url,
                    "headers": dict(metadata),
                }
                _LOGGER.debug(
                    "Sending request for google.iam.credentials_v1.IAMCredentialsClient.GenerateAccessToken",
                    extra={
                        "serviceName": "google.iam.credentials.v1.IAMCredentials",
                        "rpcName": "GenerateAccessToken",
                        "httpRequest": http_request,
                        "metadata": http_request["headers"],
                    },
                )

            # Send the request
            response = IAMCredentialsRestTransport._GenerateAccessToken._get_response(
                self._host,
                metadata,
                query_params,
                self._session,
                timeout,
                transcoded_request,
                body,
            )

            # In case of error, raise the appropriate core_exceptions.GoogleAPICallError exception
            # subclass.
            if response.status_code >= 400:
                raise core_exceptions.from_http_response(response)

            # Return the response
            resp = common.GenerateAccessTokenResponse()
            pb_resp = common.GenerateAccessTokenResponse.pb(resp)

            json_format.Parse(response.content, pb_resp, ignore_unknown_fields=True)

            resp = self._interceptor.post_generate_access_token(resp)
            response_metadata = [(k, str(v)) for k, v in response.headers.items()]
            # The metadata returned by the hook is not used any further.
            resp, _ = self._interceptor.post_generate_access_token_with_metadata(
                resp, response_metadata
            )
            if CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(
                logging.DEBUG
            ):  # pragma: NO COVER
                # Serialize the parsed message (``resp``), not the raw HTTP
                # response object, so the logged payload is actually populated.
                try:
                    response_payload = common.GenerateAccessTokenResponse.to_json(
                        resp
                    )
                except Exception:
                    response_payload = None
                http_response = {
                    "payload": response_payload,
                    "headers": dict(response.headers),
                    "status": response.status_code,
                }
                _LOGGER.debug(
                    "Received response for google.iam.credentials_v1.IAMCredentialsClient.generate_access_token",
                    extra={
                        "serviceName": "google.iam.credentials.v1.IAMCredentials",
                        "rpcName": "GenerateAccessToken",
                        "metadata": http_response["headers"],
                        "httpResponse": http_response,
                    },
                )
            return resp

    class _GenerateIdToken(
        _BaseIAMCredentialsRestTransport._BaseGenerateIdToken, IAMCredentialsRestStub
    ):
        """Callable that performs the GenerateIdToken RPC over HTTP."""

        def __hash__(self):
            # The stub base is a dataclass, whose generated ``__eq__`` leaves
            # the class unhashable by default; hash on a fixed per-RPC name.
            return hash("IAMCredentialsRestTransport.GenerateIdToken")

        @staticmethod
        def _get_response(
            host,
            metadata,
            query_params,
            session,
            timeout,
            transcoded_request,
            body=None,
        ):
            """Send the transcoded request through ``session`` and return the raw HTTP response.

            The session method named by ``transcoded_request["method"]`` is
            called with the URL ``host + uri``, the metadata as headers (plus
            a JSON ``Content-Type``), the flattened query parameters and the
            optional ``body``.
            """
            uri = transcoded_request["uri"]
            method = transcoded_request["method"]
            headers = dict(metadata)
            headers["Content-Type"] = "application/json"
            response = getattr(session, method)(
                "{host}{uri}".format(host=host, uri=uri),
                timeout=timeout,
                headers=headers,
                params=rest_helpers.flatten_query_params(query_params, strict=True),
                data=body,
            )
            return response

        def __call__(
            self,
            request: common.GenerateIdTokenRequest,
            *,
            retry: OptionalRetry = gapic_v1.method.DEFAULT,
            timeout: Optional[float] = None,
            metadata: Sequence[Tuple[str, Union[str, bytes]]] = (),
        ) -> common.GenerateIdTokenResponse:
            r"""Call the generate id token method over HTTP.

            Args:
                request (~.common.GenerateIdTokenRequest):
                    The request object.
                retry (google.api_core.retry.Retry): Designation of what errors, if any,
                    should be retried. Not referenced inside this method.
                timeout (float): The timeout for this request.
                metadata (Sequence[Tuple[str, Union[str, bytes]]]): Key/value pairs which should be
                    sent along with the request as metadata. Normally, each value must be of type `str`,
                    but for metadata keys ending with the suffix `-bin`, the corresponding values must
                    be of type `bytes`.

            Returns:
                ~.common.GenerateIdTokenResponse:
                    The parsed response after both post-rpc interceptors ran.

            Raises:
                google.api_core.exceptions.GoogleAPICallError: If the HTTP
                    status code of the response is 400 or greater.
            """

            http_options = (
                _BaseIAMCredentialsRestTransport._BaseGenerateIdToken._get_http_options()
            )

            request, metadata = self._interceptor.pre_generate_id_token(
                request, metadata
            )
            transcoded_request = _BaseIAMCredentialsRestTransport._BaseGenerateIdToken._get_transcoded_request(
                http_options, request
            )

            body = _BaseIAMCredentialsRestTransport._BaseGenerateIdToken._get_request_body_json(
                transcoded_request
            )

            # Jsonify the query params
            query_params = _BaseIAMCredentialsRestTransport._BaseGenerateIdToken._get_query_params_json(
                transcoded_request
            )

            if CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(
                logging.DEBUG
            ):  # pragma: NO COVER
                request_url = "{host}{uri}".format(
                    host=self._host, uri=transcoded_request["uri"]
                )
                method = transcoded_request["method"]
                # Logging is best-effort: a payload that cannot be serialized
                # is logged as None instead of failing the call.
                try:
                    request_payload = type(request).to_json(request)
                except Exception:
                    request_payload = None
                http_request = {
                    "payload": request_payload,
                    "requestMethod": method,
                    "requestUrl": request_url,
                    "headers": dict(metadata),
                }
                _LOGGER.debug(
                    "Sending request for google.iam.credentials_v1.IAMCredentialsClient.GenerateIdToken",
                    extra={
                        "serviceName": "google.iam.credentials.v1.IAMCredentials",
                        "rpcName": "GenerateIdToken",
                        "httpRequest": http_request,
                        "metadata": http_request["headers"],
                    },
                )

            # Send the request
            response = IAMCredentialsRestTransport._GenerateIdToken._get_response(
                self._host,
                metadata,
                query_params,
                self._session,
                timeout,
                transcoded_request,
                body,
            )

            # In case of error, raise the appropriate core_exceptions.GoogleAPICallError exception
            # subclass.
            if response.status_code >= 400:
                raise core_exceptions.from_http_response(response)

            # Return the response
            resp = common.GenerateIdTokenResponse()
            pb_resp = common.GenerateIdTokenResponse.pb(resp)

            json_format.Parse(response.content, pb_resp, ignore_unknown_fields=True)

            resp = self._interceptor.post_generate_id_token(resp)
            response_metadata = [(k, str(v)) for k, v in response.headers.items()]
            # The metadata returned by the hook is not used any further.
            resp, _ = self._interceptor.post_generate_id_token_with_metadata(
                resp, response_metadata
            )
            if CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(
                logging.DEBUG
            ):  # pragma: NO COVER
                # Serialize the parsed message (``resp``), not the raw HTTP
                # response object, so the logged payload is actually populated.
                try:
                    response_payload = common.GenerateIdTokenResponse.to_json(resp)
                except Exception:
                    response_payload = None
                http_response = {
                    "payload": response_payload,
                    "headers": dict(response.headers),
                    "status": response.status_code,
                }
                _LOGGER.debug(
                    "Received response for google.iam.credentials_v1.IAMCredentialsClient.generate_id_token",
                    extra={
                        "serviceName": "google.iam.credentials.v1.IAMCredentials",
                        "rpcName": "GenerateIdToken",
                        "metadata": http_response["headers"],
                        "httpResponse": http_response,
                    },
                )
            return resp

    class _SignBlob(
        _BaseIAMCredentialsRestTransport._BaseSignBlob, IAMCredentialsRestStub
    ):
        """Callable that performs the SignBlob RPC over HTTP."""

        def __hash__(self):
            # The stub base is a dataclass, whose generated ``__eq__`` leaves
            # the class unhashable by default; hash on a fixed per-RPC name.
            return hash("IAMCredentialsRestTransport.SignBlob")

        @staticmethod
        def _get_response(
            host,
            metadata,
            query_params,
            session,
            timeout,
            transcoded_request,
            body=None,
        ):
            """Send the transcoded request through ``session`` and return the raw HTTP response.

            The session method named by ``transcoded_request["method"]`` is
            called with the URL ``host + uri``, the metadata as headers (plus
            a JSON ``Content-Type``), the flattened query parameters and the
            optional ``body``.
            """
            uri = transcoded_request["uri"]
            method = transcoded_request["method"]
            headers = dict(metadata)
            headers["Content-Type"] = "application/json"
            response = getattr(session, method)(
                "{host}{uri}".format(host=host, uri=uri),
                timeout=timeout,
                headers=headers,
                params=rest_helpers.flatten_query_params(query_params, strict=True),
                data=body,
            )
            return response

        def __call__(
            self,
            request: common.SignBlobRequest,
            *,
            retry: OptionalRetry = gapic_v1.method.DEFAULT,
            timeout: Optional[float] = None,
            metadata: Sequence[Tuple[str, Union[str, bytes]]] = (),
        ) -> common.SignBlobResponse:
            r"""Call the sign blob method over HTTP.

            Args:
                request (~.common.SignBlobRequest):
                    The request object.
                retry (google.api_core.retry.Retry): Designation of what errors, if any,
                    should be retried. Not referenced inside this method.
                timeout (float): The timeout for this request.
                metadata (Sequence[Tuple[str, Union[str, bytes]]]): Key/value pairs which should be
                    sent along with the request as metadata. Normally, each value must be of type `str`,
                    but for metadata keys ending with the suffix `-bin`, the corresponding values must
                    be of type `bytes`.

            Returns:
                ~.common.SignBlobResponse:
                    The parsed response after both post-rpc interceptors ran.

            Raises:
                google.api_core.exceptions.GoogleAPICallError: If the HTTP
                    status code of the response is 400 or greater.
            """

            http_options = (
                _BaseIAMCredentialsRestTransport._BaseSignBlob._get_http_options()
            )

            request, metadata = self._interceptor.pre_sign_blob(request, metadata)
            transcoded_request = (
                _BaseIAMCredentialsRestTransport._BaseSignBlob._get_transcoded_request(
                    http_options, request
                )
            )

            body = (
                _BaseIAMCredentialsRestTransport._BaseSignBlob._get_request_body_json(
                    transcoded_request
                )
            )

            # Jsonify the query params
            query_params = (
                _BaseIAMCredentialsRestTransport._BaseSignBlob._get_query_params_json(
                    transcoded_request
                )
            )

            if CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(
                logging.DEBUG
            ):  # pragma: NO COVER
                request_url = "{host}{uri}".format(
                    host=self._host, uri=transcoded_request["uri"]
                )
                method = transcoded_request["method"]
                # Logging is best-effort: a payload that cannot be serialized
                # is logged as None instead of failing the call.
                try:
                    request_payload = type(request).to_json(request)
                except Exception:
                    request_payload = None
                http_request = {
                    "payload": request_payload,
                    "requestMethod": method,
                    "requestUrl": request_url,
                    "headers": dict(metadata),
                }
                _LOGGER.debug(
                    "Sending request for google.iam.credentials_v1.IAMCredentialsClient.SignBlob",
                    extra={
                        "serviceName": "google.iam.credentials.v1.IAMCredentials",
                        "rpcName": "SignBlob",
                        "httpRequest": http_request,
                        "metadata": http_request["headers"],
                    },
                )

            # Send the request
            response = IAMCredentialsRestTransport._SignBlob._get_response(
                self._host,
                metadata,
                query_params,
                self._session,
                timeout,
                transcoded_request,
                body,
            )

            # In case of error, raise the appropriate core_exceptions.GoogleAPICallError exception
            # subclass.
            if response.status_code >= 400:
                raise core_exceptions.from_http_response(response)

            # Return the response
            resp = common.SignBlobResponse()
            pb_resp = common.SignBlobResponse.pb(resp)

            json_format.Parse(response.content, pb_resp, ignore_unknown_fields=True)

            resp = self._interceptor.post_sign_blob(resp)
            response_metadata = [(k, str(v)) for k, v in response.headers.items()]
            # The metadata returned by the hook is not used any further.
            resp, _ = self._interceptor.post_sign_blob_with_metadata(
                resp, response_metadata
            )
            if CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(
                logging.DEBUG
            ):  # pragma: NO COVER
                # Serialize the parsed message (``resp``), not the raw HTTP
                # response object, so the logged payload is actually populated.
                try:
                    response_payload = common.SignBlobResponse.to_json(resp)
                except Exception:
                    response_payload = None
                http_response = {
                    "payload": response_payload,
                    "headers": dict(response.headers),
                    "status": response.status_code,
                }
                _LOGGER.debug(
                    "Received response for google.iam.credentials_v1.IAMCredentialsClient.sign_blob",
                    extra={
                        "serviceName": "google.iam.credentials.v1.IAMCredentials",
                        "rpcName": "SignBlob",
                        "metadata": http_response["headers"],
                        "httpResponse": http_response,
                    },
                )
            return resp

    class _SignJwt(
        _BaseIAMCredentialsRestTransport._BaseSignJwt, IAMCredentialsRestStub
    ):
        """Callable that performs the SignJwt RPC over HTTP."""

        def __hash__(self):
            # The stub base is a dataclass, whose generated ``__eq__`` leaves
            # the class unhashable by default; hash on a fixed per-RPC name.
            return hash("IAMCredentialsRestTransport.SignJwt")

        @staticmethod
        def _get_response(
            host,
            metadata,
            query_params,
            session,
            timeout,
            transcoded_request,
            body=None,
        ):
            """Send the transcoded request through ``session`` and return the raw HTTP response.

            The session method named by ``transcoded_request["method"]`` is
            called with the URL ``host + uri``, the metadata as headers (plus
            a JSON ``Content-Type``), the flattened query parameters and the
            optional ``body``.
            """
            uri = transcoded_request["uri"]
            method = transcoded_request["method"]
            headers = dict(metadata)
            headers["Content-Type"] = "application/json"
            response = getattr(session, method)(
                "{host}{uri}".format(host=host, uri=uri),
                timeout=timeout,
                headers=headers,
                params=rest_helpers.flatten_query_params(query_params, strict=True),
                data=body,
            )
            return response

        def __call__(
            self,
            request: common.SignJwtRequest,
            *,
            retry: OptionalRetry = gapic_v1.method.DEFAULT,
            timeout: Optional[float] = None,
            metadata: Sequence[Tuple[str, Union[str, bytes]]] = (),
        ) -> common.SignJwtResponse:
            r"""Call the sign jwt method over HTTP.

            Args:
                request (~.common.SignJwtRequest):
                    The request object.
                retry (google.api_core.retry.Retry): Designation of what errors, if any,
                    should be retried. Not referenced inside this method.
                timeout (float): The timeout for this request.
                metadata (Sequence[Tuple[str, Union[str, bytes]]]): Key/value pairs which should be
                    sent along with the request as metadata. Normally, each value must be of type `str`,
                    but for metadata keys ending with the suffix `-bin`, the corresponding values must
                    be of type `bytes`.

            Returns:
                ~.common.SignJwtResponse:
                    The parsed response after both post-rpc interceptors ran.

            Raises:
                google.api_core.exceptions.GoogleAPICallError: If the HTTP
                    status code of the response is 400 or greater.
            """

            http_options = (
                _BaseIAMCredentialsRestTransport._BaseSignJwt._get_http_options()
            )

            request, metadata = self._interceptor.pre_sign_jwt(request, metadata)
            transcoded_request = (
                _BaseIAMCredentialsRestTransport._BaseSignJwt._get_transcoded_request(
                    http_options, request
                )
            )

            body = _BaseIAMCredentialsRestTransport._BaseSignJwt._get_request_body_json(
                transcoded_request
            )

            # Jsonify the query params
            query_params = (
                _BaseIAMCredentialsRestTransport._BaseSignJwt._get_query_params_json(
                    transcoded_request
                )
            )

            if CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(
                logging.DEBUG
            ):  # pragma: NO COVER
                request_url = "{host}{uri}".format(
                    host=self._host, uri=transcoded_request["uri"]
                )
                method = transcoded_request["method"]
                # Logging is best-effort: a payload that cannot be serialized
                # is logged as None instead of failing the call.
                try:
                    request_payload = type(request).to_json(request)
                except Exception:
                    request_payload = None
                http_request = {
                    "payload": request_payload,
                    "requestMethod": method,
                    "requestUrl": request_url,
                    "headers": dict(metadata),
                }
                _LOGGER.debug(
                    "Sending request for google.iam.credentials_v1.IAMCredentialsClient.SignJwt",
                    extra={
                        "serviceName": "google.iam.credentials.v1.IAMCredentials",
                        "rpcName": "SignJwt",
                        "httpRequest": http_request,
                        "metadata": http_request["headers"],
                    },
                )

            # Send the request
            response = IAMCredentialsRestTransport._SignJwt._get_response(
                self._host,
                metadata,
                query_params,
                self._session,
                timeout,
                transcoded_request,
                body,
            )

            # In case of error, raise the appropriate core_exceptions.GoogleAPICallError exception
            # subclass.
            if response.status_code >= 400:
                raise core_exceptions.from_http_response(response)

            # Return the response
            resp = common.SignJwtResponse()
            pb_resp = common.SignJwtResponse.pb(resp)

            json_format.Parse(response.content, pb_resp, ignore_unknown_fields=True)

            resp = self._interceptor.post_sign_jwt(resp)
            response_metadata = [(k, str(v)) for k, v in response.headers.items()]
            # The metadata returned by the hook is not used any further.
            resp, _ = self._interceptor.post_sign_jwt_with_metadata(
                resp, response_metadata
            )
            if CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(
                logging.DEBUG
            ):  # pragma: NO COVER
                # Serialize the parsed message (``resp``), not the raw HTTP
                # response object, so the logged payload is actually populated.
                try:
                    response_payload = common.SignJwtResponse.to_json(resp)
                except Exception:
                    response_payload = None
                http_response = {
                    "payload": response_payload,
                    "headers": dict(response.headers),
                    "status": response.status_code,
                }
                _LOGGER.debug(
                    "Received response for google.iam.credentials_v1.IAMCredentialsClient.sign_jwt",
                    extra={
                        "serviceName": "google.iam.credentials.v1.IAMCredentials",
                        "rpcName": "SignJwt",
                        "metadata": http_response["headers"],
                        "httpResponse": http_response,
                    },
                )
            return resp

    @property
    def generate_access_token(
        self,
    ) -> Callable[
        [common.GenerateAccessTokenRequest], common.GenerateAccessTokenResponse
    ]:
        """Return a new callable stub for the GenerateAccessToken RPC."""
        # The return type is fine, but mypy isn't sophisticated enough to determine what's going on here.
        # In C++ this would require a dynamic_cast
        return self._GenerateAccessToken(self._session, self._host, self._interceptor)  # type: ignore

    @property
    def generate_id_token(
        self,
    ) -> Callable[[common.GenerateIdTokenRequest], common.GenerateIdTokenResponse]:
        """Return a new callable stub for the GenerateIdToken RPC."""
        # The return type is fine, but mypy isn't sophisticated enough to determine what's going on here.
        # In C++ this would require a dynamic_cast
        return self._GenerateIdToken(self._session, self._host, self._interceptor)  # type: ignore

    @property
    def sign_blob(self) -> Callable[[common.SignBlobRequest], common.SignBlobResponse]:
        """Return a new callable stub for the SignBlob RPC."""
        # The return type is fine, but mypy isn't sophisticated enough to determine what's going on here.
        # In C++ this would require a dynamic_cast
        return self._SignBlob(self._session, self._host, self._interceptor)  # type: ignore

    @property
    def sign_jwt(self) -> Callable[[common.SignJwtRequest], common.SignJwtResponse]:
        """Return a new callable stub for the SignJwt RPC."""
        # The return type is fine, but mypy isn't sophisticated enough to determine what's going on here.
        # In C++ this would require a dynamic_cast
        return self._SignJwt(self._session, self._host, self._interceptor)  # type: ignore

    @property
    def kind(self) -> str:
        """Return the transport kind identifier, always ``"rest"``."""
        return "rest"

    def close(self):
        """Close the underlying HTTP session."""
        self._session.close()
1049
1050
# Names exported by a star-import of this module.
__all__ = (
    "IAMCredentialsRestTransport",
)