1# -*- coding: utf-8 -*-
2# Copyright 2025 Google LLC
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16import dataclasses
17import json # type: ignore
18import logging
19from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple, Union
20import warnings
21
22from google.api_core import exceptions as core_exceptions
23from google.api_core import gapic_v1, rest_helpers, rest_streaming
24from google.api_core import retry as retries
25from google.auth import credentials as ga_credentials # type: ignore
26from google.auth.transport.requests import AuthorizedSession # type: ignore
27import google.protobuf
28from google.protobuf import json_format
29from requests import __version__ as requests_version
30
31from google.cloud.iam_credentials_v1.types import common
32
33from .base import DEFAULT_CLIENT_INFO as BASE_DEFAULT_CLIENT_INFO
34from .rest_base import _BaseIAMCredentialsRestTransport
35
# Type alias for the ``retry`` argument accepted by the per-RPC callables.
# Falls back to ``object`` when ``gapic_v1.method._MethodDefault`` cannot be
# resolved (AttributeError) -- presumably on older google-api-core releases.
try:
    OptionalRetry = Union[retries.Retry, gapic_v1.method._MethodDefault, None]
except AttributeError:  # pragma: NO COVER
    OptionalRetry = Union[retries.Retry, object, None]  # type: ignore

# Structured client logging is optional: it is only switched on when
# ``google.api_core.client_logging`` can be imported.
try:
    from google.api_core import client_logging  # type: ignore
except ImportError:  # pragma: NO COVER
    CLIENT_LOGGING_SUPPORTED = False
else:
    CLIENT_LOGGING_SUPPORTED = True  # pragma: NO COVER
47
# Module logger; the transport emits DEBUG records for requests and responses.
_LOGGER = logging.getLogger(__name__)

# Client info for the REST transport: same GAPIC version as the base
# transport, no gRPC version, and the installed ``requests`` version reported
# as the REST runtime.
DEFAULT_CLIENT_INFO = gapic_v1.client_info.ClientInfo(
    rest_version=f"requests@{requests_version}",
    grpc_version=None,
    gapic_version=BASE_DEFAULT_CLIENT_INFO.gapic_version,
)

# Record the protobuf runtime version only when this ClientInfo
# implementation exposes the attribute.
if hasattr(DEFAULT_CLIENT_INFO, "protobuf_runtime_version"):  # pragma: NO COVER
    DEFAULT_CLIENT_INFO.protobuf_runtime_version = google.protobuf.__version__
