1# Copyright 2016 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15"""Service Accounts: JSON Web Token (JWT) Profile for OAuth 2.0
16
17This module implements the JWT Profile for OAuth 2.0 Authorization Grants
18as defined by `RFC 7523`_ with particular support for how this RFC is
19implemented in Google's infrastructure. Google refers to these credentials
20as *Service Accounts*.
21
22Service accounts are used for server-to-server communication, such as
23interactions between a web application server and a Google service. The
24service account belongs to your application instead of to an individual end
25user. In contrast to other OAuth 2.0 profiles, no users are involved and your
26application "acts" as the service account.
27
28Typically an application uses a service account when the application uses
29Google APIs to work with its own data rather than a user's data. For example,
30an application that uses Google Cloud Datastore for data persistence would use
31a service account to authenticate its calls to the Google Cloud Datastore API.
32However, an application that needs to access a user's Drive documents would
33use the normal OAuth 2.0 profile.
34
35Additionally, Google Apps domain administrators can grant service accounts
36`domain-wide delegation`_ authority to access user data on behalf of users in
37the domain.
38
39This profile uses a JWT to acquire an OAuth 2.0 access token. The JWT is used
40in place of the usual authorization token returned during the standard
41OAuth 2.0 Authorization Code grant. The JWT is only used for this purpose, as
42the acquired access token is used as the bearer token when making requests
43using these credentials.
44
This profile differs from the normal OAuth 2.0 profile because no user consent
step is required. The use of the private key allows this profile to assert
identity directly.
48
49This profile also differs from the :mod:`google.auth.jwt` authentication
50because the JWT credentials use the JWT directly as the bearer token. This
51profile instead only uses the JWT to obtain an OAuth 2.0 access token. The
52obtained OAuth 2.0 access token is used as the bearer token.
53
54Domain-wide delegation
55----------------------
56
57Domain-wide delegation allows a service account to access user data on
58behalf of any user in a Google Apps domain without consent from the user.
59For example, an application that uses the Google Calendar API to add events to
60the calendars of all users in a Google Apps domain would use a service account
61to access the Google Calendar API on behalf of users.
62
63The Google Apps administrator must explicitly authorize the service account to
64do this. This authorization step is referred to as "delegating domain-wide
65authority" to a service account.
66
You can use domain-wide delegation by creating a set of credentials with a
specific subject using :meth:`~Credentials.with_subject`.
69
70.. _RFC 7523: https://tools.ietf.org/html/rfc7523
71"""
72
73import copy
74import datetime
75
76from google.auth import _constants
77from google.auth import _helpers
78from google.auth import _service_account_info
79from google.auth import credentials
80from google.auth import exceptions
81from google.auth import iam
82from google.auth import jwt
83from google.auth import metrics
84from google.oauth2 import _client
85
# Lifetime, in seconds, given to the JWT assertions built in this module
# (60 minutes); it determines the "exp" claim relative to "iat".
_DEFAULT_TOKEN_LIFETIME_SECS = 60 * 60
# Value placed in the "aud" claim of the authorization grant assertions.
_GOOGLE_OAUTH2_TOKEN_ENDPOINT = "https://oauth2.googleapis.com/token"
88
89
class Credentials(
    credentials.Signing,
    credentials.Scoped,
    credentials.CredentialsWithQuotaProject,
    credentials.CredentialsWithTokenUri,
    credentials.CredentialsWithTrustBoundary,
):
    """Service account credentials

    Usually, you'll create these credentials with one of the helper
    constructors. To create credentials using a Google service account
    private key JSON file::

        credentials = service_account.Credentials.from_service_account_file(
            'service-account.json')

    Or if you already have the service account file loaded::

        service_account_info = json.load(open('service_account.json'))
        credentials = service_account.Credentials.from_service_account_info(
            service_account_info)

    Both helper methods pass on arguments to the constructor, so you can
    specify additional scopes and a subject if necessary::

        credentials = service_account.Credentials.from_service_account_file(
            'service-account.json',
            scopes=['email'],
            subject='user@example.com')

    The credentials are considered immutable. If you want to modify the scopes
    or the subject used for delegation, use :meth:`with_scopes` or
    :meth:`with_subject`::

        scoped_credentials = credentials.with_scopes(['email'])
        delegated_credentials = credentials.with_subject(subject)

    To add a quota project, use :meth:`with_quota_project`::

        credentials = credentials.with_quota_project('myproject-123')
    """

    def __init__(
        self,
        signer,
        service_account_email,
        token_uri,
        scopes=None,
        default_scopes=None,
        subject=None,
        project_id=None,
        quota_project_id=None,
        additional_claims=None,
        always_use_jwt_access=False,
        universe_domain=credentials.DEFAULT_UNIVERSE_DOMAIN,
        trust_boundary=None,
    ):
        """
        Args:
            signer (google.auth.crypt.Signer): The signer used to sign JWTs.
            service_account_email (str): The service account's email.
            token_uri (str): The OAuth 2.0 Token URI.
            scopes (Sequence[str]): User-defined scopes to request during the
                authorization grant.
            default_scopes (Sequence[str]): Default scopes passed by a
                Google client library. Use 'scopes' for user-defined scopes.
            subject (str): For domain-wide delegation, the email address of the
                user for which to request delegated access.
            project_id (str): Project ID associated with the service account
                credential.
            quota_project_id (Optional[str]): The project ID used for quota and
                billing.
            additional_claims (Mapping[str, str]): Any additional claims for
                the JWT assertion used in the authorization grant.
            always_use_jwt_access (Optional[bool]): Whether self signed JWT should
                be always used.
            universe_domain (str): The universe domain. ``None`` or an empty
                string is treated as the default universe domain. When the
                resulting universe domain is not the default one,
                ``always_use_jwt_access`` is forced to True.
            trust_boundary (Mapping[str,str]): A credential trust boundary.

        .. note:: Typically one of the helper constructors
            :meth:`from_service_account_file` or
            :meth:`from_service_account_info` are used instead of calling the
            constructor directly.
        """
        super(Credentials, self).__init__()

        self._cred_file_path = None
        self._scopes = scopes
        self._default_scopes = default_scopes
        self._signer = signer
        self._service_account_email = service_account_email
        self._subject = subject
        self._project_id = project_id
        self._quota_project_id = quota_project_id
        self._token_uri = token_uri
        self._always_use_jwt_access = always_use_jwt_access
        self._universe_domain = universe_domain or credentials.DEFAULT_UNIVERSE_DOMAIN

        # Decide on the *normalized* value: a falsy ``universe_domain``
        # argument resolves to the default universe and must therefore not
        # force self-signed JWT usage.
        if self._universe_domain != credentials.DEFAULT_UNIVERSE_DOMAIN:
            self._always_use_jwt_access = True

        # Created lazily by _create_self_signed_jwt().
        self._jwt_credentials = None

        if additional_claims is not None:
            self._additional_claims = additional_claims
        else:
            self._additional_claims = {}
        self._trust_boundary = trust_boundary

    @classmethod
    def _from_signer_and_info(cls, signer, info, **kwargs):
        """Creates a Credentials instance from a signer and service account
        info.

        Args:
            signer (google.auth.crypt.Signer): The signer used to sign JWTs.
            info (Mapping[str, str]): The service account info. The
                ``client_email`` and ``token_uri`` keys are required;
                ``project_id``, ``universe_domain`` and ``trust_boundary``
                are read when present.
            kwargs: Additional arguments to pass to the constructor.

        Returns:
            google.auth.service_account.Credentials: The constructed
                credentials.

        Raises:
            KeyError: If ``info`` (assuming a dict-like mapping) lacks
                ``client_email`` or ``token_uri``.
        """
        return cls(
            signer,
            service_account_email=info["client_email"],
            token_uri=info["token_uri"],
            project_id=info.get("project_id"),
            universe_domain=info.get(
                "universe_domain", credentials.DEFAULT_UNIVERSE_DOMAIN
            ),
            trust_boundary=info.get("trust_boundary"),
            **kwargs,
        )

    @classmethod
    def from_service_account_info(cls, info, **kwargs):
        """Creates a Credentials instance from parsed service account info.

        Args:
            info (Mapping[str, str]): The service account info in Google
                format.
            kwargs: Additional arguments to pass to the constructor.

        Returns:
            google.auth.service_account.Credentials: The constructed
                credentials.

        Raises:
            ValueError: If the info is not in the expected format.
        """
        signer = _service_account_info.from_dict(
            info, require=["client_email", "token_uri"]
        )
        return cls._from_signer_and_info(signer, info, **kwargs)

    @classmethod
    def from_service_account_file(cls, filename, **kwargs):
        """Creates a Credentials instance from a service account json file.

        Args:
            filename (str): The path to the service account json file.
            kwargs: Additional arguments to pass to the constructor.

        Returns:
            google.auth.service_account.Credentials: The constructed
                credentials.
        """
        info, signer = _service_account_info.from_filename(
            filename, require=["client_email", "token_uri"]
        )
        return cls._from_signer_and_info(signer, info, **kwargs)

    @property
    def service_account_email(self):
        """The service account email."""
        return self._service_account_email

    @property
    def project_id(self):
        """Project ID associated with this credential."""
        return self._project_id

    @property
    def requires_scopes(self):
        """Checks if the credentials requires scopes.

        Returns:
            bool: True if there are no scopes set otherwise False.
        """
        return not self._scopes

    def _make_copy(self):
        """Return a new instance of the same class with this instance's settings.

        Scopes, default scopes and additional claims are shallow-copied. The
        lazily created JWT credentials are not carried over; the copy starts
        with none.
        """
        cred = self.__class__(
            self._signer,
            service_account_email=self._service_account_email,
            scopes=copy.copy(self._scopes),
            default_scopes=copy.copy(self._default_scopes),
            token_uri=self._token_uri,
            subject=self._subject,
            project_id=self._project_id,
            quota_project_id=self._quota_project_id,
            additional_claims=self._additional_claims.copy(),
            always_use_jwt_access=self._always_use_jwt_access,
            universe_domain=self._universe_domain,
            trust_boundary=self._trust_boundary,
        )
        # Not a constructor argument, so it has to be carried over by hand.
        cred._cred_file_path = self._cred_file_path
        return cred

    @_helpers.copy_docstring(credentials.Scoped)
    def with_scopes(self, scopes, default_scopes=None):
        cred = self._make_copy()
        cred._scopes = scopes
        cred._default_scopes = default_scopes
        return cred

    def with_always_use_jwt_access(self, always_use_jwt_access):
        """Create a copy of these credentials with the specified always_use_jwt_access value.

        Args:
            always_use_jwt_access (bool): Whether always use self signed JWT or not.

        Returns:
            google.auth.service_account.Credentials: A new credentials
                instance.
        Raises:
            google.auth.exceptions.InvalidValue: If the universe domain is not
                default and always_use_jwt_access is False.
        """
        cred = self._make_copy()
        if (
            cred._universe_domain != credentials.DEFAULT_UNIVERSE_DOMAIN
            and not always_use_jwt_access
        ):
            raise exceptions.InvalidValue(
                "always_use_jwt_access should be True for non-default universe domain"
            )
        cred._always_use_jwt_access = always_use_jwt_access
        return cred

    @_helpers.copy_docstring(credentials.CredentialsWithUniverseDomain)
    def with_universe_domain(self, universe_domain):
        cred = self._make_copy()
        # Normalize exactly like the constructor does so that a falsy value
        # means "default universe" instead of being stored verbatim.
        cred._universe_domain = universe_domain or credentials.DEFAULT_UNIVERSE_DOMAIN
        if cred._universe_domain != credentials.DEFAULT_UNIVERSE_DOMAIN:
            cred._always_use_jwt_access = True
        return cred

    def with_subject(self, subject):
        """Create a copy of these credentials with the specified subject.

        Args:
            subject (str): The subject claim.

        Returns:
            google.auth.service_account.Credentials: A new credentials
                instance.
        """
        cred = self._make_copy()
        cred._subject = subject
        return cred

    def with_claims(self, additional_claims):
        """Returns a copy of these credentials with modified claims.

        Args:
            additional_claims (Mapping[str, str]): Any additional claims for
                the JWT payload. This will be merged with the current
                additional claims. ``None`` leaves the current claims as is.

        Returns:
            google.auth.service_account.Credentials: A new credentials
                instance.
        """
        # Deep copy so the merge never touches this instance's claims.
        new_additional_claims = copy.deepcopy(self._additional_claims)
        new_additional_claims.update(additional_claims or {})
        cred = self._make_copy()
        cred._additional_claims = new_additional_claims
        return cred

    @_helpers.copy_docstring(credentials.CredentialsWithQuotaProject)
    def with_quota_project(self, quota_project_id):
        cred = self._make_copy()
        cred._quota_project_id = quota_project_id
        return cred

    @_helpers.copy_docstring(credentials.CredentialsWithTokenUri)
    def with_token_uri(self, token_uri):
        cred = self._make_copy()
        cred._token_uri = token_uri
        return cred

    @_helpers.copy_docstring(credentials.CredentialsWithTrustBoundary)
    def with_trust_boundary(self, trust_boundary):
        cred = self._make_copy()
        cred._trust_boundary = trust_boundary
        return cred

    def _make_authorization_grant_assertion(self):
        """Create the OAuth 2.0 assertion.

        This assertion is used during the OAuth 2.0 grant to acquire an
        access token.

        Returns:
            bytes: The authorization grant assertion.
        """
        now = _helpers.utcnow()
        lifetime = datetime.timedelta(seconds=_DEFAULT_TOKEN_LIFETIME_SECS)
        expiry = now + lifetime

        payload = {
            "iat": _helpers.datetime_to_secs(now),
            "exp": _helpers.datetime_to_secs(expiry),
            # The issuer must be the service account email.
            "iss": self._service_account_email,
            # The audience must be the auth token endpoint's URI
            "aud": _GOOGLE_OAUTH2_TOKEN_ENDPOINT,
            "scope": _helpers.scopes_to_string(self._scopes or ()),
        }

        # Additional claims are applied last, so they can override any of the
        # claims set above.
        payload.update(self._additional_claims)

        # The subject can be a user email for domain-wide delegation. A "sub"
        # supplied through the additional claims takes precedence.
        if self._subject:
            payload.setdefault("sub", self._subject)

        token = jwt.encode(self._signer, payload)

        return token

    def _use_self_signed_jwt(self):
        """Return True when a refresh should go through the self-signed JWT
        credentials, i.e. they exist and no subject is set."""
        # Since domain wide delegation doesn't work with self signed JWT. If
        # subject exists, then we should not use self signed JWT.
        return self._subject is None and self._jwt_credentials is not None

    def _metric_header_for_usage(self):
        """Return the metrics credential type matching the refresh path in use."""
        if self._use_self_signed_jwt():
            return metrics.CRED_TYPE_SA_JWT
        return metrics.CRED_TYPE_SA_ASSERTION

    @_helpers.copy_docstring(credentials.CredentialsWithTrustBoundary)
    def _refresh_token(self, request):
        if self._always_use_jwt_access and not self._jwt_credentials:
            # If self signed jwt should be used but jwt credential is not
            # created, try to create one with scopes
            self._create_self_signed_jwt(None)

        if (
            self._universe_domain != credentials.DEFAULT_UNIVERSE_DOMAIN
            and self._subject
        ):
            raise exceptions.RefreshError(
                "domain wide delegation is not supported for non-default universe domain"
            )

        if self._use_self_signed_jwt():
            # The self-signed JWT itself becomes the bearer token.
            self._jwt_credentials.refresh(request)
            self.token = self._jwt_credentials.token.decode()
            self.expiry = self._jwt_credentials.expiry
        else:
            # Exchange a signed assertion for an access token at the token URI.
            assertion = self._make_authorization_grant_assertion()
            access_token, expiry, _ = _client.jwt_grant(
                request, self._token_uri, assertion
            )
            self.token = access_token
            self.expiry = expiry

    def _create_self_signed_jwt(self, audience):
        """Create a self-signed JWT from the credentials if requirements are met.

        When ``always_use_jwt_access`` is set, the JWT credentials are built
        from, in order of precedence, the user scopes, the audience, or the
        default scopes, and are only rebuilt when that input changed.
        Otherwise they are built from the audience, and only when no user
        scopes are set.

        Args:
            audience (str): The service URL. ``https://[API_ENDPOINT]/``
        """
        # https://google.aip.dev/auth/4111
        if self._always_use_jwt_access:
            if self._scopes:
                additional_claims = {"scope": " ".join(self._scopes)}
                if (
                    self._jwt_credentials is None
                    or self._jwt_credentials.additional_claims != additional_claims
                ):
                    self._jwt_credentials = jwt.Credentials.from_signing_credentials(
                        self, None, additional_claims=additional_claims
                    )
            elif audience:
                if (
                    self._jwt_credentials is None
                    or self._jwt_credentials._audience != audience
                ):
                    self._jwt_credentials = jwt.Credentials.from_signing_credentials(
                        self, audience
                    )
            elif self._default_scopes:
                additional_claims = {"scope": " ".join(self._default_scopes)}
                if (
                    self._jwt_credentials is None
                    or additional_claims != self._jwt_credentials.additional_claims
                ):
                    self._jwt_credentials = jwt.Credentials.from_signing_credentials(
                        self, None, additional_claims=additional_claims
                    )
        elif not self._scopes and audience:
            self._jwt_credentials = jwt.Credentials.from_signing_credentials(
                self, audience
            )

    def _build_trust_boundary_lookup_url(self):
        """Builds and returns the URL for the trust boundary lookup API.

        The URL is produced by formatting
        ``_constants._SERVICE_ACCOUNT_TRUST_BOUNDARY_LOOKUP_ENDPOINT`` with the
        credential's universe domain and service account email.

        Raises:
            ValueError: If `self.service_account_email` is None or an empty
                string, as it's required to form the URL.

        Returns:
            str: The URL for the trust boundary lookup endpoint.
        """
        if not self.service_account_email:
            raise ValueError(
                "Service account email is required to build the trust boundary lookup URL."
            )
        return _constants._SERVICE_ACCOUNT_TRUST_BOUNDARY_LOOKUP_ENDPOINT.format(
            universe_domain=self._universe_domain,
            service_account_email=self._service_account_email,
        )

    @_helpers.copy_docstring(credentials.Signing)
    def sign_bytes(self, message):
        return self._signer.sign(message)

    @property  # type: ignore
    @_helpers.copy_docstring(credentials.Signing)
    def signer(self):
        return self._signer

    @property  # type: ignore
    @_helpers.copy_docstring(credentials.Signing)
    def signer_email(self):
        return self._service_account_email

    @_helpers.copy_docstring(credentials.Credentials)
    def get_cred_info(self):
        # Credential info is only reported when the source file path is known.
        if self._cred_file_path:
            return {
                "credential_source": self._cred_file_path,
                "credential_type": "service account credentials",
                "principal": self.service_account_email,
            }
        return None
549
550
class IDTokenCredentials(
    credentials.Signing,
    credentials.CredentialsWithQuotaProject,
    credentials.CredentialsWithTokenUri,
):
    """Open ID Connect ID Token-based service account credentials.

    These credentials are largely similar to :class:`.Credentials`, but the
    bearer token they carry is an Open ID Connect ID Token rather than an
    OAuth 2.0 Access Token. Use them when talking to services that require
    ID Tokens and can not accept access tokens.

    Usually, you'll create these credentials with one of the helper
    constructors. To create credentials using a Google service account
    private key JSON file::

        credentials = (
            service_account.IDTokenCredentials.from_service_account_file(
                'service-account.json',
                target_audience='https://example.com'))


    Or if you already have the service account file loaded::

        service_account_info = json.load(open('service_account.json'))
        credentials = (
            service_account.IDTokenCredentials.from_service_account_info(
                service_account_info,
                target_audience='https://example.com'))


    Both helper methods pass their keyword arguments on to the constructor,
    which is how the required ``target_audience`` (and optional arguments
    such as ``additional_claims``) are supplied.

    The credentials are considered immutable. To change the audience, derive
    a new instance with :meth:`with_target_audience`::

        other_credentials = credentials.with_target_audience(
            'https://other.example.com')

    """

    def __init__(
        self,
        signer,
        service_account_email,
        token_uri,
        target_audience,
        additional_claims=None,
        quota_project_id=None,
        universe_domain=credentials.DEFAULT_UNIVERSE_DOMAIN,
    ):
        """
        Args:
            signer (google.auth.crypt.Signer): The signer used to sign JWTs.
            service_account_email (str): The service account's email.
            token_uri (str): The OAuth 2.0 Token URI.
            target_audience (str): The intended audience for these credentials,
                used when requesting the ID Token. The ID Token's ``aud`` claim
                will be set to this string.
            additional_claims (Mapping[str, str]): Any additional claims for
                the JWT assertion used in the authorization grant.
            quota_project_id (Optional[str]): The project ID used for quota and billing.
            universe_domain (str): The universe domain. A falsy value selects
                the default universe domain. For a non-default universe
                domain the IAM ID token endpoint is used for token refresh.
                Note that iam.serviceAccountTokenCreator role is required to
                use the IAM endpoint.

        .. note:: Typically one of the helper constructors
            :meth:`from_service_account_file` or
            :meth:`from_service_account_info` are used instead of calling the
            constructor directly.
        """
        super().__init__()

        default_domain = credentials.DEFAULT_UNIVERSE_DOMAIN
        resolved_domain = universe_domain or default_domain

        self._signer = signer
        self._service_account_email = service_account_email
        self._token_uri = token_uri
        self._target_audience = target_audience
        self._quota_project_id = quota_project_id
        self._additional_claims = (
            {} if additional_claims is None else additional_claims
        )

        self._universe_domain = resolved_domain
        # The IAM endpoint template is written for googleapis.com; substitute
        # the universe actually in use.
        self._iam_id_token_endpoint = iam._IAM_IDTOKEN_ENDPOINT.replace(
            "googleapis.com", resolved_domain
        )
        # Outside the default universe the IAM endpoint is switched on.
        self._use_iam_endpoint = resolved_domain != default_domain

    @classmethod
    def _from_signer_and_info(cls, signer, info, **kwargs):
        """Build an instance from a signer plus service account info.

        ``service_account_email`` and ``token_uri`` default to the info's
        ``client_email`` and ``token_uri`` unless given in ``kwargs``. A
        ``universe_domain`` present in ``info`` always wins over ``kwargs``.

        Args:
            signer (google.auth.crypt.Signer): The signer used to sign JWTs.
            info (Mapping[str, str]): The service account info.
            kwargs: Additional arguments to pass to the constructor.

        Returns:
            google.auth.service_account.IDTokenCredentials: The constructed
                credentials.

        Raises:
            KeyError: If ``info`` (assuming a dict-like mapping) lacks
                ``client_email`` or ``token_uri``.
        """
        ctor_args = {
            "service_account_email": info["client_email"],
            "token_uri": info["token_uri"],
        }
        # Explicit caller arguments replace the defaults taken from info.
        ctor_args.update(kwargs)
        if "universe_domain" in info:
            ctor_args["universe_domain"] = info["universe_domain"]
        return cls(signer, **ctor_args)

    @classmethod
    def from_service_account_info(cls, info, **kwargs):
        """Build an instance from already-parsed service account info.

        Args:
            info (Mapping[str, str]): The service account info in Google
                format.
            kwargs: Additional arguments to pass to the constructor.

        Returns:
            google.auth.service_account.IDTokenCredentials: The constructed
                credentials.

        Raises:
            ValueError: If the info is not in the expected format.
        """
        required_keys = ["client_email", "token_uri"]
        signer = _service_account_info.from_dict(info, require=required_keys)
        return cls._from_signer_and_info(signer, info, **kwargs)

    @classmethod
    def from_service_account_file(cls, filename, **kwargs):
        """Build an instance from a service account json file.

        Args:
            filename (str): The path to the service account json file.
            kwargs: Additional arguments to pass to the constructor.

        Returns:
            google.auth.service_account.IDTokenCredentials: The constructed
                credentials.
        """
        required_keys = ["client_email", "token_uri"]
        info, signer = _service_account_info.from_filename(
            filename, require=required_keys
        )
        return cls._from_signer_and_info(signer, info, **kwargs)

    def _make_copy(self):
        """Return a new instance of the same class mirroring this one."""
        settings = {
            "service_account_email": self._service_account_email,
            "token_uri": self._token_uri,
            "target_audience": self._target_audience,
            "additional_claims": self._additional_claims.copy(),
            "quota_project_id": self.quota_project_id,
            "universe_domain": self._universe_domain,
        }
        duplicate = self.__class__(self._signer, **settings)
        # _use_iam_endpoint is not exposed in the constructor
        duplicate._use_iam_endpoint = self._use_iam_endpoint
        return duplicate

    def with_target_audience(self, target_audience):
        """Create a copy of these credentials with the specified target
        audience.

        Args:
            target_audience (str): The intended audience for these credentials,
                used when requesting the ID Token.

        Returns:
            google.auth.service_account.IDTokenCredentials: A new credentials
                instance.
        """
        duplicate = self._make_copy()
        duplicate._target_audience = target_audience
        return duplicate

    def _with_use_iam_endpoint(self, use_iam_endpoint):
        """Create a copy of these credentials with the use_iam_endpoint value.

        Args:
            use_iam_endpoint (bool): If True, IAM generateIdToken endpoint will
                be used instead of the token_uri. Note that
                iam.serviceAccountTokenCreator role is required to use the IAM
                endpoint. The default value is False. This feature is currently
                experimental and subject to change without notice.

        Returns:
            google.auth.service_account.IDTokenCredentials: A new credentials
                instance.
        Raises:
            google.auth.exceptions.InvalidValue: If the universe domain is not
                default and use_iam_endpoint is False.
        """
        duplicate = self._make_copy()
        in_default_universe = (
            duplicate._universe_domain == credentials.DEFAULT_UNIVERSE_DOMAIN
        )
        if not in_default_universe and not use_iam_endpoint:
            raise exceptions.InvalidValue(
                "use_iam_endpoint should be True for non-default universe domain"
            )
        duplicate._use_iam_endpoint = use_iam_endpoint
        return duplicate

    @_helpers.copy_docstring(credentials.CredentialsWithQuotaProject)
    def with_quota_project(self, quota_project_id):
        duplicate = self._make_copy()
        duplicate._quota_project_id = quota_project_id
        return duplicate

    @_helpers.copy_docstring(credentials.CredentialsWithTokenUri)
    def with_token_uri(self, token_uri):
        duplicate = self._make_copy()
        duplicate._token_uri = token_uri
        return duplicate

    def _make_authorization_grant_assertion(self):
        """Create the OAuth 2.0 assertion.

        This assertion is used during the OAuth 2.0 grant to acquire an
        ID token.

        Returns:
            bytes: The authorization grant assertion.
        """
        issued_at = _helpers.utcnow()
        expires_at = issued_at + datetime.timedelta(
            seconds=_DEFAULT_TOKEN_LIFETIME_SECS
        )

        claims = {
            "iat": _helpers.datetime_to_secs(issued_at),
            "exp": _helpers.datetime_to_secs(expires_at),
            # The issuer must be the service account email.
            "iss": self.service_account_email,
            # The audience must be the auth token endpoint's URI
            "aud": _GOOGLE_OAUTH2_TOKEN_ENDPOINT,
            # The target audience specifies which service the ID token is
            # intended for.
            "target_audience": self._target_audience,
        }
        # Applied last, so additional claims may override the ones above.
        claims.update(self._additional_claims)

        return jwt.encode(self._signer, claims)

    def _refresh_with_iam_endpoint(self, request):
        """Use IAM generateIdToken endpoint to obtain an ID token.

        It works as follows:

        1. First we create a self signed jwt with
        https://www.googleapis.com/auth/iam being the scope.

        2. Next we use the self signed jwt as the access token, and make a POST
        request to IAM generateIdToken endpoint. The request body is:
            {
                "audience": self._target_audience,
                "includeEmail": "true",
                "useEmailAzp": "true",
            }

        If the request is successful, it will return {"token":"the ID token"},
        and we can extract the ID token and compute its expiry.
        """
        iam_scope_claims = {"scope": "https://www.googleapis.com/auth/iam"}
        jwt_credentials = jwt.Credentials.from_signing_credentials(
            self, None, additional_claims=iam_scope_claims
        )
        jwt_credentials.refresh(request)
        self_signed_token = jwt_credentials.token.decode()

        id_token, id_token_expiry = _client.call_iam_generate_id_token_endpoint(
            request,
            self._iam_id_token_endpoint,
            self.signer_email,
            self._target_audience,
            self_signed_token,
            self._universe_domain,
        )
        self.token = id_token
        self.expiry = id_token_expiry

    @_helpers.copy_docstring(credentials.Credentials)
    def refresh(self, request):
        if self._use_iam_endpoint:
            self._refresh_with_iam_endpoint(request)
            return

        # Default path: trade a signed assertion for an ID token at the
        # token URI.
        grant_assertion = self._make_authorization_grant_assertion()
        id_token, id_token_expiry, _ = _client.id_token_jwt_grant(
            request, self._token_uri, grant_assertion
        )
        self.token = id_token
        self.expiry = id_token_expiry

    @property
    def service_account_email(self):
        """The service account email."""
        return self._service_account_email

    @_helpers.copy_docstring(credentials.Signing)
    def sign_bytes(self, message):
        return self._signer.sign(message)

    @property  # type: ignore
    @_helpers.copy_docstring(credentials.Signing)
    def signer(self):
        return self._signer

    @property  # type: ignore
    @_helpers.copy_docstring(credentials.Signing)
    def signer_email(self):
        return self._service_account_email