1# Copyright 2016 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15"""Service Accounts: JSON Web Token (JWT) Profile for OAuth 2.0
16
17This module implements the JWT Profile for OAuth 2.0 Authorization Grants
18as defined by `RFC 7523`_ with particular support for how this RFC is
19implemented in Google's infrastructure. Google refers to these credentials
20as *Service Accounts*.
21
22Service accounts are used for server-to-server communication, such as
23interactions between a web application server and a Google service. The
24service account belongs to your application instead of to an individual end
25user. In contrast to other OAuth 2.0 profiles, no users are involved and your
26application "acts" as the service account.
27
28Typically an application uses a service account when the application uses
29Google APIs to work with its own data rather than a user's data. For example,
30an application that uses Google Cloud Datastore for data persistence would use
31a service account to authenticate its calls to the Google Cloud Datastore API.
32However, an application that needs to access a user's Drive documents would
33use the normal OAuth 2.0 profile.
34
35Additionally, Google Apps domain administrators can grant service accounts
36`domain-wide delegation`_ authority to access user data on behalf of users in
37the domain.
38
39This profile uses a JWT to acquire an OAuth 2.0 access token. The JWT is used
40in place of the usual authorization token returned during the standard
41OAuth 2.0 Authorization Code grant. The JWT is only used for this purpose, as
42the acquired access token is used as the bearer token when making requests
43using these credentials.
44
This profile differs from the normal OAuth 2.0 profile because no user consent
step is required. The use of the private key allows this profile to assert
identity directly.
48
49This profile also differs from the :mod:`google.auth.jwt` authentication
50because the JWT credentials use the JWT directly as the bearer token. This
51profile instead only uses the JWT to obtain an OAuth 2.0 access token. The
52obtained OAuth 2.0 access token is used as the bearer token.
53
54Domain-wide delegation
55----------------------
56
57Domain-wide delegation allows a service account to access user data on
58behalf of any user in a Google Apps domain without consent from the user.
59For example, an application that uses the Google Calendar API to add events to
60the calendars of all users in a Google Apps domain would use a service account
61to access the Google Calendar API on behalf of users.
62
63The Google Apps administrator must explicitly authorize the service account to
64do this. This authorization step is referred to as "delegating domain-wide
65authority" to a service account.
66
You can use domain-wide delegation by creating a set of credentials with a
specific subject using :meth:`~Credentials.with_subject`.
69
70.. _RFC 7523: https://tools.ietf.org/html/rfc7523
71"""
72
73import copy
74import datetime
75
76from google.auth import _constants
77from google.auth import _helpers
78from google.auth import _service_account_info
79from google.auth import credentials
80from google.auth import exceptions
81from google.auth import iam
82from google.auth import jwt
83from google.auth import metrics
84from google.oauth2 import _client
85
86_DEFAULT_TOKEN_LIFETIME_SECS = 3600 # 1 hour in seconds
87_GOOGLE_OAUTH2_TOKEN_ENDPOINT = "https://oauth2.googleapis.com/token"
88
89
class Credentials(
    credentials.Signing,
    credentials.Scoped,
    credentials.CredentialsWithQuotaProject,
    credentials.CredentialsWithTokenUri,
    credentials.CredentialsWithTrustBoundary,
):
    """Service account credentials

    Usually, you'll create these credentials with one of the helper
    constructors. To create credentials using a Google service account
    private key JSON file::

        credentials = service_account.Credentials.from_service_account_file(
            'service-account.json')

    Or if you already have the service account file loaded::

        service_account_info = json.load(open('service_account.json'))
        credentials = service_account.Credentials.from_service_account_info(
            service_account_info)

    Both helper methods pass on arguments to the constructor, so you can
    specify additional scopes and a subject if necessary::

        credentials = service_account.Credentials.from_service_account_file(
            'service-account.json',
            scopes=['email'],
            subject='user@example.com')

    The credentials are considered immutable. If you want to modify the scopes
    or the subject used for delegation, use :meth:`with_scopes` or
    :meth:`with_subject`::

        scoped_credentials = credentials.with_scopes(['email'])
        delegated_credentials = credentials.with_subject(subject)

    To add a quota project, use :meth:`with_quota_project`::

        credentials = credentials.with_quota_project('myproject-123')
    """

    def __init__(
        self,
        signer,
        service_account_email,
        token_uri,
        scopes=None,
        default_scopes=None,
        subject=None,
        project_id=None,
        quota_project_id=None,
        additional_claims=None,
        always_use_jwt_access=False,
        universe_domain=credentials.DEFAULT_UNIVERSE_DOMAIN,
        trust_boundary=None,
    ):
        """
        Args:
            signer (google.auth.crypt.Signer): The signer used to sign JWTs.
            service_account_email (str): The service account's email.
            token_uri (str): The OAuth 2.0 Token URI.
            scopes (Sequence[str]): User-defined scopes to request during the
                authorization grant.
            default_scopes (Sequence[str]): Default scopes passed by a
                Google client library. Use 'scopes' for user-defined scopes.
            subject (str): For domain-wide delegation, the email address of the
                user for which to request delegated access.
            project_id (str): Project ID associated with the service account
                credential.
            quota_project_id (Optional[str]): The project ID used for quota and
                billing.
            additional_claims (Mapping[str, str]): Any additional claims for
                the JWT assertion used in the authorization grant.
            always_use_jwt_access (Optional[bool]): Whether self signed JWT should
                be always used.
            universe_domain (str): The universe domain. The default
                universe domain is googleapis.com; a falsy value (``None`` or
                ``""``) is treated as the default. For any non-default value,
                ``always_use_jwt_access`` is forced to ``True``.
            trust_boundary (Mapping[str,str]): A credential trust boundary.

        .. note:: Typically one of the helper constructors
            :meth:`from_service_account_file` or
            :meth:`from_service_account_info` are used instead of calling the
            constructor directly.
        """
        super(Credentials, self).__init__()

        self._cred_file_path = None
        self._scopes = scopes
        self._default_scopes = default_scopes
        self._signer = signer
        self._service_account_email = service_account_email
        self._subject = subject
        self._project_id = project_id
        self._quota_project_id = quota_project_id
        self._token_uri = token_uri
        self._always_use_jwt_access = always_use_jwt_access
        self._universe_domain = universe_domain or credentials.DEFAULT_UNIVERSE_DOMAIN

        # Decide on the *normalized* universe domain. Testing the raw argument
        # would force self-signed JWTs whenever the caller passed None or "",
        # even though such credentials live in the default universe.
        if self._universe_domain != credentials.DEFAULT_UNIVERSE_DOMAIN:
            self._always_use_jwt_access = True

        # Created lazily by _create_self_signed_jwt().
        self._jwt_credentials = None

        if additional_claims is not None:
            self._additional_claims = additional_claims
        else:
            self._additional_claims = {}
        self._trust_boundary = trust_boundary

    @classmethod
    def _from_signer_and_info(cls, signer, info, **kwargs):
        """Creates a Credentials instance from a signer and service account
        info.

        Args:
            signer (google.auth.crypt.Signer): The signer used to sign JWTs.
            info (Mapping[str, str]): The service account info. Must contain
                ``client_email`` and ``token_uri``; ``project_id``,
                ``universe_domain`` and ``trust_boundary`` are read when
                present.
            kwargs: Additional arguments to pass to the constructor.

        Returns:
            Credentials: The constructed credentials.

        Raises:
            ValueError: If the info is not in the expected format.
        """
        return cls(
            signer,
            service_account_email=info["client_email"],
            token_uri=info["token_uri"],
            project_id=info.get("project_id"),
            universe_domain=info.get(
                "universe_domain", credentials.DEFAULT_UNIVERSE_DOMAIN
            ),
            trust_boundary=info.get("trust_boundary"),
            **kwargs,
        )

    @classmethod
    def from_service_account_info(cls, info, **kwargs):
        """Creates a Credentials instance from parsed service account info.

        Args:
            info (Mapping[str, str]): The service account info in Google
                format.
            kwargs: Additional arguments to pass to the constructor.

        Returns:
            Credentials: The constructed credentials.

        Raises:
            ValueError: If the info is not in the expected format.
        """
        signer = _service_account_info.from_dict(
            info, require=["client_email", "token_uri"]
        )
        return cls._from_signer_and_info(signer, info, **kwargs)

    @classmethod
    def from_service_account_file(cls, filename, **kwargs):
        """Creates a Credentials instance from a service account json file.

        Args:
            filename (str): The path to the service account json file.
            kwargs: Additional arguments to pass to the constructor.

        Returns:
            Credentials: The constructed credentials.
        """
        info, signer = _service_account_info.from_filename(
            filename, require=["client_email", "token_uri"]
        )
        return cls._from_signer_and_info(signer, info, **kwargs)

    @property
    def service_account_email(self):
        """The service account email."""
        return self._service_account_email

    @property
    def project_id(self):
        """Project ID associated with this credential."""
        return self._project_id

    @property
    def requires_scopes(self):
        """Checks if the credentials requires scopes.

        Only the user-defined scopes are consulted; default scopes do not
        affect the result.

        Returns:
            bool: True if there are no scopes set otherwise False.
        """
        return not self._scopes

    def _make_copy(self):
        """Return a new instance of this class carrying the same settings.

        Scopes and additional claims are shallow-copied so the clone does not
        share those containers with this instance. The cached self-signed JWT
        credentials are not carried over.
        """
        cred = self.__class__(
            self._signer,
            service_account_email=self._service_account_email,
            scopes=copy.copy(self._scopes),
            default_scopes=copy.copy(self._default_scopes),
            token_uri=self._token_uri,
            subject=self._subject,
            project_id=self._project_id,
            quota_project_id=self._quota_project_id,
            additional_claims=self._additional_claims.copy(),
            always_use_jwt_access=self._always_use_jwt_access,
            universe_domain=self._universe_domain,
            trust_boundary=self._trust_boundary,
        )
        # Not a constructor argument, so it has to be carried over by hand.
        cred._cred_file_path = self._cred_file_path
        return cred

    @_helpers.copy_docstring(credentials.Scoped)
    def with_scopes(self, scopes, default_scopes=None):
        cred = self._make_copy()
        cred._scopes = scopes
        cred._default_scopes = default_scopes
        return cred

    def with_always_use_jwt_access(self, always_use_jwt_access):
        """Create a copy of these credentials with the specified always_use_jwt_access value.

        Args:
            always_use_jwt_access (bool): Whether always use self signed JWT or not.

        Returns:
            Credentials: A new credentials instance.

        Raises:
            google.auth.exceptions.InvalidValue: If the universe domain is not
                default and always_use_jwt_access is False.
        """
        # Validate first so that no copy is built for a rejected request.
        if (
            self._universe_domain != credentials.DEFAULT_UNIVERSE_DOMAIN
            and not always_use_jwt_access
        ):
            raise exceptions.InvalidValue(
                "always_use_jwt_access should be True for non-default universe domain"
            )
        cred = self._make_copy()
        cred._always_use_jwt_access = always_use_jwt_access
        return cred

    @_helpers.copy_docstring(credentials.CredentialsWithUniverseDomain)
    def with_universe_domain(self, universe_domain):
        cred = self._make_copy()
        # Normalize exactly like the constructor does, so a falsy value means
        # "default universe" and never ends up stored as None.
        cred._universe_domain = universe_domain or credentials.DEFAULT_UNIVERSE_DOMAIN
        if cred._universe_domain != credentials.DEFAULT_UNIVERSE_DOMAIN:
            cred._always_use_jwt_access = True
        return cred

    def with_subject(self, subject):
        """Create a copy of these credentials with the specified subject.

        Args:
            subject (str): The subject claim.

        Returns:
            Credentials: A new credentials instance.
        """
        cred = self._make_copy()
        cred._subject = subject
        return cred

    def with_claims(self, additional_claims):
        """Returns a copy of these credentials with modified claims.

        Args:
            additional_claims (Mapping[str, str]): Any additional claims for
                the JWT payload. This will be merged with the current
                additional claims.

        Returns:
            Credentials: A new credentials instance.
        """
        new_additional_claims = copy.deepcopy(self._additional_claims)
        new_additional_claims.update(additional_claims or {})
        cred = self._make_copy()
        cred._additional_claims = new_additional_claims
        return cred

    @_helpers.copy_docstring(credentials.CredentialsWithQuotaProject)
    def with_quota_project(self, quota_project_id):
        cred = self._make_copy()
        cred._quota_project_id = quota_project_id
        return cred

    @_helpers.copy_docstring(credentials.CredentialsWithTokenUri)
    def with_token_uri(self, token_uri):
        cred = self._make_copy()
        cred._token_uri = token_uri
        return cred

    @_helpers.copy_docstring(credentials.CredentialsWithTrustBoundary)
    def with_trust_boundary(self, trust_boundary):
        cred = self._make_copy()
        cred._trust_boundary = trust_boundary
        return cred

    def _make_authorization_grant_assertion(self):
        """Create the OAuth 2.0 assertion.

        This assertion is used during the OAuth 2.0 grant to acquire an
        access token.

        Returns:
            bytes: The authorization grant assertion.
        """
        now = _helpers.utcnow()
        lifetime = datetime.timedelta(seconds=_DEFAULT_TOKEN_LIFETIME_SECS)
        expiry = now + lifetime

        payload = {
            "iat": _helpers.datetime_to_secs(now),
            "exp": _helpers.datetime_to_secs(expiry),
            # The issuer must be the service account email.
            "iss": self._service_account_email,
            # The audience must be the auth token endpoint's URI
            "aud": _GOOGLE_OAUTH2_TOKEN_ENDPOINT,
            "scope": _helpers.scopes_to_string(self._scopes or ()),
        }

        # Additional claims are applied on top of the defaults above, so they
        # can override any of them.
        payload.update(self._additional_claims)

        # The subject can be a user email for domain-wide delegation.
        # setdefault: a "sub" supplied through additional claims wins.
        if self._subject:
            payload.setdefault("sub", self._subject)

        token = jwt.encode(self._signer, payload)

        return token

    def _use_self_signed_jwt(self):
        """Return True when refresh should use the self-signed JWT credentials."""
        # Since domain wide delegation doesn't work with self signed JWT. If
        # subject exists, then we should not use self signed JWT.
        return self._subject is None and self._jwt_credentials is not None

    def _metric_header_for_usage(self):
        """Return the metrics credential-type value matching the refresh path."""
        if self._use_self_signed_jwt():
            return metrics.CRED_TYPE_SA_JWT
        return metrics.CRED_TYPE_SA_ASSERTION

    @_helpers.copy_docstring(credentials.CredentialsWithTrustBoundary)
    def _perform_refresh_token(self, request):
        if self._always_use_jwt_access and not self._jwt_credentials:
            # If self signed jwt should be used but jwt credential is not
            # created, try to create one with scopes
            self._create_self_signed_jwt(None)

        if (
            self._universe_domain != credentials.DEFAULT_UNIVERSE_DOMAIN
            and self._subject
        ):
            raise exceptions.RefreshError(
                "domain wide delegation is not supported for non-default universe domain"
            )

        if self._use_self_signed_jwt():
            # The self-signed JWT itself becomes the bearer token.
            self._jwt_credentials.refresh(request)
            self.token = self._jwt_credentials.token.decode()
            self.expiry = self._jwt_credentials.expiry
        else:
            # Exchange a signed assertion for an access token at the token URI.
            assertion = self._make_authorization_grant_assertion()
            access_token, expiry, _ = _client.jwt_grant(
                request, self._token_uri, assertion
            )
            self.token = access_token
            self.expiry = expiry

    def _create_self_signed_jwt(self, audience):
        """Create a self-signed JWT from the credentials if requirements are met.

        When ``always_use_jwt_access`` is set, the JWT credentials are built
        from the first available of: user scopes, ``audience``, default
        scopes. Existing JWT credentials are kept when they already match.
        Otherwise JWT credentials are only built when there are no user
        scopes and an ``audience`` is given.

        Args:
            audience (str): The service URL. ``https://[API_ENDPOINT]/``
        """
        # https://google.aip.dev/auth/4111
        if self._always_use_jwt_access:
            if self._scopes:
                additional_claims = {"scope": " ".join(self._scopes)}
                if (
                    self._jwt_credentials is None
                    or self._jwt_credentials.additional_claims != additional_claims
                ):
                    self._jwt_credentials = jwt.Credentials.from_signing_credentials(
                        self, None, additional_claims=additional_claims
                    )
            elif audience:
                if (
                    self._jwt_credentials is None
                    or self._jwt_credentials._audience != audience
                ):
                    self._jwt_credentials = jwt.Credentials.from_signing_credentials(
                        self, audience
                    )
            elif self._default_scopes:
                additional_claims = {"scope": " ".join(self._default_scopes)}
                if (
                    self._jwt_credentials is None
                    or additional_claims != self._jwt_credentials.additional_claims
                ):
                    self._jwt_credentials = jwt.Credentials.from_signing_credentials(
                        self, None, additional_claims=additional_claims
                    )
        elif not self._scopes and audience:
            self._jwt_credentials = jwt.Credentials.from_signing_credentials(
                self, audience
            )

    def _build_trust_boundary_lookup_url(self):
        """Builds and returns the URL for the trust boundary lookup API.

        The URL is produced by formatting
        ``_constants._SERVICE_ACCOUNT_TRUST_BOUNDARY_LOOKUP_ENDPOINT`` with the
        credential's universe domain and service account email.

        Raises:
            ValueError: If `self.service_account_email` is None or an empty
                string, as it's required to form the URL.

        Returns:
            str: The URL for the trust boundary lookup endpoint.
        """
        if not self.service_account_email:
            raise ValueError(
                "Service account email is required to build the trust boundary lookup URL."
            )
        return _constants._SERVICE_ACCOUNT_TRUST_BOUNDARY_LOOKUP_ENDPOINT.format(
            universe_domain=self._universe_domain,
            service_account_email=self._service_account_email,
        )

    @_helpers.copy_docstring(credentials.Signing)
    def sign_bytes(self, message):
        return self._signer.sign(message)

    @property  # type: ignore
    @_helpers.copy_docstring(credentials.Signing)
    def signer(self):
        return self._signer

    @property  # type: ignore
    @_helpers.copy_docstring(credentials.Signing)
    def signer_email(self):
        return self._service_account_email

    @_helpers.copy_docstring(credentials.Credentials)
    def get_cred_info(self):
        # Credential info is only reported when a source file path was recorded.
        if self._cred_file_path:
            return {
                "credential_source": self._cred_file_path,
                "credential_type": "service account credentials",
                "principal": self.service_account_email,
            }
        return None
548
549
class IDTokenCredentials(
    credentials.Signing,
    credentials.CredentialsWithQuotaProject,
    credentials.CredentialsWithTokenUri,
):
    """Open ID Connect ID Token-based service account credentials.

    These credentials are largely similar to :class:`.Credentials`, but instead
    of using an OAuth 2.0 Access Token as the bearer token, they use an Open
    ID Connect ID Token as the bearer token. These credentials are useful when
    communicating to services that require ID Tokens and can not accept access
    tokens.

    Usually, you'll create these credentials with one of the helper
    constructors. Both helper methods pass on arguments to the constructor,
    and ``target_audience`` is a required constructor argument. To create
    credentials using a Google service account private key JSON file::

        credentials = (
            service_account.IDTokenCredentials.from_service_account_file(
                'service-account.json',
                target_audience='https://example.com'))


    Or if you already have the service account file loaded::

        service_account_info = json.load(open('service_account.json'))
        credentials = (
            service_account.IDTokenCredentials.from_service_account_info(
                service_account_info,
                target_audience='https://example.com'))


    The credentials are considered immutable. If you want to modify the target
    audience or the quota project, use :meth:`with_target_audience` or
    :meth:`with_quota_project`::

        audience_credentials = credentials.with_target_audience(
            'https://other.example.com')
        quota_credentials = credentials.with_quota_project('myproject-123')

    """

    def __init__(
        self,
        signer,
        service_account_email,
        token_uri,
        target_audience,
        additional_claims=None,
        quota_project_id=None,
        universe_domain=credentials.DEFAULT_UNIVERSE_DOMAIN,
    ):
        """
        Args:
            signer (google.auth.crypt.Signer): The signer used to sign JWTs.
            service_account_email (str): The service account's email.
            token_uri (str): The OAuth 2.0 Token URI.
            target_audience (str): The intended audience for these credentials,
                used when requesting the ID Token. The ID Token's ``aud`` claim
                will be set to this string.
            additional_claims (Mapping[str, str]): Any additional claims for
                the JWT assertion used in the authorization grant.
            quota_project_id (Optional[str]): The project ID used for quota and billing.
            universe_domain (str): The universe domain. The default
                universe domain is googleapis.com; a falsy value (``None`` or
                ``""``) is treated as the default. For any non-default value,
                the IAM ID token endpoint is used for token refresh. Note that
                iam.serviceAccountTokenCreator role is required to use the IAM
                endpoint.

        .. note:: Typically one of the helper constructors
            :meth:`from_service_account_file` or
            :meth:`from_service_account_info` are used instead of calling the
            constructor directly.
        """
        super(IDTokenCredentials, self).__init__()
        self._signer = signer
        self._service_account_email = service_account_email
        self._token_uri = token_uri
        self._target_audience = target_audience
        self._quota_project_id = quota_project_id
        self._use_iam_endpoint = False

        self._universe_domain = universe_domain or credentials.DEFAULT_UNIVERSE_DOMAIN
        # Point the IAM ID token endpoint template at this credential's universe.
        self._iam_id_token_endpoint = iam._IAM_IDTOKEN_ENDPOINT.replace(
            "googleapis.com", self._universe_domain
        )

        # Non-default universes always refresh through the IAM endpoint.
        if self._universe_domain != credentials.DEFAULT_UNIVERSE_DOMAIN:
            self._use_iam_endpoint = True

        if additional_claims is not None:
            self._additional_claims = additional_claims
        else:
            self._additional_claims = {}

    @classmethod
    def _from_signer_and_info(cls, signer, info, **kwargs):
        """Creates a credentials instance from a signer and service account
        info.

        ``client_email`` and ``token_uri`` from ``info`` are used only when
        the matching constructor argument is absent from ``kwargs``. A
        ``universe_domain`` present in ``info`` always replaces the one in
        ``kwargs``.

        Args:
            signer (google.auth.crypt.Signer): The signer used to sign JWTs.
            info (Mapping[str, str]): The service account info.
            kwargs: Additional arguments to pass to the constructor.

        Returns:
            IDTokenCredentials: The constructed credentials.

        Raises:
            ValueError: If the info is not in the expected format.
        """
        kwargs.setdefault("service_account_email", info["client_email"])
        kwargs.setdefault("token_uri", info["token_uri"])
        if "universe_domain" in info:
            kwargs["universe_domain"] = info["universe_domain"]
        return cls(signer, **kwargs)

    @classmethod
    def from_service_account_info(cls, info, **kwargs):
        """Creates a credentials instance from parsed service account info.

        Args:
            info (Mapping[str, str]): The service account info in Google
                format.
            kwargs: Additional arguments to pass to the constructor.

        Returns:
            IDTokenCredentials: The constructed credentials.

        Raises:
            ValueError: If the info is not in the expected format.
        """
        signer = _service_account_info.from_dict(
            info, require=["client_email", "token_uri"]
        )
        return cls._from_signer_and_info(signer, info, **kwargs)

    @classmethod
    def from_service_account_file(cls, filename, **kwargs):
        """Creates a credentials instance from a service account json file.

        Args:
            filename (str): The path to the service account json file.
            kwargs: Additional arguments to pass to the constructor.

        Returns:
            IDTokenCredentials: The constructed credentials.
        """
        info, signer = _service_account_info.from_filename(
            filename, require=["client_email", "token_uri"]
        )
        return cls._from_signer_and_info(signer, info, **kwargs)

    def _make_copy(self):
        """Return a new instance of this class carrying the same settings.

        The additional claims are shallow-copied so the clone does not share
        that mapping with this instance.
        """
        cred = self.__class__(
            self._signer,
            service_account_email=self._service_account_email,
            token_uri=self._token_uri,
            target_audience=self._target_audience,
            additional_claims=self._additional_claims.copy(),
            quota_project_id=self.quota_project_id,
            universe_domain=self._universe_domain,
        )
        # _use_iam_endpoint is not exposed in the constructor
        cred._use_iam_endpoint = self._use_iam_endpoint
        return cred

    def with_target_audience(self, target_audience):
        """Create a copy of these credentials with the specified target
        audience.

        Args:
            target_audience (str): The intended audience for these credentials,
                used when requesting the ID Token.

        Returns:
            IDTokenCredentials: A new credentials instance.
        """
        cred = self._make_copy()
        cred._target_audience = target_audience
        return cred

    def _with_use_iam_endpoint(self, use_iam_endpoint):
        """Create a copy of these credentials with the use_iam_endpoint value.

        Args:
            use_iam_endpoint (bool): If True, IAM generateIdToken endpoint will
                be used instead of the token_uri. Note that
                iam.serviceAccountTokenCreator role is required to use the IAM
                endpoint. The default value is False. This feature is currently
                experimental and subject to change without notice.

        Returns:
            IDTokenCredentials: A new credentials instance.

        Raises:
            google.auth.exceptions.InvalidValue: If the universe domain is not
                default and use_iam_endpoint is False.
        """
        # Validate first so that no copy is built for a rejected request.
        if (
            self._universe_domain != credentials.DEFAULT_UNIVERSE_DOMAIN
            and not use_iam_endpoint
        ):
            raise exceptions.InvalidValue(
                "use_iam_endpoint should be True for non-default universe domain"
            )
        cred = self._make_copy()
        cred._use_iam_endpoint = use_iam_endpoint
        return cred

    @_helpers.copy_docstring(credentials.CredentialsWithQuotaProject)
    def with_quota_project(self, quota_project_id):
        cred = self._make_copy()
        cred._quota_project_id = quota_project_id
        return cred

    @_helpers.copy_docstring(credentials.CredentialsWithTokenUri)
    def with_token_uri(self, token_uri):
        cred = self._make_copy()
        cred._token_uri = token_uri
        return cred

    def _make_authorization_grant_assertion(self):
        """Create the OAuth 2.0 assertion.

        This assertion is used during the OAuth 2.0 grant to acquire an
        ID token.

        Returns:
            bytes: The authorization grant assertion.
        """
        now = _helpers.utcnow()
        lifetime = datetime.timedelta(seconds=_DEFAULT_TOKEN_LIFETIME_SECS)
        expiry = now + lifetime

        payload = {
            "iat": _helpers.datetime_to_secs(now),
            "exp": _helpers.datetime_to_secs(expiry),
            # The issuer must be the service account email.
            "iss": self.service_account_email,
            # The audience must be the auth token endpoint's URI
            "aud": _GOOGLE_OAUTH2_TOKEN_ENDPOINT,
            # The target audience specifies which service the ID token is
            # intended for.
            "target_audience": self._target_audience,
        }

        # Additional claims are applied on top of the defaults above, so they
        # can override any of them.
        payload.update(self._additional_claims)

        token = jwt.encode(self._signer, payload)

        return token

    def _refresh_with_iam_endpoint(self, request):
        """Use IAM generateIdToken endpoint to obtain an ID token.

        It works as follows:

        1. First we create a self signed jwt with
        https://www.googleapis.com/auth/iam being the scope.

        2. Next we use the self signed jwt as the access token, and make a POST
        request to IAM generateIdToken endpoint. The request body is:
            {
                "audience": self._target_audience,
                "includeEmail": "true",
                "useEmailAzp": "true",
            }

        If the request is successful, it will return {"token":"the ID token"},
        and we can extract the ID token and compute its expiry.
        """
        jwt_credentials = jwt.Credentials.from_signing_credentials(
            self,
            None,
            additional_claims={"scope": "https://www.googleapis.com/auth/iam"},
        )
        jwt_credentials.refresh(request)

        self.token, self.expiry = _client.call_iam_generate_id_token_endpoint(
            request,
            self._iam_id_token_endpoint,
            self.signer_email,
            self._target_audience,
            jwt_credentials.token.decode(),
            self._universe_domain,
        )

    @_helpers.copy_docstring(credentials.Credentials)
    def refresh(self, request):
        if self._use_iam_endpoint:
            self._refresh_with_iam_endpoint(request)
        else:
            # Exchange a signed assertion for an ID token at the token URI.
            assertion = self._make_authorization_grant_assertion()
            access_token, expiry, _ = _client.id_token_jwt_grant(
                request, self._token_uri, assertion
            )
            self.token = access_token
            self.expiry = expiry

    @property
    def service_account_email(self):
        """The service account email."""
        return self._service_account_email

    @_helpers.copy_docstring(credentials.Signing)
    def sign_bytes(self, message):
        return self._signer.sign(message)

    @property  # type: ignore
    @_helpers.copy_docstring(credentials.Signing)
    def signer(self):
        return self._signer

    @property  # type: ignore
    @_helpers.copy_docstring(credentials.Signing)
    def signer_email(self):
        return self._service_account_email