1# Copyright 2016 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15"""Service Accounts: JSON Web Token (JWT) Profile for OAuth 2.0
16
17This module implements the JWT Profile for OAuth 2.0 Authorization Grants
18as defined by `RFC 7523`_ with particular support for how this RFC is
19implemented in Google's infrastructure. Google refers to these credentials
20as *Service Accounts*.
21
22Service accounts are used for server-to-server communication, such as
23interactions between a web application server and a Google service. The
24service account belongs to your application instead of to an individual end
25user. In contrast to other OAuth 2.0 profiles, no users are involved and your
26application "acts" as the service account.
27
28Typically an application uses a service account when the application uses
29Google APIs to work with its own data rather than a user's data. For example,
30an application that uses Google Cloud Datastore for data persistence would use
31a service account to authenticate its calls to the Google Cloud Datastore API.
32However, an application that needs to access a user's Drive documents would
33use the normal OAuth 2.0 profile.
34
35Additionally, Google Apps domain administrators can grant service accounts
36`domain-wide delegation`_ authority to access user data on behalf of users in
37the domain.
38
39This profile uses a JWT to acquire an OAuth 2.0 access token. The JWT is used
40in place of the usual authorization token returned during the standard
41OAuth 2.0 Authorization Code grant. The JWT is only used for this purpose, as
42the acquired access token is used as the bearer token when making requests
43using these credentials.
44
45This profile differs from normal OAuth 2.0 profile because no user consent
46step is required. The use of the private key allows this profile to assert
47identity directly.
48
49This profile also differs from the :mod:`google.auth.jwt` authentication
50because the JWT credentials use the JWT directly as the bearer token. This
51profile instead only uses the JWT to obtain an OAuth 2.0 access token. The
52obtained OAuth 2.0 access token is used as the bearer token.
53
54Domain-wide delegation
55----------------------
56
57Domain-wide delegation allows a service account to access user data on
58behalf of any user in a Google Apps domain without consent from the user.
59For example, an application that uses the Google Calendar API to add events to
60the calendars of all users in a Google Apps domain would use a service account
61to access the Google Calendar API on behalf of users.
62
63The Google Apps administrator must explicitly authorize the service account to
64do this. This authorization step is referred to as "delegating domain-wide
65authority" to a service account.
66
67You can use domain-wise delegation by creating a set of credentials with a
68specific subject using :meth:`~Credentials.with_subject`.
69
70.. _RFC 7523: https://tools.ietf.org/html/rfc7523
71"""
72
73import copy
74import datetime
75import logging
76from typing import Optional, TYPE_CHECKING
77
78
79from google.auth import _helpers
80from google.auth import _service_account_info
81from google.auth import credentials
82from google.auth import exceptions
83from google.auth import iam
84from google.auth import jwt
85from google.auth import metrics
86from google.oauth2 import _client
87
88if TYPE_CHECKING: # pragma: NO COVER
89 import google.auth.transport
90
91_LOGGER = logging.getLogger(__name__)
92
93_DEFAULT_TOKEN_LIFETIME_SECS = 3600 # 1 hour in seconds
94_GOOGLE_OAUTH2_TOKEN_ENDPOINT = "https://oauth2.googleapis.com/token"
95
96
97class Credentials(
98 credentials.Signing,
99 credentials.Scoped,
100 credentials.CredentialsWithQuotaProject,
101 credentials.CredentialsWithTokenUri,
102 credentials.CredentialsWithRegionalAccessBoundary,
103):
104 """Service account credentials
105
106 Usually, you'll create these credentials with one of the helper
107 constructors. To create credentials using a Google service account
108 private key JSON file::
109
110 credentials = service_account.Credentials.from_service_account_file(
111 'service-account.json')
112
113 Or if you already have the service account file loaded::
114
115 service_account_info = json.load(open('service_account.json'))
116 credentials = service_account.Credentials.from_service_account_info(
117 service_account_info)
118
119 Both helper methods pass on arguments to the constructor, so you can
120 specify additional scopes and a subject if necessary::
121
122 credentials = service_account.Credentials.from_service_account_file(
123 'service-account.json',
124 scopes=['email'],
125 subject='user@example.com')
126
127 The credentials are considered immutable. If you want to modify the scopes
128 or the subject used for delegation, use :meth:`with_scopes` or
129 :meth:`with_subject`::
130
131 scoped_credentials = credentials.with_scopes(['email'])
132 delegated_credentials = credentials.with_subject(subject)
133
134 To add a quota project, use :meth:`with_quota_project`::
135
136 credentials = credentials.with_quota_project('myproject-123')
137 """
138
139 def __init__(
140 self,
141 signer,
142 service_account_email,
143 token_uri,
144 scopes=None,
145 default_scopes=None,
146 subject=None,
147 project_id=None,
148 quota_project_id=None,
149 additional_claims=None,
150 always_use_jwt_access=False,
151 universe_domain=credentials.DEFAULT_UNIVERSE_DOMAIN,
152 trust_boundary=None,
153 ):
154 """
155 Args:
156 signer (google.auth.crypt.Signer): The signer used to sign JWTs.
157 service_account_email (str): The service account's email.
158 scopes (Sequence[str]): User-defined scopes to request during the
159 authorization grant.
160 default_scopes (Sequence[str]): Default scopes passed by a
161 Google client library. Use 'scopes' for user-defined scopes.
162 token_uri (str): The OAuth 2.0 Token URI.
163 subject (str): For domain-wide delegation, the email address of the
164 user to for which to request delegated access.
165 project_id (str): Project ID associated with the service account
166 credential.
167 quota_project_id (Optional[str]): The project ID used for quota and
168 billing.
169 additional_claims (Mapping[str, str]): Any additional claims for
170 the JWT assertion used in the authorization grant.
171 always_use_jwt_access (Optional[bool]): Whether self signed JWT should
172 be always used.
173 universe_domain (str): The universe domain. The default
174 universe domain is googleapis.com. For default value self
175 signed jwt is used for token refresh.
176 trust_boundary (Mapping[str,str]): A credential trust boundary.
177
178 .. note:: Typically one of the helper constructors
179 :meth:`from_service_account_file` or
180 :meth:`from_service_account_info` are used instead of calling the
181 constructor directly.
182 """
183 super(Credentials, self).__init__()
184
185 self._cred_file_path = None
186 self._scopes = scopes
187 self._default_scopes = default_scopes
188 self._signer = signer
189 self._service_account_email = service_account_email
190 self._subject = subject
191 self._project_id = project_id
192 self._quota_project_id = quota_project_id
193 self._token_uri = token_uri
194 self._always_use_jwt_access = always_use_jwt_access
195 self._universe_domain = universe_domain or credentials.DEFAULT_UNIVERSE_DOMAIN
196
197 if universe_domain != credentials.DEFAULT_UNIVERSE_DOMAIN:
198 self._always_use_jwt_access = True
199
200 self._jwt_credentials = None
201
202 if additional_claims is not None:
203 self._additional_claims = additional_claims
204 else:
205 self._additional_claims = {}
206
207 self._trust_boundary = trust_boundary
208
209 @classmethod
210 def _from_signer_and_info(cls, signer, info, **kwargs):
211 """Creates a Credentials instance from a signer and service account
212 info.
213
214 Args:
215 signer (google.auth.crypt.Signer): The signer used to sign JWTs.
216 info (Mapping[str, str]): The service account info.
217 kwargs: Additional arguments to pass to the constructor.
218
219 Returns:
220 google.auth.jwt.Credentials: The constructed credentials.
221
222 Raises:
223 ValueError: If the info is not in the expected format.
224 """
225 return cls(
226 signer,
227 service_account_email=info["client_email"],
228 token_uri=info["token_uri"],
229 project_id=info.get("project_id"),
230 universe_domain=info.get(
231 "universe_domain", credentials.DEFAULT_UNIVERSE_DOMAIN
232 ),
233 trust_boundary=info.get("trust_boundary"),
234 **kwargs,
235 )
236
237 @classmethod
238 def from_service_account_info(cls, info, **kwargs):
239 """Creates a Credentials instance from parsed service account info.
240
241 Args:
242 info (Mapping[str, str]): The service account info in Google
243 format.
244 kwargs: Additional arguments to pass to the constructor.
245
246 Returns:
247 google.auth.service_account.Credentials: The constructed
248 credentials.
249
250 Raises:
251 ValueError: If the info is not in the expected format.
252 """
253 signer = _service_account_info.from_dict(
254 info, require=["client_email", "token_uri"]
255 )
256 return cls._from_signer_and_info(signer, info, **kwargs)
257
258 @classmethod
259 def from_service_account_file(cls, filename, **kwargs):
260 """Creates a Credentials instance from a service account json file.
261
262 Args:
263 filename (str): The path to the service account json file.
264 kwargs: Additional arguments to pass to the constructor.
265
266 Returns:
267 google.auth.service_account.Credentials: The constructed
268 credentials.
269 """
270 info, signer = _service_account_info.from_filename(
271 filename, require=["client_email", "token_uri"]
272 )
273 return cls._from_signer_and_info(signer, info, **kwargs)
274
275 @property
276 def service_account_email(self):
277 """The service account email."""
278 return self._service_account_email
279
280 @property
281 def project_id(self):
282 """Project ID associated with this credential."""
283 return self._project_id
284
285 @property
286 def requires_scopes(self):
287 """Checks if the credentials requires scopes.
288
289 Returns:
290 bool: True if there are no scopes set otherwise False.
291 """
292 return True if not self._scopes else False
293
294 def _make_copy(self):
295 cred = self.__class__(
296 self._signer,
297 service_account_email=self._service_account_email,
298 scopes=copy.copy(self._scopes),
299 default_scopes=copy.copy(self._default_scopes),
300 token_uri=self._token_uri,
301 subject=self._subject,
302 project_id=self._project_id,
303 quota_project_id=self._quota_project_id,
304 additional_claims=self._additional_claims.copy(),
305 always_use_jwt_access=self._always_use_jwt_access,
306 universe_domain=self._universe_domain,
307 trust_boundary=self._trust_boundary,
308 )
309 cred._cred_file_path = self._cred_file_path
310 self._copy_regional_access_boundary_manager(cred)
311 return cred
312
313 @_helpers.copy_docstring(credentials.Scoped)
314 def with_scopes(self, scopes, default_scopes=None):
315 cred = self._make_copy()
316 cred._scopes = scopes
317 cred._default_scopes = default_scopes
318 return cred
319
320 def with_always_use_jwt_access(self, always_use_jwt_access):
321 """Create a copy of these credentials with the specified always_use_jwt_access value.
322
323 Args:
324 always_use_jwt_access (bool): Whether always use self signed JWT or not.
325
326 Returns:
327 google.auth.service_account.Credentials: A new credentials
328 instance.
329 Raises:
330 google.auth.exceptions.InvalidValue: If the universe domain is not
331 default and always_use_jwt_access is False.
332 """
333 cred = self._make_copy()
334 if (
335 cred._universe_domain != credentials.DEFAULT_UNIVERSE_DOMAIN
336 and not always_use_jwt_access
337 ):
338 raise exceptions.InvalidValue(
339 "always_use_jwt_access should be True for non-default universe domain"
340 )
341 cred._always_use_jwt_access = always_use_jwt_access
342 return cred
343
344 @_helpers.copy_docstring(credentials.CredentialsWithUniverseDomain)
345 def with_universe_domain(self, universe_domain):
346 cred = self._make_copy()
347 cred._universe_domain = universe_domain
348 if universe_domain != credentials.DEFAULT_UNIVERSE_DOMAIN:
349 cred._always_use_jwt_access = True
350 return cred
351
352 def with_subject(self, subject):
353 """Create a copy of these credentials with the specified subject.
354
355 Args:
356 subject (str): The subject claim.
357
358 Returns:
359 google.auth.service_account.Credentials: A new credentials
360 instance.
361 """
362 cred = self._make_copy()
363 cred._subject = subject
364 return cred
365
366 def with_claims(self, additional_claims):
367 """Returns a copy of these credentials with modified claims.
368
369 Args:
370 additional_claims (Mapping[str, str]): Any additional claims for
371 the JWT payload. This will be merged with the current
372 additional claims.
373
374 Returns:
375 google.auth.service_account.Credentials: A new credentials
376 instance.
377 """
378 new_additional_claims = copy.deepcopy(self._additional_claims)
379 new_additional_claims.update(additional_claims or {})
380 cred = self._make_copy()
381 cred._additional_claims = new_additional_claims
382 return cred
383
384 @_helpers.copy_docstring(credentials.CredentialsWithQuotaProject)
385 def with_quota_project(self, quota_project_id):
386 cred = self._make_copy()
387 cred._quota_project_id = quota_project_id
388 return cred
389
390 @_helpers.copy_docstring(credentials.CredentialsWithTokenUri)
391 def with_token_uri(self, token_uri):
392 cred = self._make_copy()
393 cred._token_uri = token_uri
394 return cred
395
396 def _make_authorization_grant_assertion(self):
397 """Create the OAuth 2.0 assertion.
398
399 This assertion is used during the OAuth 2.0 grant to acquire an
400 access token.
401
402 Returns:
403 bytes: The authorization grant assertion.
404 """
405 now = _helpers.utcnow()
406 lifetime = datetime.timedelta(seconds=_DEFAULT_TOKEN_LIFETIME_SECS)
407 expiry = now + lifetime
408
409 payload = {
410 "iat": _helpers.datetime_to_secs(now),
411 "exp": _helpers.datetime_to_secs(expiry),
412 # The issuer must be the service account email.
413 "iss": self._service_account_email,
414 # The audience must be the auth token endpoint's URI
415 "aud": _GOOGLE_OAUTH2_TOKEN_ENDPOINT,
416 "scope": _helpers.scopes_to_string(self._scopes or ()),
417 }
418
419 payload.update(self._additional_claims)
420
421 # The subject can be a user email for domain-wide delegation.
422 if self._subject:
423 payload.setdefault("sub", self._subject)
424
425 token = jwt.encode(self._signer, payload)
426
427 return token
428
429 def _use_self_signed_jwt(self):
430 # Since domain wide delegation doesn't work with self signed JWT. If
431 # subject exists, then we should not use self signed JWT.
432 return self._subject is None and self._jwt_credentials is not None
433
434 def _metric_header_for_usage(self):
435 if self._use_self_signed_jwt():
436 return metrics.CRED_TYPE_SA_JWT
437 return metrics.CRED_TYPE_SA_ASSERTION
438
439 @_helpers.copy_docstring(credentials.CredentialsWithRegionalAccessBoundary)
440 def _perform_refresh_token(self, request):
441 if self._always_use_jwt_access and not self._jwt_credentials:
442 # If self signed jwt should be used but jwt credential is not
443 # created, try to create one with scopes
444 self._create_self_signed_jwt(None)
445
446 if (
447 self._universe_domain != credentials.DEFAULT_UNIVERSE_DOMAIN
448 and self._subject
449 ):
450 raise exceptions.RefreshError(
451 "domain wide delegation is not supported for non-default universe domain"
452 )
453
454 if self._use_self_signed_jwt():
455 self._jwt_credentials.refresh(request)
456 self.token = self._jwt_credentials.token.decode()
457 self.expiry = self._jwt_credentials.expiry
458 else:
459 assertion = self._make_authorization_grant_assertion()
460 access_token, expiry, _ = _client.jwt_grant(
461 request, self._token_uri, assertion
462 )
463 self.token = access_token
464 self.expiry = expiry
465
466 def _create_self_signed_jwt(self, audience):
467 """Create a self-signed JWT from the credentials if requirements are met.
468
469 Args:
470 audience (str): The service URL. ``https://[API_ENDPOINT]/``
471 """
472 # https://google.aip.dev/auth/4111
473 if self._always_use_jwt_access:
474 if self._scopes:
475 additional_claims = {"scope": " ".join(self._scopes)}
476 if (
477 self._jwt_credentials is None
478 or self._jwt_credentials.additional_claims != additional_claims
479 ):
480 self._jwt_credentials = jwt.Credentials.from_signing_credentials(
481 self, None, additional_claims=additional_claims
482 )
483 elif audience:
484 if (
485 self._jwt_credentials is None
486 or self._jwt_credentials._audience != audience
487 ):
488 self._jwt_credentials = jwt.Credentials.from_signing_credentials(
489 self, audience
490 )
491 elif self._default_scopes:
492 additional_claims = {"scope": " ".join(self._default_scopes)}
493 if (
494 self._jwt_credentials is None
495 or additional_claims != self._jwt_credentials.additional_claims
496 ):
497 self._jwt_credentials = jwt.Credentials.from_signing_credentials(
498 self, None, additional_claims=additional_claims
499 )
500 elif not self._scopes and audience:
501 self._jwt_credentials = jwt.Credentials.from_signing_credentials(
502 self, audience
503 )
504
505 def _build_regional_access_boundary_lookup_url(
506 self, request: "Optional[google.auth.transport.Request]" = None # noqa: F821
507 ):
508 """Builds and returns the URL for the Regional Access Boundary lookup API.
509
510 This method constructs the specific URL for the IAM Credentials API's
511 `allowedLocations` endpoint, using the credential's universe domain
512 and service account email.
513
514 Returns:
515 Optional[str]: The URL for the Regional Access Boundary lookup endpoint, or None
516 if the service account email is missing.
517 """
518 if not self.service_account_email:
519 _LOGGER.error(
520 "Service account email is required to build the Regional Access Boundary lookup URL for service account credentials."
521 )
522 return None
523 return iam._SERVICE_ACCOUNT_REGIONAL_ACCESS_BOUNDARY_LOOKUP_ENDPOINT.format(
524 service_account_email=self._service_account_email,
525 )
526
527 @_helpers.copy_docstring(credentials.Signing)
528 def sign_bytes(self, message):
529 return self._signer.sign(message)
530
531 @property # type: ignore
532 @_helpers.copy_docstring(credentials.Signing)
533 def signer(self):
534 return self._signer
535
536 @property # type: ignore
537 @_helpers.copy_docstring(credentials.Signing)
538 def signer_email(self):
539 return self._service_account_email
540
541 @_helpers.copy_docstring(credentials.Credentials)
542 def get_cred_info(self):
543 if self._cred_file_path:
544 return {
545 "credential_source": self._cred_file_path,
546 "credential_type": "service account credentials",
547 "principal": self.service_account_email,
548 }
549 return None
550
551
552class IDTokenCredentials(
553 credentials.Signing,
554 credentials.CredentialsWithQuotaProject,
555 credentials.CredentialsWithTokenUri,
556):
557 """Open ID Connect ID Token-based service account credentials.
558
559 These credentials are largely similar to :class:`.Credentials`, but instead
560 of using an OAuth 2.0 Access Token as the bearer token, they use an Open
561 ID Connect ID Token as the bearer token. These credentials are useful when
562 communicating to services that require ID Tokens and can not accept access
563 tokens.
564
565 Usually, you'll create these credentials with one of the helper
566 constructors. To create credentials using a Google service account
567 private key JSON file::
568
569 credentials = (
570 service_account.IDTokenCredentials.from_service_account_file(
571 'service-account.json'))
572
573
574 Or if you already have the service account file loaded::
575
576 service_account_info = json.load(open('service_account.json'))
577 credentials = (
578 service_account.IDTokenCredentials.from_service_account_info(
579 service_account_info))
580
581
582 Both helper methods pass on arguments to the constructor, so you can
583 specify additional scopes and a subject if necessary::
584
585 credentials = (
586 service_account.IDTokenCredentials.from_service_account_file(
587 'service-account.json',
588 scopes=['email'],
589 subject='user@example.com'))
590
591
592 The credentials are considered immutable. If you want to modify the scopes
593 or the subject used for delegation, use :meth:`with_scopes` or
594 :meth:`with_subject`::
595
596 scoped_credentials = credentials.with_scopes(['email'])
597 delegated_credentials = credentials.with_subject(subject)
598
599 """
600
601 def __init__(
602 self,
603 signer,
604 service_account_email,
605 token_uri,
606 target_audience,
607 additional_claims=None,
608 quota_project_id=None,
609 universe_domain=credentials.DEFAULT_UNIVERSE_DOMAIN,
610 ):
611 """
612 Args:
613 signer (google.auth.crypt.Signer): The signer used to sign JWTs.
614 service_account_email (str): The service account's email.
615 token_uri (str): The OAuth 2.0 Token URI.
616 target_audience (str): The intended audience for these credentials,
617 used when requesting the ID Token. The ID Token's ``aud`` claim
618 will be set to this string.
619 additional_claims (Mapping[str, str]): Any additional claims for
620 the JWT assertion used in the authorization grant.
621 quota_project_id (Optional[str]): The project ID used for quota and billing.
622 universe_domain (str): The universe domain. The default
623 universe domain is googleapis.com. For default value IAM ID
624 token endponint is used for token refresh. Note that
625 iam.serviceAccountTokenCreator role is required to use the IAM
626 endpoint.
627
628 .. note:: Typically one of the helper constructors
629 :meth:`from_service_account_file` or
630 :meth:`from_service_account_info` are used instead of calling the
631 constructor directly.
632 """
633 super(IDTokenCredentials, self).__init__()
634 self._signer = signer
635 self._service_account_email = service_account_email
636 self._token_uri = token_uri
637 self._target_audience = target_audience
638 self._quota_project_id = quota_project_id
639 self._use_iam_endpoint = False
640
641 if not universe_domain:
642 self._universe_domain = credentials.DEFAULT_UNIVERSE_DOMAIN
643 else:
644 self._universe_domain = universe_domain
645 self._iam_id_token_endpoint = iam._IAM_IDTOKEN_ENDPOINT.replace(
646 "googleapis.com", self._universe_domain
647 )
648
649 if self._universe_domain != credentials.DEFAULT_UNIVERSE_DOMAIN:
650 self._use_iam_endpoint = True
651
652 if additional_claims is not None:
653 self._additional_claims = additional_claims
654 else:
655 self._additional_claims = {}
656
657 @classmethod
658 def _from_signer_and_info(cls, signer, info, **kwargs):
659 """Creates a credentials instance from a signer and service account
660 info.
661
662 Args:
663 signer (google.auth.crypt.Signer): The signer used to sign JWTs.
664 info (Mapping[str, str]): The service account info.
665 kwargs: Additional arguments to pass to the constructor.
666
667 Returns:
668 google.auth.jwt.IDTokenCredentials: The constructed credentials.
669
670 Raises:
671 ValueError: If the info is not in the expected format.
672 """
673 kwargs.setdefault("service_account_email", info["client_email"])
674 kwargs.setdefault("token_uri", info["token_uri"])
675 if "universe_domain" in info:
676 kwargs["universe_domain"] = info["universe_domain"]
677 return cls(signer, **kwargs)
678
679 @classmethod
680 def from_service_account_info(cls, info, **kwargs):
681 """Creates a credentials instance from parsed service account info.
682
683 Args:
684 info (Mapping[str, str]): The service account info in Google
685 format.
686 kwargs: Additional arguments to pass to the constructor.
687
688 Returns:
689 google.auth.service_account.IDTokenCredentials: The constructed
690 credentials.
691
692 Raises:
693 ValueError: If the info is not in the expected format.
694 """
695 signer = _service_account_info.from_dict(
696 info, require=["client_email", "token_uri"]
697 )
698 return cls._from_signer_and_info(signer, info, **kwargs)
699
700 @classmethod
701 def from_service_account_file(cls, filename, **kwargs):
702 """Creates a credentials instance from a service account json file.
703
704 Args:
705 filename (str): The path to the service account json file.
706 kwargs: Additional arguments to pass to the constructor.
707
708 Returns:
709 google.auth.service_account.IDTokenCredentials: The constructed
710 credentials.
711 """
712 info, signer = _service_account_info.from_filename(
713 filename, require=["client_email", "token_uri"]
714 )
715 return cls._from_signer_and_info(signer, info, **kwargs)
716
717 def _make_copy(self):
718 cred = self.__class__(
719 self._signer,
720 service_account_email=self._service_account_email,
721 token_uri=self._token_uri,
722 target_audience=self._target_audience,
723 additional_claims=self._additional_claims.copy(),
724 quota_project_id=self.quota_project_id,
725 universe_domain=self._universe_domain,
726 )
727 # _use_iam_endpoint is not exposed in the constructor
728 cred._use_iam_endpoint = self._use_iam_endpoint
729 return cred
730
731 def with_target_audience(self, target_audience):
732 """Create a copy of these credentials with the specified target
733 audience.
734
735 Args:
736 target_audience (str): The intended audience for these credentials,
737 used when requesting the ID Token.
738
739 Returns:
740 google.auth.service_account.IDTokenCredentials: A new credentials
741 instance.
742 """
743 cred = self._make_copy()
744 cred._target_audience = target_audience
745 return cred
746
747 def _with_use_iam_endpoint(self, use_iam_endpoint):
748 """Create a copy of these credentials with the use_iam_endpoint value.
749
750 Args:
751 use_iam_endpoint (bool): If True, IAM generateIdToken endpoint will
752 be used instead of the token_uri. Note that
753 iam.serviceAccountTokenCreator role is required to use the IAM
754 endpoint. The default value is False. This feature is currently
755 experimental and subject to change without notice.
756
757 Returns:
758 google.auth.service_account.IDTokenCredentials: A new credentials
759 instance.
760 Raises:
761 google.auth.exceptions.InvalidValue: If the universe domain is not
762 default and use_iam_endpoint is False.
763 """
764 cred = self._make_copy()
765 if (
766 cred._universe_domain != credentials.DEFAULT_UNIVERSE_DOMAIN
767 and not use_iam_endpoint
768 ):
769 raise exceptions.InvalidValue(
770 "use_iam_endpoint should be True for non-default universe domain"
771 )
772 cred._use_iam_endpoint = use_iam_endpoint
773 return cred
774
775 @_helpers.copy_docstring(credentials.CredentialsWithQuotaProject)
776 def with_quota_project(self, quota_project_id):
777 cred = self._make_copy()
778 cred._quota_project_id = quota_project_id
779 return cred
780
781 @_helpers.copy_docstring(credentials.CredentialsWithTokenUri)
782 def with_token_uri(self, token_uri):
783 cred = self._make_copy()
784 cred._token_uri = token_uri
785 return cred
786
787 def _make_authorization_grant_assertion(self):
788 """Create the OAuth 2.0 assertion.
789
790 This assertion is used during the OAuth 2.0 grant to acquire an
791 ID token.
792
793 Returns:
794 bytes: The authorization grant assertion.
795 """
796 now = _helpers.utcnow()
797 lifetime = datetime.timedelta(seconds=_DEFAULT_TOKEN_LIFETIME_SECS)
798 expiry = now + lifetime
799
800 payload = {
801 "iat": _helpers.datetime_to_secs(now),
802 "exp": _helpers.datetime_to_secs(expiry),
803 # The issuer must be the service account email.
804 "iss": self.service_account_email,
805 # The audience must be the auth token endpoint's URI
806 "aud": _GOOGLE_OAUTH2_TOKEN_ENDPOINT,
807 # The target audience specifies which service the ID token is
808 # intended for.
809 "target_audience": self._target_audience,
810 }
811
812 payload.update(self._additional_claims)
813
814 token = jwt.encode(self._signer, payload)
815
816 return token
817
818 def _refresh_with_iam_endpoint(self, request):
819 """Use IAM generateIdToken endpoint to obtain an ID token.
820
821 It works as follows:
822
823 1. First we create a self signed jwt with
824 https://www.googleapis.com/auth/iam being the scope.
825
826 2. Next we use the self signed jwt as the access token, and make a POST
827 request to IAM generateIdToken endpoint. The request body is:
828 {
829 "audience": self._target_audience,
830 "includeEmail": "true",
831 "useEmailAzp": "true",
832 }
833
834 If the request is succesfully, it will return {"token":"the ID token"},
835 and we can extract the ID token and compute its expiry.
836 """
837 jwt_credentials = jwt.Credentials.from_signing_credentials(
838 self,
839 None,
840 additional_claims={"scope": "https://www.googleapis.com/auth/iam"},
841 )
842 jwt_credentials.refresh(request)
843
844 self.token, self.expiry = _client.call_iam_generate_id_token_endpoint(
845 request,
846 self._iam_id_token_endpoint,
847 self.signer_email,
848 self._target_audience,
849 jwt_credentials.token.decode(),
850 self._universe_domain,
851 )
852
853 @_helpers.copy_docstring(credentials.Credentials)
854 def refresh(self, request):
855 if self._use_iam_endpoint:
856 self._refresh_with_iam_endpoint(request)
857 else:
858 assertion = self._make_authorization_grant_assertion()
859 access_token, expiry, _ = _client.id_token_jwt_grant(
860 request, self._token_uri, assertion
861 )
862 self.token = access_token
863 self.expiry = expiry
864
865 @property
866 def service_account_email(self):
867 """The service account email."""
868 return self._service_account_email
869
870 @_helpers.copy_docstring(credentials.Signing)
871 def sign_bytes(self, message):
872 return self._signer.sign(message)
873
874 @property # type: ignore
875 @_helpers.copy_docstring(credentials.Signing)
876 def signer(self):
877 return self._signer
878
879 @property # type: ignore
880 @_helpers.copy_docstring(credentials.Signing)
881 def signer_email(self):
882 return self._service_account_email