58
59
class IAMCredentialsRestInterceptor:
    """Hook points around every IAMCredentials REST call.

    Each RPC exposes three hooks: ``pre_<rpc>`` (before the request is sent),
    ``post_<rpc>`` (deprecated, response only) and ``post_<rpc>_with_metadata``
    (response plus response metadata). The default implementations return
    their arguments unchanged; subclass and override the ones you need.

    Typical uses:
        * Logging
        * Verifying requests according to service or custom semantics
        * Stripping extraneous information from responses

    Pass an instance of your subclass to ``IAMCredentialsRestTransport`` to
    activate it:

    .. code-block:: python

        class MyCustomIAMCredentialsInterceptor(IAMCredentialsRestInterceptor):
            def pre_generate_access_token(self, request, metadata):
                logging.info(f"Received request: {request}")
                return request, metadata

            def post_generate_access_token(self, response):
                logging.info(f"Received response: {response}")
                return response

            def pre_generate_id_token(self, request, metadata):
                logging.info(f"Received request: {request}")
                return request, metadata

            def post_generate_id_token(self, response):
                logging.info(f"Received response: {response}")
                return response

            def pre_sign_blob(self, request, metadata):
                logging.info(f"Received request: {request}")
                return request, metadata

            def post_sign_blob(self, response):
                logging.info(f"Received response: {response}")
                return response

            def pre_sign_jwt(self, request, metadata):
                logging.info(f"Received request: {request}")
                return request, metadata

            def post_sign_jwt(self, response):
                logging.info(f"Received response: {response}")
                return response

        transport = IAMCredentialsRestTransport(
            interceptor=MyCustomIAMCredentialsInterceptor()
        )
        client = IAMCredentialsClient(transport=transport)
    """

    # ------------------------------------------------------------------
    # generate_access_token
    # ------------------------------------------------------------------

    def pre_generate_access_token(
        self,
        request: common.GenerateAccessTokenRequest,
        metadata: Sequence[Tuple[str, Union[str, bytes]]],
    ) -> Tuple[
        common.GenerateAccessTokenRequest, Sequence[Tuple[str, Union[str, bytes]]]
    ]:
        """Hook invoked before a generate_access_token request is sent.

        Override to inspect or replace the request and/or its metadata.
        The returned pair is what the transport goes on to use.
        """
        return request, metadata

    def post_generate_access_token(
        self, response: common.GenerateAccessTokenResponse
    ) -> common.GenerateAccessTokenResponse:
        """Hook invoked with the parsed generate_access_token response.

        DEPRECATED. Prefer `post_generate_access_token_with_metadata`.

        Override to inspect or replace the response before user code sees
        it. The transport calls this hook first and hands its return value
        to `post_generate_access_token_with_metadata`.
        """
        return response

    def post_generate_access_token_with_metadata(
        self,
        response: common.GenerateAccessTokenResponse,
        metadata: Sequence[Tuple[str, Union[str, bytes]]],
    ) -> Tuple[
        common.GenerateAccessTokenResponse, Sequence[Tuple[str, Union[str, bytes]]]
    ]:
        """Hook invoked with the generate_access_token response and metadata.

        Recommended over `post_generate_access_token` for new code. When
        both hooks are overridden this one runs second and receives the
        (possibly modified) response returned by `post_generate_access_token`.
        Override to inspect or replace the response and/or metadata.
        """
        return response, metadata

    # ------------------------------------------------------------------
    # generate_id_token
    # ------------------------------------------------------------------

    def pre_generate_id_token(
        self,
        request: common.GenerateIdTokenRequest,
        metadata: Sequence[Tuple[str, Union[str, bytes]]],
    ) -> Tuple[common.GenerateIdTokenRequest, Sequence[Tuple[str, Union[str, bytes]]]]:
        """Hook invoked before a generate_id_token request is sent.

        Override to inspect or replace the request and/or its metadata.
        The returned pair is what the transport goes on to use.
        """
        return request, metadata

    def post_generate_id_token(
        self, response: common.GenerateIdTokenResponse
    ) -> common.GenerateIdTokenResponse:
        """Hook invoked with the parsed generate_id_token response.

        DEPRECATED. Prefer `post_generate_id_token_with_metadata`.

        Override to inspect or replace the response before user code sees
        it. The transport calls this hook first and hands its return value
        to `post_generate_id_token_with_metadata`.
        """
        return response

    def post_generate_id_token_with_metadata(
        self,
        response: common.GenerateIdTokenResponse,
        metadata: Sequence[Tuple[str, Union[str, bytes]]],
    ) -> Tuple[common.GenerateIdTokenResponse, Sequence[Tuple[str, Union[str, bytes]]]]:
        """Hook invoked with the generate_id_token response and metadata.

        Recommended over `post_generate_id_token` for new code. When both
        hooks are overridden this one runs second and receives the
        (possibly modified) response returned by `post_generate_id_token`.
        Override to inspect or replace the response and/or metadata.
        """
        return response, metadata

    # ------------------------------------------------------------------
    # sign_blob
    # ------------------------------------------------------------------

    def pre_sign_blob(
        self,
        request: common.SignBlobRequest,
        metadata: Sequence[Tuple[str, Union[str, bytes]]],
    ) -> Tuple[common.SignBlobRequest, Sequence[Tuple[str, Union[str, bytes]]]]:
        """Hook invoked before a sign_blob request is sent.

        Override to inspect or replace the request and/or its metadata.
        The returned pair is what the transport goes on to use.
        """
        return request, metadata

    def post_sign_blob(
        self,
        response: common.SignBlobResponse,
    ) -> common.SignBlobResponse:
        """Hook invoked with the parsed sign_blob response.

        DEPRECATED. Prefer `post_sign_blob_with_metadata`.

        Override to inspect or replace the response before user code sees
        it. The transport calls this hook first and hands its return value
        to `post_sign_blob_with_metadata`.
        """
        return response

    def post_sign_blob_with_metadata(
        self,
        response: common.SignBlobResponse,
        metadata: Sequence[Tuple[str, Union[str, bytes]]],
    ) -> Tuple[common.SignBlobResponse, Sequence[Tuple[str, Union[str, bytes]]]]:
        """Hook invoked with the sign_blob response and metadata.

        Recommended over `post_sign_blob` for new code. When both hooks are
        overridden this one runs second and receives the (possibly
        modified) response returned by `post_sign_blob`.
        Override to inspect or replace the response and/or metadata.
        """
        return response, metadata

    # ------------------------------------------------------------------
    # sign_jwt
    # ------------------------------------------------------------------

    def pre_sign_jwt(
        self,
        request: common.SignJwtRequest,
        metadata: Sequence[Tuple[str, Union[str, bytes]]],
    ) -> Tuple[common.SignJwtRequest, Sequence[Tuple[str, Union[str, bytes]]]]:
        """Hook invoked before a sign_jwt request is sent.

        Override to inspect or replace the request and/or its metadata.
        The returned pair is what the transport goes on to use.
        """
        return request, metadata

    def post_sign_jwt(
        self,
        response: common.SignJwtResponse,
    ) -> common.SignJwtResponse:
        """Hook invoked with the parsed sign_jwt response.

        DEPRECATED. Prefer `post_sign_jwt_with_metadata`.

        Override to inspect or replace the response before user code sees
        it. The transport calls this hook first and hands its return value
        to `post_sign_jwt_with_metadata`.
        """
        return response

    def post_sign_jwt_with_metadata(
        self,
        response: common.SignJwtResponse,
        metadata: Sequence[Tuple[str, Union[str, bytes]]],
    ) -> Tuple[common.SignJwtResponse, Sequence[Tuple[str, Union[str, bytes]]]]:
        """Hook invoked with the sign_jwt response and metadata.

        Recommended over `post_sign_jwt` for new code. When both hooks are
        overridden this one runs second and receives the (possibly
        modified) response returned by `post_sign_jwt`.
        Override to inspect or replace the response and/or metadata.
        """
        return response, metadata
