1# -*- coding: utf-8 -*-
2# Copyright 2025 Google LLC
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16from collections import OrderedDict
17from http import HTTPStatus
18import json
19import logging as std_logging
20import os
21import re
22from typing import (
23 Callable,
24 Dict,
25 Mapping,
26 MutableMapping,
27 MutableSequence,
28 Optional,
29 Sequence,
30 Tuple,
31 Type,
32 Union,
33 cast,
34)
35import warnings
36
37from google.api_core import client_options as client_options_lib
38from google.api_core import exceptions as core_exceptions
39from google.api_core import gapic_v1
40from google.api_core import retry as retries
41from google.auth import credentials as ga_credentials # type: ignore
42from google.auth.exceptions import MutualTLSChannelError # type: ignore
43from google.auth.transport import mtls # type: ignore
44from google.auth.transport.grpc import SslCredentials # type: ignore
45from google.oauth2 import service_account # type: ignore
46import google.protobuf
47
48from google.cloud.iam_credentials_v1 import gapic_version as package_version
49
50try:
51 OptionalRetry = Union[retries.Retry, gapic_v1.method._MethodDefault, None]
52except AttributeError: # pragma: NO COVER
53 OptionalRetry = Union[retries.Retry, object, None] # type: ignore
54
55try:
56 from google.api_core import client_logging # type: ignore
57
58 CLIENT_LOGGING_SUPPORTED = True # pragma: NO COVER
59except ImportError: # pragma: NO COVER
60 CLIENT_LOGGING_SUPPORTED = False
61
62_LOGGER = std_logging.getLogger(__name__)
63
64from google.protobuf import duration_pb2 # type: ignore
65from google.protobuf import timestamp_pb2 # type: ignore
66
67from google.cloud.iam_credentials_v1.types import common
68
69from .transports.base import DEFAULT_CLIENT_INFO, IAMCredentialsTransport
70from .transports.grpc import IAMCredentialsGrpcTransport
71from .transports.grpc_asyncio import IAMCredentialsGrpcAsyncIOTransport
72from .transports.rest import IAMCredentialsRestTransport
73
74
75class IAMCredentialsClientMeta(type):
76 """Metaclass for the IAMCredentials client.
77
78 This provides class-level methods for building and retrieving
79 support objects (e.g. transport) without polluting the client instance
80 objects.
81 """
82
83 _transport_registry = (
84 OrderedDict()
85 ) # type: Dict[str, Type[IAMCredentialsTransport]]
86 _transport_registry["grpc"] = IAMCredentialsGrpcTransport
87 _transport_registry["grpc_asyncio"] = IAMCredentialsGrpcAsyncIOTransport
88 _transport_registry["rest"] = IAMCredentialsRestTransport
89
90 def get_transport_class(
91 cls,
92 label: Optional[str] = None,
93 ) -> Type[IAMCredentialsTransport]:
94 """Returns an appropriate transport class.
95
96 Args:
97 label: The name of the desired transport. If none is
98 provided, then the first transport in the registry is used.
99
100 Returns:
101 The transport class to use.
102 """
103 # If a specific transport is requested, return that one.
104 if label:
105 return cls._transport_registry[label]
106
107 # No transport is requested; return the default (that is, the first one
108 # in the dictionary).
109 return next(iter(cls._transport_registry.values()))
110
111
112class IAMCredentialsClient(metaclass=IAMCredentialsClientMeta):
113 """A service account is a special type of Google account that
114 belongs to your application or a virtual machine (VM), instead
115 of to an individual end user. Your application assumes the
116 identity of the service account to call Google APIs, so that the
117 users aren't directly involved.
118
119 Service account credentials are used to temporarily assume the
120 identity of the service account. Supported credential types
121 include OAuth 2.0 access tokens, OpenID Connect ID tokens,
122 self-signed JSON Web Tokens (JWTs), and more.
123 """
124
125 @staticmethod
126 def _get_default_mtls_endpoint(api_endpoint):
127 """Converts api endpoint to mTLS endpoint.
128
129 Convert "*.sandbox.googleapis.com" and "*.googleapis.com" to
130 "*.mtls.sandbox.googleapis.com" and "*.mtls.googleapis.com" respectively.
131 Args:
132 api_endpoint (Optional[str]): the api endpoint to convert.
133 Returns:
134 str: converted mTLS api endpoint.
135 """
136 if not api_endpoint:
137 return api_endpoint
138
139 mtls_endpoint_re = re.compile(
140 r"(?P<name>[^.]+)(?P<mtls>\.mtls)?(?P<sandbox>\.sandbox)?(?P<googledomain>\.googleapis\.com)?"
141 )
142
143 m = mtls_endpoint_re.match(api_endpoint)
144 name, mtls, sandbox, googledomain = m.groups()
145 if mtls or not googledomain:
146 return api_endpoint
147
148 if sandbox:
149 return api_endpoint.replace(
150 "sandbox.googleapis.com", "mtls.sandbox.googleapis.com"
151 )
152
153 return api_endpoint.replace(".googleapis.com", ".mtls.googleapis.com")
154
155 # Note: DEFAULT_ENDPOINT is deprecated. Use _DEFAULT_ENDPOINT_TEMPLATE instead.
156 DEFAULT_ENDPOINT = "iamcredentials.googleapis.com"
157 DEFAULT_MTLS_ENDPOINT = _get_default_mtls_endpoint.__func__( # type: ignore
158 DEFAULT_ENDPOINT
159 )
160
161 _DEFAULT_ENDPOINT_TEMPLATE = "iamcredentials.{UNIVERSE_DOMAIN}"
162 _DEFAULT_UNIVERSE = "googleapis.com"
163
164 @classmethod
165 def from_service_account_info(cls, info: dict, *args, **kwargs):
166 """Creates an instance of this client using the provided credentials
167 info.
168
169 Args:
170 info (dict): The service account private key info.
171 args: Additional arguments to pass to the constructor.
172 kwargs: Additional arguments to pass to the constructor.
173
174 Returns:
175 IAMCredentialsClient: The constructed client.
176 """
177 credentials = service_account.Credentials.from_service_account_info(info)
178 kwargs["credentials"] = credentials
179 return cls(*args, **kwargs)
180
181 @classmethod
182 def from_service_account_file(cls, filename: str, *args, **kwargs):
183 """Creates an instance of this client using the provided credentials
184 file.
185
186 Args:
187 filename (str): The path to the service account private key json
188 file.
189 args: Additional arguments to pass to the constructor.
190 kwargs: Additional arguments to pass to the constructor.
191
192 Returns:
193 IAMCredentialsClient: The constructed client.
194 """
195 credentials = service_account.Credentials.from_service_account_file(filename)
196 kwargs["credentials"] = credentials
197 return cls(*args, **kwargs)
198
199 from_service_account_json = from_service_account_file
200
201 @property
202 def transport(self) -> IAMCredentialsTransport:
203 """Returns the transport used by the client instance.
204
205 Returns:
206 IAMCredentialsTransport: The transport used by the client
207 instance.
208 """
209 return self._transport
210
211 @staticmethod
212 def service_account_path(
213 project: str,
214 service_account: str,
215 ) -> str:
216 """Returns a fully-qualified service_account string."""
217 return "projects/{project}/serviceAccounts/{service_account}".format(
218 project=project,
219 service_account=service_account,
220 )
221
222 @staticmethod
223 def parse_service_account_path(path: str) -> Dict[str, str]:
224 """Parses a service_account path into its component segments."""
225 m = re.match(
226 r"^projects/(?P<project>.+?)/serviceAccounts/(?P<service_account>.+?)$",
227 path,
228 )
229 return m.groupdict() if m else {}
230
231 @staticmethod
232 def common_billing_account_path(
233 billing_account: str,
234 ) -> str:
235 """Returns a fully-qualified billing_account string."""
236 return "billingAccounts/{billing_account}".format(
237 billing_account=billing_account,
238 )
239
240 @staticmethod
241 def parse_common_billing_account_path(path: str) -> Dict[str, str]:
242 """Parse a billing_account path into its component segments."""
243 m = re.match(r"^billingAccounts/(?P<billing_account>.+?)$", path)
244 return m.groupdict() if m else {}
245
246 @staticmethod
247 def common_folder_path(
248 folder: str,
249 ) -> str:
250 """Returns a fully-qualified folder string."""
251 return "folders/{folder}".format(
252 folder=folder,
253 )
254
255 @staticmethod
256 def parse_common_folder_path(path: str) -> Dict[str, str]:
257 """Parse a folder path into its component segments."""
258 m = re.match(r"^folders/(?P<folder>.+?)$", path)
259 return m.groupdict() if m else {}
260
261 @staticmethod
262 def common_organization_path(
263 organization: str,
264 ) -> str:
265 """Returns a fully-qualified organization string."""
266 return "organizations/{organization}".format(
267 organization=organization,
268 )
269
270 @staticmethod
271 def parse_common_organization_path(path: str) -> Dict[str, str]:
272 """Parse a organization path into its component segments."""
273 m = re.match(r"^organizations/(?P<organization>.+?)$", path)
274 return m.groupdict() if m else {}
275
276 @staticmethod
277 def common_project_path(
278 project: str,
279 ) -> str:
280 """Returns a fully-qualified project string."""
281 return "projects/{project}".format(
282 project=project,
283 )
284
285 @staticmethod
286 def parse_common_project_path(path: str) -> Dict[str, str]:
287 """Parse a project path into its component segments."""
288 m = re.match(r"^projects/(?P<project>.+?)$", path)
289 return m.groupdict() if m else {}
290
291 @staticmethod
292 def common_location_path(
293 project: str,
294 location: str,
295 ) -> str:
296 """Returns a fully-qualified location string."""
297 return "projects/{project}/locations/{location}".format(
298 project=project,
299 location=location,
300 )
301
302 @staticmethod
303 def parse_common_location_path(path: str) -> Dict[str, str]:
304 """Parse a location path into its component segments."""
305 m = re.match(r"^projects/(?P<project>.+?)/locations/(?P<location>.+?)$", path)
306 return m.groupdict() if m else {}
307
308 @classmethod
309 def get_mtls_endpoint_and_cert_source(
310 cls, client_options: Optional[client_options_lib.ClientOptions] = None
311 ):
312 """Deprecated. Return the API endpoint and client cert source for mutual TLS.
313
314 The client cert source is determined in the following order:
315 (1) if `GOOGLE_API_USE_CLIENT_CERTIFICATE` environment variable is not "true", the
316 client cert source is None.
317 (2) if `client_options.client_cert_source` is provided, use the provided one; if the
318 default client cert source exists, use the default one; otherwise the client cert
319 source is None.
320
321 The API endpoint is determined in the following order:
322 (1) if `client_options.api_endpoint` if provided, use the provided one.
323 (2) if `GOOGLE_API_USE_CLIENT_CERTIFICATE` environment variable is "always", use the
324 default mTLS endpoint; if the environment variable is "never", use the default API
325 endpoint; otherwise if client cert source exists, use the default mTLS endpoint, otherwise
326 use the default API endpoint.
327
328 More details can be found at https://google.aip.dev/auth/4114.
329
330 Args:
331 client_options (google.api_core.client_options.ClientOptions): Custom options for the
332 client. Only the `api_endpoint` and `client_cert_source` properties may be used
333 in this method.
334
335 Returns:
336 Tuple[str, Callable[[], Tuple[bytes, bytes]]]: returns the API endpoint and the
337 client cert source to use.
338
339 Raises:
340 google.auth.exceptions.MutualTLSChannelError: If any errors happen.
341 """
342
343 warnings.warn(
344 "get_mtls_endpoint_and_cert_source is deprecated. Use the api_endpoint property instead.",
345 DeprecationWarning,
346 )
347 if client_options is None:
348 client_options = client_options_lib.ClientOptions()
349 use_client_cert = os.getenv("GOOGLE_API_USE_CLIENT_CERTIFICATE", "false")
350 use_mtls_endpoint = os.getenv("GOOGLE_API_USE_MTLS_ENDPOINT", "auto")
351 if use_client_cert not in ("true", "false"):
352 raise ValueError(
353 "Environment variable `GOOGLE_API_USE_CLIENT_CERTIFICATE` must be either `true` or `false`"
354 )
355 if use_mtls_endpoint not in ("auto", "never", "always"):
356 raise MutualTLSChannelError(
357 "Environment variable `GOOGLE_API_USE_MTLS_ENDPOINT` must be `never`, `auto` or `always`"
358 )
359
360 # Figure out the client cert source to use.
361 client_cert_source = None
362 if use_client_cert == "true":
363 if client_options.client_cert_source:
364 client_cert_source = client_options.client_cert_source
365 elif mtls.has_default_client_cert_source():
366 client_cert_source = mtls.default_client_cert_source()
367
368 # Figure out which api endpoint to use.
369 if client_options.api_endpoint is not None:
370 api_endpoint = client_options.api_endpoint
371 elif use_mtls_endpoint == "always" or (
372 use_mtls_endpoint == "auto" and client_cert_source
373 ):
374 api_endpoint = cls.DEFAULT_MTLS_ENDPOINT
375 else:
376 api_endpoint = cls.DEFAULT_ENDPOINT
377
378 return api_endpoint, client_cert_source
379
380 @staticmethod
381 def _read_environment_variables():
382 """Returns the environment variables used by the client.
383
384 Returns:
385 Tuple[bool, str, str]: returns the GOOGLE_API_USE_CLIENT_CERTIFICATE,
386 GOOGLE_API_USE_MTLS_ENDPOINT, and GOOGLE_CLOUD_UNIVERSE_DOMAIN environment variables.
387
388 Raises:
389 ValueError: If GOOGLE_API_USE_CLIENT_CERTIFICATE is not
390 any of ["true", "false"].
391 google.auth.exceptions.MutualTLSChannelError: If GOOGLE_API_USE_MTLS_ENDPOINT
392 is not any of ["auto", "never", "always"].
393 """
394 use_client_cert = os.getenv(
395 "GOOGLE_API_USE_CLIENT_CERTIFICATE", "false"
396 ).lower()
397 use_mtls_endpoint = os.getenv("GOOGLE_API_USE_MTLS_ENDPOINT", "auto").lower()
398 universe_domain_env = os.getenv("GOOGLE_CLOUD_UNIVERSE_DOMAIN")
399 if use_client_cert not in ("true", "false"):
400 raise ValueError(
401 "Environment variable `GOOGLE_API_USE_CLIENT_CERTIFICATE` must be either `true` or `false`"
402 )
403 if use_mtls_endpoint not in ("auto", "never", "always"):
404 raise MutualTLSChannelError(
405 "Environment variable `GOOGLE_API_USE_MTLS_ENDPOINT` must be `never`, `auto` or `always`"
406 )
407 return use_client_cert == "true", use_mtls_endpoint, universe_domain_env
408
409 @staticmethod
410 def _get_client_cert_source(provided_cert_source, use_cert_flag):
411 """Return the client cert source to be used by the client.
412
413 Args:
414 provided_cert_source (bytes): The client certificate source provided.
415 use_cert_flag (bool): A flag indicating whether to use the client certificate.
416
417 Returns:
418 bytes or None: The client cert source to be used by the client.
419 """
420 client_cert_source = None
421 if use_cert_flag:
422 if provided_cert_source:
423 client_cert_source = provided_cert_source
424 elif mtls.has_default_client_cert_source():
425 client_cert_source = mtls.default_client_cert_source()
426 return client_cert_source
427
428 @staticmethod
429 def _get_api_endpoint(
430 api_override, client_cert_source, universe_domain, use_mtls_endpoint
431 ):
432 """Return the API endpoint used by the client.
433
434 Args:
435 api_override (str): The API endpoint override. If specified, this is always
436 the return value of this function and the other arguments are not used.
437 client_cert_source (bytes): The client certificate source used by the client.
438 universe_domain (str): The universe domain used by the client.
439 use_mtls_endpoint (str): How to use the mTLS endpoint, which depends also on the other parameters.
440 Possible values are "always", "auto", or "never".
441
442 Returns:
443 str: The API endpoint to be used by the client.
444 """
445 if api_override is not None:
446 api_endpoint = api_override
447 elif use_mtls_endpoint == "always" or (
448 use_mtls_endpoint == "auto" and client_cert_source
449 ):
450 _default_universe = IAMCredentialsClient._DEFAULT_UNIVERSE
451 if universe_domain != _default_universe:
452 raise MutualTLSChannelError(
453 f"mTLS is not supported in any universe other than {_default_universe}."
454 )
455 api_endpoint = IAMCredentialsClient.DEFAULT_MTLS_ENDPOINT
456 else:
457 api_endpoint = IAMCredentialsClient._DEFAULT_ENDPOINT_TEMPLATE.format(
458 UNIVERSE_DOMAIN=universe_domain
459 )
460 return api_endpoint
461
462 @staticmethod
463 def _get_universe_domain(
464 client_universe_domain: Optional[str], universe_domain_env: Optional[str]
465 ) -> str:
466 """Return the universe domain used by the client.
467
468 Args:
469 client_universe_domain (Optional[str]): The universe domain configured via the client options.
470 universe_domain_env (Optional[str]): The universe domain configured via the "GOOGLE_CLOUD_UNIVERSE_DOMAIN" environment variable.
471
472 Returns:
473 str: The universe domain to be used by the client.
474
475 Raises:
476 ValueError: If the universe domain is an empty string.
477 """
478 universe_domain = IAMCredentialsClient._DEFAULT_UNIVERSE
479 if client_universe_domain is not None:
480 universe_domain = client_universe_domain
481 elif universe_domain_env is not None:
482 universe_domain = universe_domain_env
483 if len(universe_domain.strip()) == 0:
484 raise ValueError("Universe Domain cannot be an empty string.")
485 return universe_domain
486
487 def _validate_universe_domain(self):
488 """Validates client's and credentials' universe domains are consistent.
489
490 Returns:
491 bool: True iff the configured universe domain is valid.
492
493 Raises:
494 ValueError: If the configured universe domain is not valid.
495 """
496
497 # NOTE (b/349488459): universe validation is disabled until further notice.
498 return True
499
500 def _add_cred_info_for_auth_errors(
501 self, error: core_exceptions.GoogleAPICallError
502 ) -> None:
503 """Adds credential info string to error details for 401/403/404 errors.
504
505 Args:
506 error (google.api_core.exceptions.GoogleAPICallError): The error to add the cred info.
507 """
508 if error.code not in [
509 HTTPStatus.UNAUTHORIZED,
510 HTTPStatus.FORBIDDEN,
511 HTTPStatus.NOT_FOUND,
512 ]:
513 return
514
515 cred = self._transport._credentials
516
517 # get_cred_info is only available in google-auth>=2.35.0
518 if not hasattr(cred, "get_cred_info"):
519 return
520
521 # ignore the type check since pypy test fails when get_cred_info
522 # is not available
523 cred_info = cred.get_cred_info() # type: ignore
524 if cred_info and hasattr(error._details, "append"):
525 error._details.append(json.dumps(cred_info))
526
527 @property
528 def api_endpoint(self):
529 """Return the API endpoint used by the client instance.
530
531 Returns:
532 str: The API endpoint used by the client instance.
533 """
534 return self._api_endpoint
535
536 @property
537 def universe_domain(self) -> str:
538 """Return the universe domain used by the client instance.
539
540 Returns:
541 str: The universe domain used by the client instance.
542 """
543 return self._universe_domain
544
545 def __init__(
546 self,
547 *,
548 credentials: Optional[ga_credentials.Credentials] = None,
549 transport: Optional[
550 Union[str, IAMCredentialsTransport, Callable[..., IAMCredentialsTransport]]
551 ] = None,
552 client_options: Optional[Union[client_options_lib.ClientOptions, dict]] = None,
553 client_info: gapic_v1.client_info.ClientInfo = DEFAULT_CLIENT_INFO,
554 ) -> None:
555 """Instantiates the iam credentials client.
556
557 Args:
558 credentials (Optional[google.auth.credentials.Credentials]): The
559 authorization credentials to attach to requests. These
560 credentials identify the application to the service; if none
561 are specified, the client will attempt to ascertain the
562 credentials from the environment.
563 transport (Optional[Union[str,IAMCredentialsTransport,Callable[..., IAMCredentialsTransport]]]):
564 The transport to use, or a Callable that constructs and returns a new transport.
565 If a Callable is given, it will be called with the same set of initialization
566 arguments as used in the IAMCredentialsTransport constructor.
567 If set to None, a transport is chosen automatically.
568 client_options (Optional[Union[google.api_core.client_options.ClientOptions, dict]]):
569 Custom options for the client.
570
571 1. The ``api_endpoint`` property can be used to override the
572 default endpoint provided by the client when ``transport`` is
573 not explicitly provided. Only if this property is not set and
574 ``transport`` was not explicitly provided, the endpoint is
575 determined by the GOOGLE_API_USE_MTLS_ENDPOINT environment
576 variable, which have one of the following values:
577 "always" (always use the default mTLS endpoint), "never" (always
578 use the default regular endpoint) and "auto" (auto-switch to the
579 default mTLS endpoint if client certificate is present; this is
580 the default value).
581
582 2. If the GOOGLE_API_USE_CLIENT_CERTIFICATE environment variable
583 is "true", then the ``client_cert_source`` property can be used
584 to provide a client certificate for mTLS transport. If
585 not provided, the default SSL client certificate will be used if
586 present. If GOOGLE_API_USE_CLIENT_CERTIFICATE is "false" or not
587 set, no client certificate will be used.
588
589 3. The ``universe_domain`` property can be used to override the
590 default "googleapis.com" universe. Note that the ``api_endpoint``
591 property still takes precedence; and ``universe_domain`` is
592 currently not supported for mTLS.
593
594 client_info (google.api_core.gapic_v1.client_info.ClientInfo):
595 The client info used to send a user-agent string along with
596 API requests. If ``None``, then default info will be used.
597 Generally, you only need to set this if you're developing
598 your own client library.
599
600 Raises:
601 google.auth.exceptions.MutualTLSChannelError: If mutual TLS transport
602 creation failed for any reason.
603 """
604 self._client_options = client_options
605 if isinstance(self._client_options, dict):
606 self._client_options = client_options_lib.from_dict(self._client_options)
607 if self._client_options is None:
608 self._client_options = client_options_lib.ClientOptions()
609 self._client_options = cast(
610 client_options_lib.ClientOptions, self._client_options
611 )
612
613 universe_domain_opt = getattr(self._client_options, "universe_domain", None)
614
615 (
616 self._use_client_cert,
617 self._use_mtls_endpoint,
618 self._universe_domain_env,
619 ) = IAMCredentialsClient._read_environment_variables()
620 self._client_cert_source = IAMCredentialsClient._get_client_cert_source(
621 self._client_options.client_cert_source, self._use_client_cert
622 )
623 self._universe_domain = IAMCredentialsClient._get_universe_domain(
624 universe_domain_opt, self._universe_domain_env
625 )
626 self._api_endpoint = None # updated below, depending on `transport`
627
628 # Initialize the universe domain validation.
629 self._is_universe_domain_valid = False
630
631 if CLIENT_LOGGING_SUPPORTED: # pragma: NO COVER
632 # Setup logging.
633 client_logging.initialize_logging()
634
635 api_key_value = getattr(self._client_options, "api_key", None)
636 if api_key_value and credentials:
637 raise ValueError(
638 "client_options.api_key and credentials are mutually exclusive"
639 )
640
641 # Save or instantiate the transport.
642 # Ordinarily, we provide the transport, but allowing a custom transport
643 # instance provides an extensibility point for unusual situations.
644 transport_provided = isinstance(transport, IAMCredentialsTransport)
645 if transport_provided:
646 # transport is a IAMCredentialsTransport instance.
647 if credentials or self._client_options.credentials_file or api_key_value:
648 raise ValueError(
649 "When providing a transport instance, "
650 "provide its credentials directly."
651 )
652 if self._client_options.scopes:
653 raise ValueError(
654 "When providing a transport instance, provide its scopes "
655 "directly."
656 )
657 self._transport = cast(IAMCredentialsTransport, transport)
658 self._api_endpoint = self._transport.host
659
660 self._api_endpoint = (
661 self._api_endpoint
662 or IAMCredentialsClient._get_api_endpoint(
663 self._client_options.api_endpoint,
664 self._client_cert_source,
665 self._universe_domain,
666 self._use_mtls_endpoint,
667 )
668 )
669
670 if not transport_provided:
671 import google.auth._default # type: ignore
672
673 if api_key_value and hasattr(
674 google.auth._default, "get_api_key_credentials"
675 ):
676 credentials = google.auth._default.get_api_key_credentials(
677 api_key_value
678 )
679
680 transport_init: Union[
681 Type[IAMCredentialsTransport], Callable[..., IAMCredentialsTransport]
682 ] = (
683 IAMCredentialsClient.get_transport_class(transport)
684 if isinstance(transport, str) or transport is None
685 else cast(Callable[..., IAMCredentialsTransport], transport)
686 )
687 # initialize with the provided callable or the passed in class
688 self._transport = transport_init(
689 credentials=credentials,
690 credentials_file=self._client_options.credentials_file,
691 host=self._api_endpoint,
692 scopes=self._client_options.scopes,
693 client_cert_source_for_mtls=self._client_cert_source,
694 quota_project_id=self._client_options.quota_project_id,
695 client_info=client_info,
696 always_use_jwt_access=True,
697 api_audience=self._client_options.api_audience,
698 )
699
700 if "async" not in str(self._transport):
701 if CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(
702 std_logging.DEBUG
703 ): # pragma: NO COVER
704 _LOGGER.debug(
705 "Created client `google.iam.credentials_v1.IAMCredentialsClient`.",
706 extra={
707 "serviceName": "google.iam.credentials.v1.IAMCredentials",
708 "universeDomain": getattr(
709 self._transport._credentials, "universe_domain", ""
710 ),
711 "credentialsType": f"{type(self._transport._credentials).__module__}.{type(self._transport._credentials).__qualname__}",
712 "credentialsInfo": getattr(
713 self.transport._credentials, "get_cred_info", lambda: None
714 )(),
715 }
716 if hasattr(self._transport, "_credentials")
717 else {
718 "serviceName": "google.iam.credentials.v1.IAMCredentials",
719 "credentialsType": None,
720 },
721 )
722
723 def generate_access_token(
724 self,
725 request: Optional[Union[common.GenerateAccessTokenRequest, dict]] = None,
726 *,
727 name: Optional[str] = None,
728 delegates: Optional[MutableSequence[str]] = None,
729 scope: Optional[MutableSequence[str]] = None,
730 lifetime: Optional[duration_pb2.Duration] = None,
731 retry: OptionalRetry = gapic_v1.method.DEFAULT,
732 timeout: Union[float, object] = gapic_v1.method.DEFAULT,
733 metadata: Sequence[Tuple[str, Union[str, bytes]]] = (),
734 ) -> common.GenerateAccessTokenResponse:
735 r"""Generates an OAuth 2.0 access token for a service
736 account.
737
738 .. code-block:: python
739
740 # This snippet has been automatically generated and should be regarded as a
741 # code template only.
742 # It will require modifications to work:
743 # - It may require correct/in-range values for request initialization.
744 # - It may require specifying regional endpoints when creating the service
745 # client as shown in:
746 # https://googleapis.dev/python/google-api-core/latest/client_options.html
747 from google.cloud import iam_credentials_v1
748
749 def sample_generate_access_token():
750 # Create a client
751 client = iam_credentials_v1.IAMCredentialsClient()
752
753 # Initialize request argument(s)
754 request = iam_credentials_v1.GenerateAccessTokenRequest(
755 name="name_value",
756 scope=['scope_value1', 'scope_value2'],
757 )
758
759 # Make the request
760 response = client.generate_access_token(request=request)
761
762 # Handle the response
763 print(response)
764
765 Args:
766 request (Union[google.cloud.iam_credentials_v1.types.GenerateAccessTokenRequest, dict]):
767 The request object.
768 name (str):
769 Required. The resource name of the service account for
770 which the credentials are requested, in the following
771 format:
772 ``projects/-/serviceAccounts/{ACCOUNT_EMAIL_OR_UNIQUEID}``.
773 The ``-`` wildcard character is required; replacing it
774 with a project ID is invalid.
775
776 This corresponds to the ``name`` field
777 on the ``request`` instance; if ``request`` is provided, this
778 should not be set.
779 delegates (MutableSequence[str]):
780 The sequence of service accounts in a delegation chain.
781 Each service account must be granted the
782 ``roles/iam.serviceAccountTokenCreator`` role on its
783 next service account in the chain. The last service
784 account in the chain must be granted the
785 ``roles/iam.serviceAccountTokenCreator`` role on the
786 service account that is specified in the ``name`` field
787 of the request.
788
789 The delegates must have the following format:
790 ``projects/-/serviceAccounts/{ACCOUNT_EMAIL_OR_UNIQUEID}``.
791 The ``-`` wildcard character is required; replacing it
792 with a project ID is invalid.
793
794 This corresponds to the ``delegates`` field
795 on the ``request`` instance; if ``request`` is provided, this
796 should not be set.
797 scope (MutableSequence[str]):
798 Required. Code to identify the scopes
799 to be included in the OAuth 2.0 access
800 token. See
801 https://developers.google.com/identity/protocols/googlescopes
802 for more information.
803 At least one value required.
804
805 This corresponds to the ``scope`` field
806 on the ``request`` instance; if ``request`` is provided, this
807 should not be set.
808 lifetime (google.protobuf.duration_pb2.Duration):
809 The desired lifetime duration of the
810 access token in seconds. Must be set to
811 a value less than or equal to 3600 (1
812 hour). If a value is not specified, the
813 token's lifetime will be set to a
814 default value of one hour.
815
816 This corresponds to the ``lifetime`` field
817 on the ``request`` instance; if ``request`` is provided, this
818 should not be set.
819 retry (google.api_core.retry.Retry): Designation of what errors, if any,
820 should be retried.
821 timeout (float): The timeout for this request.
822 metadata (Sequence[Tuple[str, Union[str, bytes]]]): Key/value pairs which should be
823 sent along with the request as metadata. Normally, each value must be of type `str`,
824 but for metadata keys ending with the suffix `-bin`, the corresponding values must
825 be of type `bytes`.
826
827 Returns:
828 google.cloud.iam_credentials_v1.types.GenerateAccessTokenResponse:
829
830 """
831 # Create or coerce a protobuf request object.
832 # - Quick check: If we got a request object, we should *not* have
833 # gotten any keyword arguments that map to the request.
834 flattened_params = [name, delegates, scope, lifetime]
835 has_flattened_params = (
836 len([param for param in flattened_params if param is not None]) > 0
837 )
838 if request is not None and has_flattened_params:
839 raise ValueError(
840 "If the `request` argument is set, then none of "
841 "the individual field arguments should be set."
842 )
843
844 # - Use the request object if provided (there's no risk of modifying the input as
845 # there are no flattened fields), or create one.
846 if not isinstance(request, common.GenerateAccessTokenRequest):
847 request = common.GenerateAccessTokenRequest(request)
848 # If we have keyword arguments corresponding to fields on the
849 # request, apply these.
850 if name is not None:
851 request.name = name
852 if delegates is not None:
853 request.delegates = delegates
854 if scope is not None:
855 request.scope = scope
856 if lifetime is not None:
857 request.lifetime = lifetime
858
859 # Wrap the RPC method; this adds retry and timeout information,
860 # and friendly error handling.
861 rpc = self._transport._wrapped_methods[self._transport.generate_access_token]
862
863 # Certain fields should be provided within the metadata header;
864 # add these here.
865 metadata = tuple(metadata) + (
866 gapic_v1.routing_header.to_grpc_metadata((("name", request.name),)),
867 )
868
869 # Validate the universe domain.
870 self._validate_universe_domain()
871
872 # Send the request.
873 response = rpc(
874 request,
875 retry=retry,
876 timeout=timeout,
877 metadata=metadata,
878 )
879
880 # Done; return the response.
881 return response
882
883 def generate_id_token(
884 self,
885 request: Optional[Union[common.GenerateIdTokenRequest, dict]] = None,
886 *,
887 name: Optional[str] = None,
888 delegates: Optional[MutableSequence[str]] = None,
889 audience: Optional[str] = None,
890 include_email: Optional[bool] = None,
891 retry: OptionalRetry = gapic_v1.method.DEFAULT,
892 timeout: Union[float, object] = gapic_v1.method.DEFAULT,
893 metadata: Sequence[Tuple[str, Union[str, bytes]]] = (),
894 ) -> common.GenerateIdTokenResponse:
895 r"""Generates an OpenID Connect ID token for a service
896 account.
897
898 .. code-block:: python
899
900 # This snippet has been automatically generated and should be regarded as a
901 # code template only.
902 # It will require modifications to work:
903 # - It may require correct/in-range values for request initialization.
904 # - It may require specifying regional endpoints when creating the service
905 # client as shown in:
906 # https://googleapis.dev/python/google-api-core/latest/client_options.html
907 from google.cloud import iam_credentials_v1
908
909 def sample_generate_id_token():
910 # Create a client
911 client = iam_credentials_v1.IAMCredentialsClient()
912
913 # Initialize request argument(s)
914 request = iam_credentials_v1.GenerateIdTokenRequest(
915 name="name_value",
916 audience="audience_value",
917 )
918
919 # Make the request
920 response = client.generate_id_token(request=request)
921
922 # Handle the response
923 print(response)
924
925 Args:
926 request (Union[google.cloud.iam_credentials_v1.types.GenerateIdTokenRequest, dict]):
927 The request object.
928 name (str):
929 Required. The resource name of the service account for
930 which the credentials are requested, in the following
931 format:
932 ``projects/-/serviceAccounts/{ACCOUNT_EMAIL_OR_UNIQUEID}``.
933 The ``-`` wildcard character is required; replacing it
934 with a project ID is invalid.
935
936 This corresponds to the ``name`` field
937 on the ``request`` instance; if ``request`` is provided, this
938 should not be set.
939 delegates (MutableSequence[str]):
940 The sequence of service accounts in a delegation chain.
941 Each service account must be granted the
942 ``roles/iam.serviceAccountTokenCreator`` role on its
943 next service account in the chain. The last service
944 account in the chain must be granted the
945 ``roles/iam.serviceAccountTokenCreator`` role on the
946 service account that is specified in the ``name`` field
947 of the request.
948
949 The delegates must have the following format:
950 ``projects/-/serviceAccounts/{ACCOUNT_EMAIL_OR_UNIQUEID}``.
951 The ``-`` wildcard character is required; replacing it
952 with a project ID is invalid.
953
954 This corresponds to the ``delegates`` field
955 on the ``request`` instance; if ``request`` is provided, this
956 should not be set.
957 audience (str):
958 Required. The audience for the token,
959 such as the API or account that this
960 token grants access to.
961
962 This corresponds to the ``audience`` field
963 on the ``request`` instance; if ``request`` is provided, this
964 should not be set.
965 include_email (bool):
966 Include the service account email in the token. If set
967 to ``true``, the token will contain ``email`` and
968 ``email_verified`` claims.
969
970 This corresponds to the ``include_email`` field
971 on the ``request`` instance; if ``request`` is provided, this
972 should not be set.
973 retry (google.api_core.retry.Retry): Designation of what errors, if any,
974 should be retried.
975 timeout (float): The timeout for this request.
976 metadata (Sequence[Tuple[str, Union[str, bytes]]]): Key/value pairs which should be
977 sent along with the request as metadata. Normally, each value must be of type `str`,
978 but for metadata keys ending with the suffix `-bin`, the corresponding values must
979 be of type `bytes`.
980
981 Returns:
982 google.cloud.iam_credentials_v1.types.GenerateIdTokenResponse:
983
984 """
985 # Create or coerce a protobuf request object.
986 # - Quick check: If we got a request object, we should *not* have
987 # gotten any keyword arguments that map to the request.
988 flattened_params = [name, delegates, audience, include_email]
989 has_flattened_params = (
990 len([param for param in flattened_params if param is not None]) > 0
991 )
992 if request is not None and has_flattened_params:
993 raise ValueError(
994 "If the `request` argument is set, then none of "
995 "the individual field arguments should be set."
996 )
997
998 # - Use the request object if provided (there's no risk of modifying the input as
999 # there are no flattened fields), or create one.
1000 if not isinstance(request, common.GenerateIdTokenRequest):
1001 request = common.GenerateIdTokenRequest(request)
1002 # If we have keyword arguments corresponding to fields on the
1003 # request, apply these.
1004 if name is not None:
1005 request.name = name
1006 if delegates is not None:
1007 request.delegates = delegates
1008 if audience is not None:
1009 request.audience = audience
1010 if include_email is not None:
1011 request.include_email = include_email
1012
1013 # Wrap the RPC method; this adds retry and timeout information,
1014 # and friendly error handling.
1015 rpc = self._transport._wrapped_methods[self._transport.generate_id_token]
1016
1017 # Certain fields should be provided within the metadata header;
1018 # add these here.
1019 metadata = tuple(metadata) + (
1020 gapic_v1.routing_header.to_grpc_metadata((("name", request.name),)),
1021 )
1022
1023 # Validate the universe domain.
1024 self._validate_universe_domain()
1025
1026 # Send the request.
1027 response = rpc(
1028 request,
1029 retry=retry,
1030 timeout=timeout,
1031 metadata=metadata,
1032 )
1033
1034 # Done; return the response.
1035 return response
1036
1037 def sign_blob(
1038 self,
1039 request: Optional[Union[common.SignBlobRequest, dict]] = None,
1040 *,
1041 name: Optional[str] = None,
1042 delegates: Optional[MutableSequence[str]] = None,
1043 payload: Optional[bytes] = None,
1044 retry: OptionalRetry = gapic_v1.method.DEFAULT,
1045 timeout: Union[float, object] = gapic_v1.method.DEFAULT,
1046 metadata: Sequence[Tuple[str, Union[str, bytes]]] = (),
1047 ) -> common.SignBlobResponse:
1048 r"""Signs a blob using a service account's system-managed
1049 private key.
1050
1051 .. code-block:: python
1052
1053 # This snippet has been automatically generated and should be regarded as a
1054 # code template only.
1055 # It will require modifications to work:
1056 # - It may require correct/in-range values for request initialization.
1057 # - It may require specifying regional endpoints when creating the service
1058 # client as shown in:
1059 # https://googleapis.dev/python/google-api-core/latest/client_options.html
1060 from google.cloud import iam_credentials_v1
1061
1062 def sample_sign_blob():
1063 # Create a client
1064 client = iam_credentials_v1.IAMCredentialsClient()
1065
1066 # Initialize request argument(s)
1067 request = iam_credentials_v1.SignBlobRequest(
1068 name="name_value",
1069 payload=b'payload_blob',
1070 )
1071
1072 # Make the request
1073 response = client.sign_blob(request=request)
1074
1075 # Handle the response
1076 print(response)
1077
1078 Args:
1079 request (Union[google.cloud.iam_credentials_v1.types.SignBlobRequest, dict]):
1080 The request object.
1081 name (str):
1082 Required. The resource name of the service account for
1083 which the credentials are requested, in the following
1084 format:
1085 ``projects/-/serviceAccounts/{ACCOUNT_EMAIL_OR_UNIQUEID}``.
1086 The ``-`` wildcard character is required; replacing it
1087 with a project ID is invalid.
1088
1089 This corresponds to the ``name`` field
1090 on the ``request`` instance; if ``request`` is provided, this
1091 should not be set.
1092 delegates (MutableSequence[str]):
1093 The sequence of service accounts in a delegation chain.
1094 Each service account must be granted the
1095 ``roles/iam.serviceAccountTokenCreator`` role on its
1096 next service account in the chain. The last service
1097 account in the chain must be granted the
1098 ``roles/iam.serviceAccountTokenCreator`` role on the
1099 service account that is specified in the ``name`` field
1100 of the request.
1101
1102 The delegates must have the following format:
1103 ``projects/-/serviceAccounts/{ACCOUNT_EMAIL_OR_UNIQUEID}``.
1104 The ``-`` wildcard character is required; replacing it
1105 with a project ID is invalid.
1106
1107 This corresponds to the ``delegates`` field
1108 on the ``request`` instance; if ``request`` is provided, this
1109 should not be set.
1110 payload (bytes):
1111 Required. The bytes to sign.
1112 This corresponds to the ``payload`` field
1113 on the ``request`` instance; if ``request`` is provided, this
1114 should not be set.
1115 retry (google.api_core.retry.Retry): Designation of what errors, if any,
1116 should be retried.
1117 timeout (float): The timeout for this request.
1118 metadata (Sequence[Tuple[str, Union[str, bytes]]]): Key/value pairs which should be
1119 sent along with the request as metadata. Normally, each value must be of type `str`,
1120 but for metadata keys ending with the suffix `-bin`, the corresponding values must
1121 be of type `bytes`.
1122
1123 Returns:
1124 google.cloud.iam_credentials_v1.types.SignBlobResponse:
1125
1126 """
1127 # Create or coerce a protobuf request object.
1128 # - Quick check: If we got a request object, we should *not* have
1129 # gotten any keyword arguments that map to the request.
1130 flattened_params = [name, delegates, payload]
1131 has_flattened_params = (
1132 len([param for param in flattened_params if param is not None]) > 0
1133 )
1134 if request is not None and has_flattened_params:
1135 raise ValueError(
1136 "If the `request` argument is set, then none of "
1137 "the individual field arguments should be set."
1138 )
1139
1140 # - Use the request object if provided (there's no risk of modifying the input as
1141 # there are no flattened fields), or create one.
1142 if not isinstance(request, common.SignBlobRequest):
1143 request = common.SignBlobRequest(request)
1144 # If we have keyword arguments corresponding to fields on the
1145 # request, apply these.
1146 if name is not None:
1147 request.name = name
1148 if delegates is not None:
1149 request.delegates = delegates
1150 if payload is not None:
1151 request.payload = payload
1152
1153 # Wrap the RPC method; this adds retry and timeout information,
1154 # and friendly error handling.
1155 rpc = self._transport._wrapped_methods[self._transport.sign_blob]
1156
1157 # Certain fields should be provided within the metadata header;
1158 # add these here.
1159 metadata = tuple(metadata) + (
1160 gapic_v1.routing_header.to_grpc_metadata((("name", request.name),)),
1161 )
1162
1163 # Validate the universe domain.
1164 self._validate_universe_domain()
1165
1166 # Send the request.
1167 response = rpc(
1168 request,
1169 retry=retry,
1170 timeout=timeout,
1171 metadata=metadata,
1172 )
1173
1174 # Done; return the response.
1175 return response
1176
1177 def sign_jwt(
1178 self,
1179 request: Optional[Union[common.SignJwtRequest, dict]] = None,
1180 *,
1181 name: Optional[str] = None,
1182 delegates: Optional[MutableSequence[str]] = None,
1183 payload: Optional[str] = None,
1184 retry: OptionalRetry = gapic_v1.method.DEFAULT,
1185 timeout: Union[float, object] = gapic_v1.method.DEFAULT,
1186 metadata: Sequence[Tuple[str, Union[str, bytes]]] = (),
1187 ) -> common.SignJwtResponse:
1188 r"""Signs a JWT using a service account's system-managed
1189 private key.
1190
1191 .. code-block:: python
1192
1193 # This snippet has been automatically generated and should be regarded as a
1194 # code template only.
1195 # It will require modifications to work:
1196 # - It may require correct/in-range values for request initialization.
1197 # - It may require specifying regional endpoints when creating the service
1198 # client as shown in:
1199 # https://googleapis.dev/python/google-api-core/latest/client_options.html
1200 from google.cloud import iam_credentials_v1
1201
1202 def sample_sign_jwt():
1203 # Create a client
1204 client = iam_credentials_v1.IAMCredentialsClient()
1205
1206 # Initialize request argument(s)
1207 request = iam_credentials_v1.SignJwtRequest(
1208 name="name_value",
1209 payload="payload_value",
1210 )
1211
1212 # Make the request
1213 response = client.sign_jwt(request=request)
1214
1215 # Handle the response
1216 print(response)
1217
1218 Args:
1219 request (Union[google.cloud.iam_credentials_v1.types.SignJwtRequest, dict]):
1220 The request object.
1221 name (str):
1222 Required. The resource name of the service account for
1223 which the credentials are requested, in the following
1224 format:
1225 ``projects/-/serviceAccounts/{ACCOUNT_EMAIL_OR_UNIQUEID}``.
1226 The ``-`` wildcard character is required; replacing it
1227 with a project ID is invalid.
1228
1229 This corresponds to the ``name`` field
1230 on the ``request`` instance; if ``request`` is provided, this
1231 should not be set.
1232 delegates (MutableSequence[str]):
1233 The sequence of service accounts in a delegation chain.
1234 Each service account must be granted the
1235 ``roles/iam.serviceAccountTokenCreator`` role on its
1236 next service account in the chain. The last service
1237 account in the chain must be granted the
1238 ``roles/iam.serviceAccountTokenCreator`` role on the
1239 service account that is specified in the ``name`` field
1240 of the request.
1241
1242 The delegates must have the following format:
1243 ``projects/-/serviceAccounts/{ACCOUNT_EMAIL_OR_UNIQUEID}``.
1244 The ``-`` wildcard character is required; replacing it
1245 with a project ID is invalid.
1246
1247 This corresponds to the ``delegates`` field
1248 on the ``request`` instance; if ``request`` is provided, this
1249 should not be set.
1250 payload (str):
1251 Required. The JWT payload to sign: a
1252 JSON object that contains a JWT Claims
1253 Set.
1254
1255 This corresponds to the ``payload`` field
1256 on the ``request`` instance; if ``request`` is provided, this
1257 should not be set.
1258 retry (google.api_core.retry.Retry): Designation of what errors, if any,
1259 should be retried.
1260 timeout (float): The timeout for this request.
1261 metadata (Sequence[Tuple[str, Union[str, bytes]]]): Key/value pairs which should be
1262 sent along with the request as metadata. Normally, each value must be of type `str`,
1263 but for metadata keys ending with the suffix `-bin`, the corresponding values must
1264 be of type `bytes`.
1265
1266 Returns:
1267 google.cloud.iam_credentials_v1.types.SignJwtResponse:
1268
1269 """
1270 # Create or coerce a protobuf request object.
1271 # - Quick check: If we got a request object, we should *not* have
1272 # gotten any keyword arguments that map to the request.
1273 flattened_params = [name, delegates, payload]
1274 has_flattened_params = (
1275 len([param for param in flattened_params if param is not None]) > 0
1276 )
1277 if request is not None and has_flattened_params:
1278 raise ValueError(
1279 "If the `request` argument is set, then none of "
1280 "the individual field arguments should be set."
1281 )
1282
1283 # - Use the request object if provided (there's no risk of modifying the input as
1284 # there are no flattened fields), or create one.
1285 if not isinstance(request, common.SignJwtRequest):
1286 request = common.SignJwtRequest(request)
1287 # If we have keyword arguments corresponding to fields on the
1288 # request, apply these.
1289 if name is not None:
1290 request.name = name
1291 if delegates is not None:
1292 request.delegates = delegates
1293 if payload is not None:
1294 request.payload = payload
1295
1296 # Wrap the RPC method; this adds retry and timeout information,
1297 # and friendly error handling.
1298 rpc = self._transport._wrapped_methods[self._transport.sign_jwt]
1299
1300 # Certain fields should be provided within the metadata header;
1301 # add these here.
1302 metadata = tuple(metadata) + (
1303 gapic_v1.routing_header.to_grpc_metadata((("name", request.name),)),
1304 )
1305
1306 # Validate the universe domain.
1307 self._validate_universe_domain()
1308
1309 # Send the request.
1310 response = rpc(
1311 request,
1312 retry=retry,
1313 timeout=timeout,
1314 metadata=metadata,
1315 )
1316
1317 # Done; return the response.
1318 return response
1319
1320 def __enter__(self) -> "IAMCredentialsClient":
1321 return self
1322
1323 def __exit__(self, type, value, traceback):
1324 """Releases underlying transport's resources.
1325
1326 .. warning::
1327 ONLY use as a context manager if the transport is NOT shared
1328 with other clients! Exiting the with block will CLOSE the transport
1329 and may cause errors in other clients!
1330 """
1331 self.transport.close()
1332
1333
1334DEFAULT_CLIENT_INFO = gapic_v1.client_info.ClientInfo(
1335 gapic_version=package_version.__version__
1336)
1337
1338if hasattr(DEFAULT_CLIENT_INFO, "protobuf_runtime_version"): # pragma: NO COVER
1339 DEFAULT_CLIENT_INFO.protobuf_runtime_version = google.protobuf.__version__
1340
1341__all__ = ("IAMCredentialsClient",)