1# Copyright 2016 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15"""Service Accounts: JSON Web Token (JWT) Profile for OAuth 2.0
16
17This module implements the JWT Profile for OAuth 2.0 Authorization Grants
18as defined by `RFC 7523`_ with particular support for how this RFC is
19implemented in Google's infrastructure. Google refers to these credentials
20as *Service Accounts*.
21
22Service accounts are used for server-to-server communication, such as
23interactions between a web application server and a Google service. The
24service account belongs to your application instead of to an individual end
25user. In contrast to other OAuth 2.0 profiles, no users are involved and your
26application "acts" as the service account.
27
28Typically an application uses a service account when the application uses
29Google APIs to work with its own data rather than a user's data. For example,
30an application that uses Google Cloud Datastore for data persistence would use
31a service account to authenticate its calls to the Google Cloud Datastore API.
32However, an application that needs to access a user's Drive documents would
33use the normal OAuth 2.0 profile.
34
35Additionally, Google Apps domain administrators can grant service accounts
36`domain-wide delegation`_ authority to access user data on behalf of users in
37the domain.
38
39This profile uses a JWT to acquire an OAuth 2.0 access token. The JWT is used
40in place of the usual authorization token returned during the standard
41OAuth 2.0 Authorization Code grant. The JWT is only used for this purpose, as
42the acquired access token is used as the bearer token when making requests
43using these credentials.
44
This profile differs from the normal OAuth 2.0 profile because no user consent
46step is required. The use of the private key allows this profile to assert
47identity directly.
48
49This profile also differs from the :mod:`google.auth.jwt` authentication
50because the JWT credentials use the JWT directly as the bearer token. This
51profile instead only uses the JWT to obtain an OAuth 2.0 access token. The
52obtained OAuth 2.0 access token is used as the bearer token.
53
54Domain-wide delegation
55----------------------
56
57Domain-wide delegation allows a service account to access user data on
58behalf of any user in a Google Apps domain without consent from the user.
59For example, an application that uses the Google Calendar API to add events to
60the calendars of all users in a Google Apps domain would use a service account
61to access the Google Calendar API on behalf of users.
62
63The Google Apps administrator must explicitly authorize the service account to
64do this. This authorization step is referred to as "delegating domain-wide
65authority" to a service account.
66
You can use domain-wide delegation by creating a set of credentials with a
68specific subject using :meth:`~Credentials.with_subject`.
69
70.. _RFC 7523: https://tools.ietf.org/html/rfc7523
71"""
72
73import copy
74import datetime
75import logging
76from typing import Optional, TYPE_CHECKING
77
78
79from google.auth import _helpers
80from google.auth import _regional_access_boundary_utils
81from google.auth import _service_account_info
82from google.auth import credentials
83from google.auth import exceptions
84from google.auth import iam
85from google.auth import jwt
86from google.auth import metrics
87from google.oauth2 import _client
88
89if TYPE_CHECKING: # pragma: NO COVER
90 import google.auth.transport
91
# Module-level logger, named after this module.
_LOGGER = logging.getLogger(__name__)

# Lifetime of the signed authorization grant assertions: the ``exp`` claim is
# set to ``iat`` plus this many seconds.
_DEFAULT_TOKEN_LIFETIME_SECS = 3600  # 1 hour in seconds
# Value written to the ``aud`` claim of the authorization grant assertions.
_GOOGLE_OAUTH2_TOKEN_ENDPOINT = "https://oauth2.googleapis.com/token"
96
97
class Credentials(
    credentials.Signing,
    credentials.Scoped,
    credentials.CredentialsWithQuotaProject,
    credentials.CredentialsWithTokenUri,
    credentials.CredentialsWithRegionalAccessBoundary,
):
    """Service account credentials

    Usually, you'll create these credentials with one of the helper
    constructors. To create credentials using a Google service account
    private key JSON file::

        credentials = service_account.Credentials.from_service_account_file(
            'service-account.json')

    Or if you already have the service account file loaded::

        service_account_info = json.load(open('service_account.json'))
        credentials = service_account.Credentials.from_service_account_info(
            service_account_info)

    Both helper methods pass on arguments to the constructor, so you can
    specify additional scopes and a subject if necessary::

        credentials = service_account.Credentials.from_service_account_file(
            'service-account.json',
            scopes=['email'],
            subject='user@example.com')

    The credentials are considered immutable. If you want to modify the scopes
    or the subject used for delegation, use :meth:`with_scopes` or
    :meth:`with_subject`::

        scoped_credentials = credentials.with_scopes(['email'])
        delegated_credentials = credentials.with_subject(subject)

    To add a quota project, use :meth:`with_quota_project`::

        credentials = credentials.with_quota_project('myproject-123')
    """

    def __init__(
        self,
        signer,
        service_account_email,
        token_uri,
        scopes=None,
        default_scopes=None,
        subject=None,
        project_id=None,
        quota_project_id=None,
        additional_claims=None,
        always_use_jwt_access=False,
        universe_domain=credentials.DEFAULT_UNIVERSE_DOMAIN,
        trust_boundary=None,
    ):
        """
        Args:
            signer (google.auth.crypt.Signer): The signer used to sign JWTs.
            service_account_email (str): The service account's email.
            token_uri (str): The OAuth 2.0 Token URI.
            scopes (Sequence[str]): User-defined scopes to request during the
                authorization grant.
            default_scopes (Sequence[str]): Default scopes passed by a
                Google client library. Use 'scopes' for user-defined scopes.
            subject (str): For domain-wide delegation, the email address of the
                user for which to request delegated access.
            project_id (str): Project ID associated with the service account
                credential.
            quota_project_id (Optional[str]): The project ID used for quota and
                billing.
            additional_claims (Mapping[str, str]): Any additional claims for
                the JWT assertion used in the authorization grant.
            always_use_jwt_access (Optional[bool]): Whether self signed JWT should
                be always used.
            universe_domain (str): The universe domain. A falsy value is
                treated as the default universe domain (googleapis.com). For
                a non-default universe domain, self signed JWT is always
                used for token refresh.
            trust_boundary (Mapping[str,str]): A credential trust boundary.

        .. note:: Typically one of the helper constructors
            :meth:`from_service_account_file` or
            :meth:`from_service_account_info` are used instead of calling the
            constructor directly.
        """
        super().__init__()

        self._cred_file_path = None
        self._scopes = scopes
        self._default_scopes = default_scopes
        self._signer = signer
        self._service_account_email = service_account_email
        self._subject = subject
        self._project_id = project_id
        self._quota_project_id = quota_project_id
        self._token_uri = token_uri
        self._always_use_jwt_access = always_use_jwt_access
        self._universe_domain = universe_domain or credentials.DEFAULT_UNIVERSE_DOMAIN

        # Compare the *normalized* universe domain: a falsy argument (None or
        # "") resolves to the default domain and therefore must not force
        # self signed JWT access.
        if self._universe_domain != credentials.DEFAULT_UNIVERSE_DOMAIN:
            self._always_use_jwt_access = True

        # Lazily created by _create_self_signed_jwt().
        self._jwt_credentials = None

        if additional_claims is not None:
            self._additional_claims = additional_claims
        else:
            self._additional_claims = {}

        self._trust_boundary = trust_boundary

    @classmethod
    def _from_signer_and_info(cls, signer, info, **kwargs):
        """Creates a Credentials instance from a signer and service account
        info.

        Args:
            signer (google.auth.crypt.Signer): The signer used to sign JWTs.
            info (Mapping[str, str]): The service account info. The
                ``client_email`` and ``token_uri`` keys are required;
                ``project_id``, ``universe_domain`` and ``trust_boundary``
                are read when present.
            kwargs: Additional arguments to pass to the constructor.

        Returns:
            google.auth.service_account.Credentials: The constructed
                credentials.

        Raises:
            KeyError: If ``client_email`` or ``token_uri`` is missing from
                ``info``.
        """
        return cls(
            signer,
            service_account_email=info["client_email"],
            token_uri=info["token_uri"],
            project_id=info.get("project_id"),
            universe_domain=info.get(
                "universe_domain", credentials.DEFAULT_UNIVERSE_DOMAIN
            ),
            trust_boundary=info.get("trust_boundary"),
            **kwargs,
        )

    @classmethod
    def from_service_account_info(cls, info, **kwargs):
        """Creates a Credentials instance from parsed service account info.

        Args:
            info (Mapping[str, str]): The service account info in Google
                format.
            kwargs: Additional arguments to pass to the constructor.

        Returns:
            google.auth.service_account.Credentials: The constructed
                credentials.

        Raises:
            ValueError: If the info is not in the expected format.
        """
        signer = _service_account_info.from_dict(
            info, require=["client_email", "token_uri"]
        )
        return cls._from_signer_and_info(signer, info, **kwargs)

    @classmethod
    def from_service_account_file(cls, filename, **kwargs):
        """Creates a Credentials instance from a service account json file.

        Args:
            filename (str): The path to the service account json file.
            kwargs: Additional arguments to pass to the constructor.

        Returns:
            google.auth.service_account.Credentials: The constructed
                credentials.
        """
        info, signer = _service_account_info.from_filename(
            filename, require=["client_email", "token_uri"]
        )
        return cls._from_signer_and_info(signer, info, **kwargs)

    @property
    def service_account_email(self):
        """The service account email."""
        return self._service_account_email

    @property
    def project_id(self):
        """Project ID associated with this credential."""
        return self._project_id

    @property
    def requires_scopes(self):
        """Checks if the credentials requires scopes.

        Returns:
            bool: True if there are no scopes set otherwise False.
        """
        return not self._scopes

    def _make_copy(self):
        """Create a new instance carrying the same configuration.

        Scopes, default scopes and additional claims are shallow-copied so
        that the copy can be adjusted without touching this instance. Any
        self signed JWT credentials created on this instance are not carried
        over; the constructor starts the copy without them.

        Returns:
            google.auth.service_account.Credentials: The copied credentials.
        """
        cred = self.__class__(
            self._signer,
            service_account_email=self._service_account_email,
            scopes=copy.copy(self._scopes),
            default_scopes=copy.copy(self._default_scopes),
            token_uri=self._token_uri,
            subject=self._subject,
            project_id=self._project_id,
            quota_project_id=self._quota_project_id,
            additional_claims=self._additional_claims.copy(),
            always_use_jwt_access=self._always_use_jwt_access,
            universe_domain=self._universe_domain,
            trust_boundary=self._trust_boundary,
        )
        # The credential file path is not a constructor argument.
        cred._cred_file_path = self._cred_file_path
        # NOTE(review): defined on a base class outside this file; presumably
        # hands the regional access boundary state over to the copy.
        self._copy_regional_access_boundary_manager(cred)
        return cred

    @_helpers.copy_docstring(credentials.Scoped)
    def with_scopes(self, scopes, default_scopes=None):
        cred = self._make_copy()
        cred._scopes = scopes
        cred._default_scopes = default_scopes
        return cred

    def with_always_use_jwt_access(self, always_use_jwt_access):
        """Create a copy of these credentials with the specified always_use_jwt_access value.

        Args:
            always_use_jwt_access (bool): Whether always use self signed JWT or not.

        Returns:
            google.auth.service_account.Credentials: A new credentials
                instance.
        Raises:
            google.auth.exceptions.InvalidValue: If the universe domain is not
                default and always_use_jwt_access is False.
        """
        cred = self._make_copy()
        if (
            cred._universe_domain != credentials.DEFAULT_UNIVERSE_DOMAIN
            and not always_use_jwt_access
        ):
            raise exceptions.InvalidValue(
                "always_use_jwt_access should be True for non-default universe domain"
            )
        cred._always_use_jwt_access = always_use_jwt_access
        return cred

    @_helpers.copy_docstring(credentials.CredentialsWithUniverseDomain)
    def with_universe_domain(self, universe_domain):
        cred = self._make_copy()
        # Normalize exactly like the constructor so that a falsy value means
        # "default universe domain" instead of being stored verbatim.
        cred._universe_domain = universe_domain or credentials.DEFAULT_UNIVERSE_DOMAIN
        if cred._universe_domain != credentials.DEFAULT_UNIVERSE_DOMAIN:
            cred._always_use_jwt_access = True
        return cred

    def with_subject(self, subject):
        """Create a copy of these credentials with the specified subject.

        Args:
            subject (str): The subject claim.

        Returns:
            google.auth.service_account.Credentials: A new credentials
                instance.
        """
        cred = self._make_copy()
        cred._subject = subject
        return cred

    def with_claims(self, additional_claims):
        """Returns a copy of these credentials with modified claims.

        Args:
            additional_claims (Mapping[str, str]): Any additional claims for
                the JWT payload. This will be merged with the current
                additional claims.

        Returns:
            google.auth.service_account.Credentials: A new credentials
                instance.
        """
        new_additional_claims = copy.deepcopy(self._additional_claims)
        new_additional_claims.update(additional_claims or {})
        cred = self._make_copy()
        cred._additional_claims = new_additional_claims
        return cred

    @_helpers.copy_docstring(credentials.CredentialsWithQuotaProject)
    def with_quota_project(self, quota_project_id):
        cred = self._make_copy()
        cred._quota_project_id = quota_project_id
        return cred

    @_helpers.copy_docstring(credentials.CredentialsWithTokenUri)
    def with_token_uri(self, token_uri):
        cred = self._make_copy()
        cred._token_uri = token_uri
        return cred

    def _make_authorization_grant_assertion(self):
        """Create the OAuth 2.0 assertion.

        This assertion is used during the OAuth 2.0 grant to acquire an
        access token.

        Returns:
            bytes: The authorization grant assertion.
        """
        now = _helpers.utcnow()
        lifetime = datetime.timedelta(seconds=_DEFAULT_TOKEN_LIFETIME_SECS)
        expiry = now + lifetime

        payload = {
            "iat": _helpers.datetime_to_secs(now),
            "exp": _helpers.datetime_to_secs(expiry),
            # The issuer must be the service account email.
            "iss": self._service_account_email,
            # The audience must be the auth token endpoint's URI
            "aud": _GOOGLE_OAUTH2_TOKEN_ENDPOINT,
            "scope": _helpers.scopes_to_string(self._scopes or ()),
        }

        # Additional claims are applied on top of the standard ones, so they
        # may override any of the keys above.
        payload.update(self._additional_claims)

        # The subject can be a user email for domain-wide delegation. A
        # "sub" supplied through the additional claims takes precedence.
        if self._subject:
            payload.setdefault("sub", self._subject)

        token = jwt.encode(self._signer, payload)

        return token

    def _use_self_signed_jwt(self):
        """Tell whether refresh should use the self signed JWT credentials.

        Returns:
            bool: True if no subject is set and self signed JWT credentials
                have been created, otherwise False.
        """
        # Since domain wide delegation doesn't work with self signed JWT. If
        # subject exists, then we should not use self signed JWT.
        return self._subject is None and self._jwt_credentials is not None

    def _metric_header_for_usage(self):
        """Return the metrics credential type matching the active token flow."""
        if self._use_self_signed_jwt():
            return metrics.CRED_TYPE_SA_JWT
        return metrics.CRED_TYPE_SA_ASSERTION

    @_helpers.copy_docstring(credentials.CredentialsWithRegionalAccessBoundary)
    def _perform_refresh_token(self, request):
        if self._always_use_jwt_access and not self._jwt_credentials:
            # If self signed jwt should be used but jwt credential is not
            # created, try to create one with scopes
            self._create_self_signed_jwt(None)

        if (
            self._universe_domain != credentials.DEFAULT_UNIVERSE_DOMAIN
            and self._subject
        ):
            raise exceptions.RefreshError(
                "domain wide delegation is not supported for non-default universe domain"
            )

        if self._use_self_signed_jwt():
            # The self signed JWT itself is used as the bearer token.
            self._jwt_credentials.refresh(request)
            self.token = self._jwt_credentials.token.decode()
            self.expiry = self._jwt_credentials.expiry
        else:
            # Exchange a signed assertion for an access token at the token URI.
            assertion = self._make_authorization_grant_assertion()
            access_token, expiry, _ = _client.jwt_grant(
                request, self._token_uri, assertion
            )
            self.token = access_token
            self.expiry = expiry

    def _create_self_signed_jwt(self, audience):
        """Create a self-signed JWT from the credentials if requirements are met.

        When ``always_use_jwt_access`` is set, the JWT credentials are built
        from, in order of preference, the user scopes, the given audience,
        or the default scopes; existing JWT credentials are reused when they
        already match. Otherwise JWT credentials are only created when no
        user scopes are set and an audience is given. If none of these
        conditions hold, nothing is created.

        Args:
            audience (str): The service URL. ``https://[API_ENDPOINT]/``
        """
        # https://google.aip.dev/auth/4111
        if self._always_use_jwt_access:
            if self._scopes:
                additional_claims = {"scope": " ".join(self._scopes)}
                if (
                    self._jwt_credentials is None
                    or self._jwt_credentials.additional_claims != additional_claims
                ):
                    self._jwt_credentials = jwt.Credentials.from_signing_credentials(
                        self, None, additional_claims=additional_claims
                    )
            elif audience:
                if (
                    self._jwt_credentials is None
                    or self._jwt_credentials._audience != audience
                ):
                    self._jwt_credentials = jwt.Credentials.from_signing_credentials(
                        self, audience
                    )
            elif self._default_scopes:
                additional_claims = {"scope": " ".join(self._default_scopes)}
                if (
                    self._jwt_credentials is None
                    or additional_claims != self._jwt_credentials.additional_claims
                ):
                    self._jwt_credentials = jwt.Credentials.from_signing_credentials(
                        self, None, additional_claims=additional_claims
                    )
        elif not self._scopes and audience:
            self._jwt_credentials = jwt.Credentials.from_signing_credentials(
                self, audience
            )

    def _build_regional_access_boundary_lookup_url(
        self, request: "Optional[google.auth.transport.Request]" = None  # noqa: F821
    ):
        """Builds and returns the URL for the Regional Access Boundary lookup API.

        The URL is obtained from
        ``_regional_access_boundary_utils.get_service_account_rab_endpoint``
        for this credential's service account email.

        Args:
            request (Optional[google.auth.transport.Request]): Not used by
                this implementation.

        Returns:
            Optional[str]: The URL for the Regional Access Boundary lookup endpoint, or None
                if the service account email is missing.
        """
        if not self.service_account_email:
            _LOGGER.error(
                "Service account email is required to build the Regional Access Boundary lookup URL for service account credentials."
            )
            return None
        return _regional_access_boundary_utils.get_service_account_rab_endpoint(
            self._service_account_email
        )

    @_helpers.copy_docstring(credentials.Signing)
    def sign_bytes(self, message):
        return self._signer.sign(message)

    @property  # type: ignore
    @_helpers.copy_docstring(credentials.Signing)
    def signer(self):
        return self._signer

    @property  # type: ignore
    @_helpers.copy_docstring(credentials.Signing)
    def signer_email(self):
        return self._service_account_email

    @_helpers.copy_docstring(credentials.Credentials)
    def get_cred_info(self):
        # Credential info is only reported when the source file path is known.
        if self._cred_file_path:
            return {
                "credential_source": self._cred_file_path,
                "credential_type": "service account credentials",
                "principal": self.service_account_email,
            }
        return None
551
552
class IDTokenCredentials(
    credentials.Signing,
    credentials.CredentialsWithQuotaProject,
    credentials.CredentialsWithTokenUri,
):
    """Open ID Connect ID Token-based service account credentials.

    These credentials are largely similar to :class:`.Credentials`, but instead
    of using an OAuth 2.0 Access Token as the bearer token, they use an Open
    ID Connect ID Token as the bearer token. These credentials are useful when
    communicating to services that require ID Tokens and can not accept access
    tokens.

    Usually, you'll create these credentials with one of the helper
    constructors. The constructor requires a ``target_audience``, so it must
    be passed to the helpers as well. To create credentials using a Google
    service account private key JSON file::

        credentials = (
            service_account.IDTokenCredentials.from_service_account_file(
                'service-account.json',
                target_audience='https://example.com'))


    Or if you already have the service account file loaded::

        service_account_info = json.load(open('service_account.json'))
        credentials = (
            service_account.IDTokenCredentials.from_service_account_info(
                service_account_info,
                target_audience='https://example.com'))


    Both helper methods pass on arguments to the constructor, so you can
    specify additional claims and a quota project if necessary::

        credentials = (
            service_account.IDTokenCredentials.from_service_account_file(
                'service-account.json',
                target_audience='https://example.com',
                additional_claims={'custom': 'value'},
                quota_project_id='myproject-123'))


    The credentials are considered immutable. If you want to modify the
    target audience, the quota project or the token URI, use
    :meth:`with_target_audience`, :meth:`with_quota_project` or
    :meth:`with_token_uri`::

        audience_credentials = credentials.with_target_audience(
            'https://other.example.com')
        quota_credentials = credentials.with_quota_project('myproject-123')

    """

    def __init__(
        self,
        signer,
        service_account_email,
        token_uri,
        target_audience,
        additional_claims=None,
        quota_project_id=None,
        universe_domain=credentials.DEFAULT_UNIVERSE_DOMAIN,
    ):
        """
        Args:
            signer (google.auth.crypt.Signer): The signer used to sign JWTs.
            service_account_email (str): The service account's email.
            token_uri (str): The OAuth 2.0 Token URI.
            target_audience (str): The intended audience for these credentials,
                used when requesting the ID Token. The ID Token's ``aud`` claim
                will be set to this string.
            additional_claims (Mapping[str, str]): Any additional claims for
                the JWT assertion used in the authorization grant.
            quota_project_id (Optional[str]): The project ID used for quota and billing.
            universe_domain (str): The universe domain. A falsy value is
                treated as the default universe domain (googleapis.com). For
                a non-default universe domain, the IAM ID token endpoint is
                used for token refresh. Note that
                iam.serviceAccountTokenCreator role is required to use the IAM
                endpoint.

        .. note:: Typically one of the helper constructors
            :meth:`from_service_account_file` or
            :meth:`from_service_account_info` are used instead of calling the
            constructor directly.
        """
        super().__init__()
        self._signer = signer
        self._service_account_email = service_account_email
        self._token_uri = token_uri
        self._target_audience = target_audience
        self._quota_project_id = quota_project_id

        self._universe_domain = universe_domain or credentials.DEFAULT_UNIVERSE_DOMAIN
        # The IAM endpoint template is written for googleapis.com; point it
        # at the configured universe domain.
        self._iam_id_token_endpoint = iam._IAM_IDTOKEN_ENDPOINT.replace(
            "googleapis.com", self._universe_domain
        )

        # The IAM endpoint is used by default only for a non-default
        # universe domain.
        self._use_iam_endpoint = (
            self._universe_domain != credentials.DEFAULT_UNIVERSE_DOMAIN
        )

        if additional_claims is not None:
            self._additional_claims = additional_claims
        else:
            self._additional_claims = {}

    @classmethod
    def _from_signer_and_info(cls, signer, info, **kwargs):
        """Creates a credentials instance from a signer and service account
        info.

        ``service_account_email`` and ``token_uri`` given in ``kwargs`` take
        precedence over the values in ``info``, whereas a ``universe_domain``
        present in ``info`` overrides the one in ``kwargs``.

        Args:
            signer (google.auth.crypt.Signer): The signer used to sign JWTs.
            info (Mapping[str, str]): The service account info.
            kwargs: Additional arguments to pass to the constructor.

        Returns:
            google.auth.service_account.IDTokenCredentials: The constructed
                credentials.

        Raises:
            KeyError: If ``client_email`` or ``token_uri`` is missing from
                ``info``.
        """
        kwargs.setdefault("service_account_email", info["client_email"])
        kwargs.setdefault("token_uri", info["token_uri"])
        if "universe_domain" in info:
            kwargs["universe_domain"] = info["universe_domain"]
        return cls(signer, **kwargs)

    @classmethod
    def from_service_account_info(cls, info, **kwargs):
        """Creates a credentials instance from parsed service account info.

        Args:
            info (Mapping[str, str]): The service account info in Google
                format.
            kwargs: Additional arguments to pass to the constructor. The
                constructor requires ``target_audience``.

        Returns:
            google.auth.service_account.IDTokenCredentials: The constructed
                credentials.

        Raises:
            ValueError: If the info is not in the expected format.
        """
        signer = _service_account_info.from_dict(
            info, require=["client_email", "token_uri"]
        )
        return cls._from_signer_and_info(signer, info, **kwargs)

    @classmethod
    def from_service_account_file(cls, filename, **kwargs):
        """Creates a credentials instance from a service account json file.

        Args:
            filename (str): The path to the service account json file.
            kwargs: Additional arguments to pass to the constructor. The
                constructor requires ``target_audience``.

        Returns:
            google.auth.service_account.IDTokenCredentials: The constructed
                credentials.
        """
        info, signer = _service_account_info.from_filename(
            filename, require=["client_email", "token_uri"]
        )
        return cls._from_signer_and_info(signer, info, **kwargs)

    def _make_copy(self):
        """Create a new instance carrying the same configuration.

        The additional claims are shallow-copied so that the copy can be
        adjusted without touching this instance.

        Returns:
            google.auth.service_account.IDTokenCredentials: The copied
                credentials.
        """
        cred = self.__class__(
            self._signer,
            service_account_email=self._service_account_email,
            token_uri=self._token_uri,
            target_audience=self._target_audience,
            additional_claims=self._additional_claims.copy(),
            quota_project_id=self.quota_project_id,
            universe_domain=self._universe_domain,
        )
        # _use_iam_endpoint is not exposed in the constructor
        cred._use_iam_endpoint = self._use_iam_endpoint
        return cred

    def with_target_audience(self, target_audience):
        """Create a copy of these credentials with the specified target
        audience.

        Args:
            target_audience (str): The intended audience for these credentials,
                used when requesting the ID Token.

        Returns:
            google.auth.service_account.IDTokenCredentials: A new credentials
                instance.
        """
        cred = self._make_copy()
        cred._target_audience = target_audience
        return cred

    def _with_use_iam_endpoint(self, use_iam_endpoint):
        """Create a copy of these credentials with the use_iam_endpoint value.

        Args:
            use_iam_endpoint (bool): If True, IAM generateIdToken endpoint will
                be used instead of the token_uri. Note that
                iam.serviceAccountTokenCreator role is required to use the IAM
                endpoint. The default value is False. This feature is currently
                experimental and subject to change without notice.

        Returns:
            google.auth.service_account.IDTokenCredentials: A new credentials
                instance.
        Raises:
            google.auth.exceptions.InvalidValue: If the universe domain is not
                default and use_iam_endpoint is False.
        """
        cred = self._make_copy()
        if (
            cred._universe_domain != credentials.DEFAULT_UNIVERSE_DOMAIN
            and not use_iam_endpoint
        ):
            raise exceptions.InvalidValue(
                "use_iam_endpoint should be True for non-default universe domain"
            )
        cred._use_iam_endpoint = use_iam_endpoint
        return cred

    @_helpers.copy_docstring(credentials.CredentialsWithQuotaProject)
    def with_quota_project(self, quota_project_id):
        cred = self._make_copy()
        cred._quota_project_id = quota_project_id
        return cred

    @_helpers.copy_docstring(credentials.CredentialsWithTokenUri)
    def with_token_uri(self, token_uri):
        cred = self._make_copy()
        cred._token_uri = token_uri
        return cred

    def _make_authorization_grant_assertion(self):
        """Create the OAuth 2.0 assertion.

        This assertion is used during the OAuth 2.0 grant to acquire an
        ID token.

        Returns:
            bytes: The authorization grant assertion.
        """
        now = _helpers.utcnow()
        lifetime = datetime.timedelta(seconds=_DEFAULT_TOKEN_LIFETIME_SECS)
        expiry = now + lifetime

        payload = {
            "iat": _helpers.datetime_to_secs(now),
            "exp": _helpers.datetime_to_secs(expiry),
            # The issuer must be the service account email.
            "iss": self.service_account_email,
            # The audience must be the auth token endpoint's URI
            "aud": _GOOGLE_OAUTH2_TOKEN_ENDPOINT,
            # The target audience specifies which service the ID token is
            # intended for.
            "target_audience": self._target_audience,
        }

        # Additional claims are applied on top of the standard ones, so they
        # may override any of the keys above.
        payload.update(self._additional_claims)

        token = jwt.encode(self._signer, payload)

        return token

    def _refresh_with_iam_endpoint(self, request):
        """Use IAM generateIdToken endpoint to obtain an ID token.

        It works as follows:

        1. First we create a self signed jwt with
        https://www.googleapis.com/auth/iam being the scope.

        2. Next we use the self signed jwt as the access token, and make a POST
        request to IAM generateIdToken endpoint. The request body is:
            {
                "audience": self._target_audience,
                "includeEmail": "true",
                "useEmailAzp": "true",
            }

        If the request succeeds, it will return {"token":"the ID token"},
        and we can extract the ID token and compute its expiry.

        Args:
            request (google.auth.transport.Request): The object used to make
                HTTP requests.
        """
        jwt_credentials = jwt.Credentials.from_signing_credentials(
            self,
            None,
            additional_claims={"scope": "https://www.googleapis.com/auth/iam"},
        )
        jwt_credentials.refresh(request)

        self.token, self.expiry = _client.call_iam_generate_id_token_endpoint(
            request,
            self._iam_id_token_endpoint,
            self.signer_email,
            self._target_audience,
            jwt_credentials.token.decode(),
            self._universe_domain,
        )

    @_helpers.copy_docstring(credentials.Credentials)
    def refresh(self, request):
        if self._use_iam_endpoint:
            self._refresh_with_iam_endpoint(request)
        else:
            # Exchange a signed assertion for an ID token at the token URI.
            assertion = self._make_authorization_grant_assertion()
            access_token, expiry, _ = _client.id_token_jwt_grant(
                request, self._token_uri, assertion
            )
            self.token = access_token
            self.expiry = expiry

    @property
    def service_account_email(self):
        """The service account email."""
        return self._service_account_email

    @_helpers.copy_docstring(credentials.Signing)
    def sign_bytes(self, message):
        return self._signer.sign(message)

    @property  # type: ignore
    @_helpers.copy_docstring(credentials.Signing)
    def signer(self):
        return self._signer

    @property  # type: ignore
    @_helpers.copy_docstring(credentials.Signing)
    def signer_email(self):
        return self._service_account_email