298
299
@dataclasses.dataclass
class IAMCredentialsRestStub:
    """Holds the session, host and interceptor used by a REST RPC callable.

    As a dataclass it can be constructed positionally as
    ``(session, host, interceptor)``.
    """

    # Authorized HTTP session used to issue requests.
    _session: AuthorizedSession
    # Host string that request URIs are appended to.
    _host: str
    # Interceptor whose pre/post hooks wrap each call.
    _interceptor: IAMCredentialsRestInterceptor
305
306
class IAMCredentialsRestTransport(_BaseIAMCredentialsRestTransport):
    """REST backend synchronous transport for IAMCredentials.

    A service account is a special type of Google account that
    belongs to your application or a virtual machine (VM), instead
    of to an individual end user. Your application assumes the
    identity of the service account to call Google APIs, so that the
    users aren't directly involved.

    Service account credentials are used to temporarily assume the
    identity of the service account. Supported credential types
    include OAuth 2.0 access tokens, OpenID Connect ID tokens,
    self-signed JSON Web Tokens (JWTs), and more.

    This class defines the same methods as the primary client, so the
    primary client can load the underlying transport implementation
    and call it.

    It sends JSON representations of protocol buffers over HTTP/1.1.
    """

    def __init__(
        self,
        *,
        host: str = "iamcredentials.googleapis.com",
        credentials: Optional[ga_credentials.Credentials] = None,
        credentials_file: Optional[str] = None,
        scopes: Optional[Sequence[str]] = None,
        client_cert_source_for_mtls: Optional[Callable[[], Tuple[bytes, bytes]]] = None,
        quota_project_id: Optional[str] = None,
        client_info: gapic_v1.client_info.ClientInfo = DEFAULT_CLIENT_INFO,
        always_use_jwt_access: Optional[bool] = False,
        url_scheme: str = "https",
        interceptor: Optional[IAMCredentialsRestInterceptor] = None,
        api_audience: Optional[str] = None,
    ) -> None:
        """Instantiate the transport.

        Args:
            host (Optional[str]):
                The hostname to connect to (default: 'iamcredentials.googleapis.com').
            credentials (Optional[google.auth.credentials.Credentials]): The
                authorization credentials to attach to requests. These
                credentials identify the application to the service; if none
                are specified, the client will attempt to ascertain the
                credentials from the environment.
            credentials_file (Optional[str]): Accepted for interface
                compatibility. This constructor does not forward it to the
                base transport (see the TODO below).
            scopes (Optional[Sequence[str]]): Accepted for interface
                compatibility. This constructor does not forward it to the
                base transport (see the TODO below).
            client_cert_source_for_mtls (Optional[Callable[[], Tuple[bytes, bytes]]]):
                Client certificate callback. When provided, it is passed to
                ``AuthorizedSession.configure_mtls_channel`` to configure a
                mutual TLS HTTP channel.
            quota_project_id (Optional[str]): Accepted for interface
                compatibility. This constructor does not forward it to the
                base transport (see the TODO below).
            client_info (google.api_core.gapic_v1.client_info.ClientInfo):
                The client info used to send a user-agent string along with
                API requests. Defaults to this module's ``DEFAULT_CLIENT_INFO``.
                Generally, you only need to set this if you are developing
                your own client library.
            always_use_jwt_access (Optional[bool]): Whether self signed JWT should
                be used for service account credentials.
            url_scheme (str): The protocol scheme for the API endpoint. Normally
                "https", but for testing or local servers,
                "http" can be specified.
            interceptor (Optional[IAMCredentialsRestInterceptor]): Hooks run
                around every RPC. A default pass-through
                ``IAMCredentialsRestInterceptor`` is used when omitted.
            api_audience (Optional[str]): Forwarded unchanged to the base
                transport constructor.
        """
        # Run the base constructor
        # TODO(yon-mg): resolve other ctor params i.e. scopes, quota, etc.
        # TODO: When custom host (api_endpoint) is set, `scopes` must *also* be set on the
        # credentials object
        super().__init__(
            host=host,
            credentials=credentials,
            client_info=client_info,
            always_use_jwt_access=always_use_jwt_access,
            url_scheme=url_scheme,
            api_audience=api_audience,
        )
        self._session = AuthorizedSession(
            self._credentials, default_host=self.DEFAULT_HOST
        )
        if client_cert_source_for_mtls:
            self._session.configure_mtls_channel(client_cert_source_for_mtls)
        self._interceptor = interceptor or IAMCredentialsRestInterceptor()
        self._prep_wrapped_messages(client_info)

    # ------------------------------------------------------------------
    # Private helpers shared by the per-RPC callables below.
    # ------------------------------------------------------------------

    @staticmethod
    def _send_rest_request(
        host,
        metadata,
        query_params,
        session,
        timeout,
        transcoded_request,
        body=None,
    ):
        """Send one transcoded request on ``session`` and return the raw HTTP response."""
        uri = transcoded_request["uri"]
        method = transcoded_request["method"]
        headers = dict(metadata)
        headers["Content-Type"] = "application/json"
        # ``method`` names the session method to invoke for this request.
        return getattr(session, method)(
            "{host}{uri}".format(host=host, uri=uri),
            timeout=timeout,
            headers=headers,
            params=rest_helpers.flatten_query_params(query_params, strict=True),
            data=body,
        )

    @staticmethod
    def _log_rest_request(rpc_name, host, transcoded_request, request, metadata):
        """Emit the DEBUG record describing an outgoing request for ``rpc_name``."""
        request_url = "{host}{uri}".format(host=host, uri=transcoded_request["uri"])
        try:
            request_payload = type(request).to_json(request)
        except Exception:
            # Logging is best effort: a payload that cannot be serialized
            # must never prevent the RPC from being sent.
            request_payload = None
        http_request = {
            "payload": request_payload,
            "requestMethod": transcoded_request["method"],
            "requestUrl": request_url,
            "headers": dict(metadata),
        }
        _LOGGER.debug(
            f"Sending request for google.iam.credentials_v1.IAMCredentialsClient.{rpc_name}",
            extra={
                "serviceName": "google.iam.credentials.v1.IAMCredentials",
                "rpcName": rpc_name,
                "httpRequest": http_request,
                "metadata": http_request["headers"],
            },
        )

    @staticmethod
    def _log_rest_response(rpc_name, method_name, response_type, resp, response):
        """Emit the DEBUG record describing the response received for ``rpc_name``.

        ``resp`` is the parsed response message and ``response`` is the raw
        HTTP response it was parsed from.
        """
        try:
            # Serialize the parsed message, not the raw HTTP response object.
            # NOTE: this payload can contain issued credentials; it is only
            # logged when DEBUG is enabled for this module's logger.
            response_payload = response_type.to_json(resp)
        except Exception:
            # Best effort, as for the request payload.
            response_payload = None
        http_response = {
            "payload": response_payload,
            "headers": dict(response.headers),
            "status": response.status_code,
        }
        _LOGGER.debug(
            f"Received response for google.iam.credentials_v1.IAMCredentialsClient.{method_name}",
            extra={
                "serviceName": "google.iam.credentials.v1.IAMCredentials",
                "rpcName": rpc_name,
                "metadata": http_response["headers"],
                "httpResponse": http_response,
            },
        )

    # ------------------------------------------------------------------
    # Per-RPC callables.
    # ------------------------------------------------------------------

    class _GenerateAccessToken(
        _BaseIAMCredentialsRestTransport._BaseGenerateAccessToken,
        IAMCredentialsRestStub,
    ):
        # The dataclass base defines __eq__, which would otherwise leave
        # instances unhashable; hash on a fixed per-RPC string instead.
        def __hash__(self):
            return hash("IAMCredentialsRestTransport.GenerateAccessToken")

        @staticmethod
        def _get_response(
            host,
            metadata,
            query_params,
            session,
            timeout,
            transcoded_request,
            body=None,
        ):
            """Send the transcoded request and return the raw HTTP response."""
            return IAMCredentialsRestTransport._send_rest_request(
                host,
                metadata,
                query_params,
                session,
                timeout,
                transcoded_request,
                body,
            )

        def __call__(
            self,
            request: common.GenerateAccessTokenRequest,
            *,
            retry: OptionalRetry = gapic_v1.method.DEFAULT,
            timeout: Optional[float] = None,
            metadata: Sequence[Tuple[str, Union[str, bytes]]] = (),
        ) -> common.GenerateAccessTokenResponse:
            r"""Call the generate access token method over HTTP.

            Args:
                request (~.common.GenerateAccessTokenRequest):
                    The request object.
                retry (google.api_core.retry.Retry): Designation of what errors, if any,
                    should be retried. Not consulted inside this method.
                timeout (float): The timeout for this request.
                metadata (Sequence[Tuple[str, Union[str, bytes]]]): Key/value pairs which should be
                    sent along with the request as metadata. Normally, each value must be of type `str`,
                    but for metadata keys ending with the suffix `-bin`, the corresponding values must
                    be of type `bytes`.

            Returns:
                ~.common.GenerateAccessTokenResponse:
                    The parsed response, after both post-RPC interceptors
                    have run.

            Raises:
                google.api_core.exceptions.GoogleAPICallError: A subclass
                    built by ``from_http_response`` when the HTTP status
                    code is 400 or greater.
            """
            base = _BaseIAMCredentialsRestTransport._BaseGenerateAccessToken
            http_options = base._get_http_options()

            request, metadata = self._interceptor.pre_generate_access_token(
                request, metadata
            )
            transcoded_request = base._get_transcoded_request(http_options, request)
            body = base._get_request_body_json(transcoded_request)

            # Jsonify the query params
            query_params = base._get_query_params_json(transcoded_request)

            if CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(
                logging.DEBUG
            ):  # pragma: NO COVER
                IAMCredentialsRestTransport._log_rest_request(
                    "GenerateAccessToken",
                    self._host,
                    transcoded_request,
                    request,
                    metadata,
                )

            # Send the request
            response = IAMCredentialsRestTransport._GenerateAccessToken._get_response(
                self._host,
                metadata,
                query_params,
                self._session,
                timeout,
                transcoded_request,
                body,
            )

            # In case of error, raise the appropriate core_exceptions.GoogleAPICallError exception
            # subclass.
            if response.status_code >= 400:
                raise core_exceptions.from_http_response(response)

            # Parse the JSON body into the response message.
            resp = common.GenerateAccessTokenResponse()
            pb_resp = common.GenerateAccessTokenResponse.pb(resp)
            json_format.Parse(response.content, pb_resp, ignore_unknown_fields=True)

            # Deprecated hook first, then the metadata-aware hook; the
            # metadata returned by the second hook is discarded.
            resp = self._interceptor.post_generate_access_token(resp)
            response_metadata = [(k, str(v)) for k, v in response.headers.items()]
            resp, _ = self._interceptor.post_generate_access_token_with_metadata(
                resp, response_metadata
            )
            if CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(
                logging.DEBUG
            ):  # pragma: NO COVER
                IAMCredentialsRestTransport._log_rest_response(
                    "GenerateAccessToken",
                    "generate_access_token",
                    common.GenerateAccessTokenResponse,
                    resp,
                    response,
                )
            return resp

    class _GenerateIdToken(
        _BaseIAMCredentialsRestTransport._BaseGenerateIdToken, IAMCredentialsRestStub
    ):
        # The dataclass base defines __eq__, which would otherwise leave
        # instances unhashable; hash on a fixed per-RPC string instead.
        def __hash__(self):
            return hash("IAMCredentialsRestTransport.GenerateIdToken")

        @staticmethod
        def _get_response(
            host,
            metadata,
            query_params,
            session,
            timeout,
            transcoded_request,
            body=None,
        ):
            """Send the transcoded request and return the raw HTTP response."""
            return IAMCredentialsRestTransport._send_rest_request(
                host,
                metadata,
                query_params,
                session,
                timeout,
                transcoded_request,
                body,
            )

        def __call__(
            self,
            request: common.GenerateIdTokenRequest,
            *,
            retry: OptionalRetry = gapic_v1.method.DEFAULT,
            timeout: Optional[float] = None,
            metadata: Sequence[Tuple[str, Union[str, bytes]]] = (),
        ) -> common.GenerateIdTokenResponse:
            r"""Call the generate id token method over HTTP.

            Args:
                request (~.common.GenerateIdTokenRequest):
                    The request object.
                retry (google.api_core.retry.Retry): Designation of what errors, if any,
                    should be retried. Not consulted inside this method.
                timeout (float): The timeout for this request.
                metadata (Sequence[Tuple[str, Union[str, bytes]]]): Key/value pairs which should be
                    sent along with the request as metadata. Normally, each value must be of type `str`,
                    but for metadata keys ending with the suffix `-bin`, the corresponding values must
                    be of type `bytes`.

            Returns:
                ~.common.GenerateIdTokenResponse:
                    The parsed response, after both post-RPC interceptors
                    have run.

            Raises:
                google.api_core.exceptions.GoogleAPICallError: A subclass
                    built by ``from_http_response`` when the HTTP status
                    code is 400 or greater.
            """
            base = _BaseIAMCredentialsRestTransport._BaseGenerateIdToken
            http_options = base._get_http_options()

            request, metadata = self._interceptor.pre_generate_id_token(
                request, metadata
            )
            transcoded_request = base._get_transcoded_request(http_options, request)
            body = base._get_request_body_json(transcoded_request)

            # Jsonify the query params
            query_params = base._get_query_params_json(transcoded_request)

            if CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(
                logging.DEBUG
            ):  # pragma: NO COVER
                IAMCredentialsRestTransport._log_rest_request(
                    "GenerateIdToken",
                    self._host,
                    transcoded_request,
                    request,
                    metadata,
                )

            # Send the request
            response = IAMCredentialsRestTransport._GenerateIdToken._get_response(
                self._host,
                metadata,
                query_params,
                self._session,
                timeout,
                transcoded_request,
                body,
            )

            # In case of error, raise the appropriate core_exceptions.GoogleAPICallError exception
            # subclass.
            if response.status_code >= 400:
                raise core_exceptions.from_http_response(response)

            # Parse the JSON body into the response message.
            resp = common.GenerateIdTokenResponse()
            pb_resp = common.GenerateIdTokenResponse.pb(resp)
            json_format.Parse(response.content, pb_resp, ignore_unknown_fields=True)

            # Deprecated hook first, then the metadata-aware hook; the
            # metadata returned by the second hook is discarded.
            resp = self._interceptor.post_generate_id_token(resp)
            response_metadata = [(k, str(v)) for k, v in response.headers.items()]
            resp, _ = self._interceptor.post_generate_id_token_with_metadata(
                resp, response_metadata
            )
            if CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(
                logging.DEBUG
            ):  # pragma: NO COVER
                IAMCredentialsRestTransport._log_rest_response(
                    "GenerateIdToken",
                    "generate_id_token",
                    common.GenerateIdTokenResponse,
                    resp,
                    response,
                )
            return resp

    class _SignBlob(
        _BaseIAMCredentialsRestTransport._BaseSignBlob, IAMCredentialsRestStub
    ):
        # The dataclass base defines __eq__, which would otherwise leave
        # instances unhashable; hash on a fixed per-RPC string instead.
        def __hash__(self):
            return hash("IAMCredentialsRestTransport.SignBlob")

        @staticmethod
        def _get_response(
            host,
            metadata,
            query_params,
            session,
            timeout,
            transcoded_request,
            body=None,
        ):
            """Send the transcoded request and return the raw HTTP response."""
            return IAMCredentialsRestTransport._send_rest_request(
                host,
                metadata,
                query_params,
                session,
                timeout,
                transcoded_request,
                body,
            )

        def __call__(
            self,
            request: common.SignBlobRequest,
            *,
            retry: OptionalRetry = gapic_v1.method.DEFAULT,
            timeout: Optional[float] = None,
            metadata: Sequence[Tuple[str, Union[str, bytes]]] = (),
        ) -> common.SignBlobResponse:
            r"""Call the sign blob method over HTTP.

            Args:
                request (~.common.SignBlobRequest):
                    The request object.
                retry (google.api_core.retry.Retry): Designation of what errors, if any,
                    should be retried. Not consulted inside this method.
                timeout (float): The timeout for this request.
                metadata (Sequence[Tuple[str, Union[str, bytes]]]): Key/value pairs which should be
                    sent along with the request as metadata. Normally, each value must be of type `str`,
                    but for metadata keys ending with the suffix `-bin`, the corresponding values must
                    be of type `bytes`.

            Returns:
                ~.common.SignBlobResponse:
                    The parsed response, after both post-RPC interceptors
                    have run.

            Raises:
                google.api_core.exceptions.GoogleAPICallError: A subclass
                    built by ``from_http_response`` when the HTTP status
                    code is 400 or greater.
            """
            base = _BaseIAMCredentialsRestTransport._BaseSignBlob
            http_options = base._get_http_options()

            request, metadata = self._interceptor.pre_sign_blob(request, metadata)
            transcoded_request = base._get_transcoded_request(http_options, request)
            body = base._get_request_body_json(transcoded_request)

            # Jsonify the query params
            query_params = base._get_query_params_json(transcoded_request)

            if CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(
                logging.DEBUG
            ):  # pragma: NO COVER
                IAMCredentialsRestTransport._log_rest_request(
                    "SignBlob",
                    self._host,
                    transcoded_request,
                    request,
                    metadata,
                )

            # Send the request
            response = IAMCredentialsRestTransport._SignBlob._get_response(
                self._host,
                metadata,
                query_params,
                self._session,
                timeout,
                transcoded_request,
                body,
            )

            # In case of error, raise the appropriate core_exceptions.GoogleAPICallError exception
            # subclass.
            if response.status_code >= 400:
                raise core_exceptions.from_http_response(response)

            # Parse the JSON body into the response message.
            resp = common.SignBlobResponse()
            pb_resp = common.SignBlobResponse.pb(resp)
            json_format.Parse(response.content, pb_resp, ignore_unknown_fields=True)

            # Deprecated hook first, then the metadata-aware hook; the
            # metadata returned by the second hook is discarded.
            resp = self._interceptor.post_sign_blob(resp)
            response_metadata = [(k, str(v)) for k, v in response.headers.items()]
            resp, _ = self._interceptor.post_sign_blob_with_metadata(
                resp, response_metadata
            )
            if CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(
                logging.DEBUG
            ):  # pragma: NO COVER
                IAMCredentialsRestTransport._log_rest_response(
                    "SignBlob",
                    "sign_blob",
                    common.SignBlobResponse,
                    resp,
                    response,
                )
            return resp

    class _SignJwt(
        _BaseIAMCredentialsRestTransport._BaseSignJwt, IAMCredentialsRestStub
    ):
        # The dataclass base defines __eq__, which would otherwise leave
        # instances unhashable; hash on a fixed per-RPC string instead.
        def __hash__(self):
            return hash("IAMCredentialsRestTransport.SignJwt")

        @staticmethod
        def _get_response(
            host,
            metadata,
            query_params,
            session,
            timeout,
            transcoded_request,
            body=None,
        ):
            """Send the transcoded request and return the raw HTTP response."""
            return IAMCredentialsRestTransport._send_rest_request(
                host,
                metadata,
                query_params,
                session,
                timeout,
                transcoded_request,
                body,
            )

        def __call__(
            self,
            request: common.SignJwtRequest,
            *,
            retry: OptionalRetry = gapic_v1.method.DEFAULT,
            timeout: Optional[float] = None,
            metadata: Sequence[Tuple[str, Union[str, bytes]]] = (),
        ) -> common.SignJwtResponse:
            r"""Call the sign jwt method over HTTP.

            Args:
                request (~.common.SignJwtRequest):
                    The request object.
                retry (google.api_core.retry.Retry): Designation of what errors, if any,
                    should be retried. Not consulted inside this method.
                timeout (float): The timeout for this request.
                metadata (Sequence[Tuple[str, Union[str, bytes]]]): Key/value pairs which should be
                    sent along with the request as metadata. Normally, each value must be of type `str`,
                    but for metadata keys ending with the suffix `-bin`, the corresponding values must
                    be of type `bytes`.

            Returns:
                ~.common.SignJwtResponse:
                    The parsed response, after both post-RPC interceptors
                    have run.

            Raises:
                google.api_core.exceptions.GoogleAPICallError: A subclass
                    built by ``from_http_response`` when the HTTP status
                    code is 400 or greater.
            """
            base = _BaseIAMCredentialsRestTransport._BaseSignJwt
            http_options = base._get_http_options()

            request, metadata = self._interceptor.pre_sign_jwt(request, metadata)
            transcoded_request = base._get_transcoded_request(http_options, request)
            body = base._get_request_body_json(transcoded_request)

            # Jsonify the query params
            query_params = base._get_query_params_json(transcoded_request)

            if CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(
                logging.DEBUG
            ):  # pragma: NO COVER
                IAMCredentialsRestTransport._log_rest_request(
                    "SignJwt",
                    self._host,
                    transcoded_request,
                    request,
                    metadata,
                )

            # Send the request
            response = IAMCredentialsRestTransport._SignJwt._get_response(
                self._host,
                metadata,
                query_params,
                self._session,
                timeout,
                transcoded_request,
                body,
            )

            # In case of error, raise the appropriate core_exceptions.GoogleAPICallError exception
            # subclass.
            if response.status_code >= 400:
                raise core_exceptions.from_http_response(response)

            # Parse the JSON body into the response message.
            resp = common.SignJwtResponse()
            pb_resp = common.SignJwtResponse.pb(resp)
            json_format.Parse(response.content, pb_resp, ignore_unknown_fields=True)

            # Deprecated hook first, then the metadata-aware hook; the
            # metadata returned by the second hook is discarded.
            resp = self._interceptor.post_sign_jwt(resp)
            response_metadata = [(k, str(v)) for k, v in response.headers.items()]
            resp, _ = self._interceptor.post_sign_jwt_with_metadata(
                resp, response_metadata
            )
            if CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(
                logging.DEBUG
            ):  # pragma: NO COVER
                IAMCredentialsRestTransport._log_rest_response(
                    "SignJwt",
                    "sign_jwt",
                    common.SignJwtResponse,
                    resp,
                    response,
                )
            return resp

    # ------------------------------------------------------------------
    # Public RPC accessors: each access builds a fresh callable bound to
    # this transport's session, host and interceptor.
    # ------------------------------------------------------------------

    @property
    def generate_access_token(
        self,
    ) -> Callable[
        [common.GenerateAccessTokenRequest], common.GenerateAccessTokenResponse
    ]:
        """Callable that performs the GenerateAccessToken RPC."""
        # The return type is fine, but mypy isn't sophisticated enough to determine what's going on here.
        # In C++ this would require a dynamic_cast
        return self._GenerateAccessToken(self._session, self._host, self._interceptor)  # type: ignore

    @property
    def generate_id_token(
        self,
    ) -> Callable[[common.GenerateIdTokenRequest], common.GenerateIdTokenResponse]:
        """Callable that performs the GenerateIdToken RPC."""
        # The return type is fine, but mypy isn't sophisticated enough to determine what's going on here.
        # In C++ this would require a dynamic_cast
        return self._GenerateIdToken(self._session, self._host, self._interceptor)  # type: ignore

    @property
    def sign_blob(self) -> Callable[[common.SignBlobRequest], common.SignBlobResponse]:
        """Callable that performs the SignBlob RPC."""
        # The return type is fine, but mypy isn't sophisticated enough to determine what's going on here.
        # In C++ this would require a dynamic_cast
        return self._SignBlob(self._session, self._host, self._interceptor)  # type: ignore

    @property
    def sign_jwt(self) -> Callable[[common.SignJwtRequest], common.SignJwtResponse]:
        """Callable that performs the SignJwt RPC."""
        # The return type is fine, but mypy isn't sophisticated enough to determine what's going on here.
        # In C++ this would require a dynamic_cast
        return self._SignJwt(self._session, self._host, self._interceptor)  # type: ignore

    @property
    def kind(self) -> str:
        """Identifier of this transport flavour: always ``"rest"``."""
        return "rest"

    def close(self):
        """Close the underlying authorized HTTP session."""
        self._session.close()
1048
1049
# Only the transport class is listed in ``__all__``; the interceptor and stub
# classes are not exported by a star-import of this module.
__all__ = ("IAMCredentialsRestTransport",)