Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/google/oauth2/service_account.py: 34%
236 statements
« prev ^ index » next coverage.py v7.3.1, created at 2023-09-25 06:37 +0000
« prev ^ index » next coverage.py v7.3.1, created at 2023-09-25 06:37 +0000
1# Copyright 2016 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
15"""Service Accounts: JSON Web Token (JWT) Profile for OAuth 2.0
17This module implements the JWT Profile for OAuth 2.0 Authorization Grants
18as defined by `RFC 7523`_ with particular support for how this RFC is
19implemented in Google's infrastructure. Google refers to these credentials
20as *Service Accounts*.
22Service accounts are used for server-to-server communication, such as
23interactions between a web application server and a Google service. The
24service account belongs to your application instead of to an individual end
25user. In contrast to other OAuth 2.0 profiles, no users are involved and your
26application "acts" as the service account.
28Typically an application uses a service account when the application uses
29Google APIs to work with its own data rather than a user's data. For example,
30an application that uses Google Cloud Datastore for data persistence would use
31a service account to authenticate its calls to the Google Cloud Datastore API.
32However, an application that needs to access a user's Drive documents would
33use the normal OAuth 2.0 profile.
35Additionally, Google Apps domain administrators can grant service accounts
36`domain-wide delegation`_ authority to access user data on behalf of users in
37the domain.
39This profile uses a JWT to acquire an OAuth 2.0 access token. The JWT is used
40in place of the usual authorization token returned during the standard
41OAuth 2.0 Authorization Code grant. The JWT is only used for this purpose, as
42the acquired access token is used as the bearer token when making requests
43using these credentials.
45This profile differs from normal OAuth 2.0 profile because no user consent
46step is required. The use of the private key allows this profile to assert
47identity directly.
49This profile also differs from the :mod:`google.auth.jwt` authentication
50because the JWT credentials use the JWT directly as the bearer token. This
51profile instead only uses the JWT to obtain an OAuth 2.0 access token. The
52obtained OAuth 2.0 access token is used as the bearer token.
54Domain-wide delegation
55----------------------
57Domain-wide delegation allows a service account to access user data on
58behalf of any user in a Google Apps domain without consent from the user.
59For example, an application that uses the Google Calendar API to add events to
60the calendars of all users in a Google Apps domain would use a service account
61to access the Google Calendar API on behalf of users.
63The Google Apps administrator must explicitly authorize the service account to
64do this. This authorization step is referred to as "delegating domain-wide
65authority" to a service account.
67You can use domain-wide delegation by creating a set of credentials with a
68specific subject using :meth:`~Credentials.with_subject`.
70.. _RFC 7523: https://tools.ietf.org/html/rfc7523
71"""
73import copy
74import datetime
76from google.auth import _helpers
77from google.auth import _service_account_info
78from google.auth import credentials
79from google.auth import exceptions
80from google.auth import jwt
81from google.auth import metrics
82from google.oauth2 import _client
84_DEFAULT_TOKEN_LIFETIME_SECS = 3600 # 1 hour in seconds
85_DEFAULT_UNIVERSE_DOMAIN = "googleapis.com"
86_GOOGLE_OAUTH2_TOKEN_ENDPOINT = "https://oauth2.googleapis.com/token"
class Credentials(
    credentials.Signing,
    credentials.Scoped,
    credentials.CredentialsWithQuotaProject,
    credentials.CredentialsWithTokenUri,
):
    """Service account credentials

    Usually, you'll create these credentials with one of the helper
    constructors. To create credentials using a Google service account
    private key JSON file::

        credentials = service_account.Credentials.from_service_account_file(
            'service-account.json')

    Or if you already have the service account file loaded::

        service_account_info = json.load(open('service_account.json'))
        credentials = service_account.Credentials.from_service_account_info(
            service_account_info)

    Both helper methods pass on arguments to the constructor, so you can
    specify additional scopes and a subject if necessary::

        credentials = service_account.Credentials.from_service_account_file(
            'service-account.json',
            scopes=['email'],
            subject='user@example.com')

    The credentials are considered immutable. If you want to modify the scopes
    or the subject used for delegation, use :meth:`with_scopes` or
    :meth:`with_subject`::

        scoped_credentials = credentials.with_scopes(['email'])
        delegated_credentials = credentials.with_subject(subject)

    To add a quota project, use :meth:`with_quota_project`::

        credentials = credentials.with_quota_project('myproject-123')
    """

    def __init__(
        self,
        signer,
        service_account_email,
        token_uri,
        scopes=None,
        default_scopes=None,
        subject=None,
        project_id=None,
        quota_project_id=None,
        additional_claims=None,
        always_use_jwt_access=False,
        universe_domain=_DEFAULT_UNIVERSE_DOMAIN,
        trust_boundary=None,
    ):
        """
        Args:
            signer (google.auth.crypt.Signer): The signer used to sign JWTs.
            service_account_email (str): The service account's email.
            token_uri (str): The OAuth 2.0 Token URI.
            scopes (Sequence[str]): User-defined scopes to request during the
                authorization grant.
            default_scopes (Sequence[str]): Default scopes passed by a
                Google client library. Use 'scopes' for user-defined scopes.
            subject (str): For domain-wide delegation, the email address of the
                user for which to request delegated access.
            project_id (str): Project ID associated with the service account
                credential.
            quota_project_id (Optional[str]): The project ID used for quota and
                billing.
            additional_claims (Mapping[str, str]): Any additional claims for
                the JWT assertion used in the authorization grant.
            always_use_jwt_access (Optional[bool]): Whether self signed JWT should
                be always used.
            universe_domain (str): The universe domain. A falsy value (``None``
                or ``""``) selects the default universe domain,
                googleapis.com. For any non-default universe domain
                ``always_use_jwt_access`` is forced to True.
            trust_boundary (str): String representation of trust boundary meta.

        .. note:: Typically one of the helper constructors
            :meth:`from_service_account_file` or
            :meth:`from_service_account_info` are used instead of calling the
            constructor directly.
        """
        super(Credentials, self).__init__()

        self._scopes = scopes
        self._default_scopes = default_scopes
        self._signer = signer
        self._service_account_email = service_account_email
        self._subject = subject
        self._project_id = project_id
        self._quota_project_id = quota_project_id
        self._token_uri = token_uri
        self._always_use_jwt_access = always_use_jwt_access

        # A missing universe domain (None / "") means the default one.
        self._universe_domain = universe_domain or _DEFAULT_UNIVERSE_DOMAIN

        # Compare the *normalized* value: a falsy ``universe_domain`` resolves
        # to the default universe and must not force self-signed JWT access.
        if self._universe_domain != _DEFAULT_UNIVERSE_DOMAIN:
            self._always_use_jwt_access = True

        # Built lazily by _create_self_signed_jwt().
        self._jwt_credentials = None

        if additional_claims is not None:
            self._additional_claims = additional_claims
        else:
            self._additional_claims = {}

        # NOTE(review): the ``trust_boundary`` argument is accepted but not
        # stored; the attribute is always "0". Confirm this placeholder is
        # intended before relying on the argument.
        self._trust_boundary = "0"

    @classmethod
    def _from_signer_and_info(cls, signer, info, **kwargs):
        """Creates a Credentials instance from a signer and service account
        info.

        Args:
            signer (google.auth.crypt.Signer): The signer used to sign JWTs.
            info (Mapping[str, str]): The service account info. The
                ``client_email`` and ``token_uri`` keys are read
                unconditionally; ``project_id``, ``universe_domain`` and
                ``trust_boundary`` are optional.
            kwargs: Additional arguments to pass to the constructor.

        Returns:
            google.oauth2.service_account.Credentials: The constructed
                credentials.

        Raises:
            ValueError: If the info is not in the expected format.
        """
        return cls(
            signer,
            service_account_email=info["client_email"],
            token_uri=info["token_uri"],
            project_id=info.get("project_id"),
            universe_domain=info.get("universe_domain", _DEFAULT_UNIVERSE_DOMAIN),
            trust_boundary=info.get("trust_boundary"),
            **kwargs
        )

    @classmethod
    def from_service_account_info(cls, info, **kwargs):
        """Creates a Credentials instance from parsed service account info.

        Args:
            info (Mapping[str, str]): The service account info in Google
                format.
            kwargs: Additional arguments to pass to the constructor.

        Returns:
            google.oauth2.service_account.Credentials: The constructed
                credentials.

        Raises:
            ValueError: If the info is not in the expected format.
        """
        signer = _service_account_info.from_dict(
            info, require=["client_email", "token_uri"]
        )
        return cls._from_signer_and_info(signer, info, **kwargs)

    @classmethod
    def from_service_account_file(cls, filename, **kwargs):
        """Creates a Credentials instance from a service account json file.

        Args:
            filename (str): The path to the service account json file.
            kwargs: Additional arguments to pass to the constructor.

        Returns:
            google.oauth2.service_account.Credentials: The constructed
                credentials.
        """
        info, signer = _service_account_info.from_filename(
            filename, require=["client_email", "token_uri"]
        )
        return cls._from_signer_and_info(signer, info, **kwargs)

    @property
    def service_account_email(self):
        """The service account email."""
        return self._service_account_email

    @property
    def project_id(self):
        """Project ID associated with this credential."""
        return self._project_id

    @property
    def requires_scopes(self):
        """Checks if the credentials requires scopes.

        Returns:
            bool: True if there are no scopes set otherwise False.
        """
        return not self._scopes

    def _make_copy(self):
        """Return a new instance of this class built from the current state.

        Scopes, default scopes and additional claims are shallow-copied so
        that the ``with_*`` helpers can modify the copy without touching
        this instance.
        """
        cred = self.__class__(
            self._signer,
            service_account_email=self._service_account_email,
            scopes=copy.copy(self._scopes),
            default_scopes=copy.copy(self._default_scopes),
            token_uri=self._token_uri,
            subject=self._subject,
            project_id=self._project_id,
            quota_project_id=self._quota_project_id,
            additional_claims=self._additional_claims.copy(),
            always_use_jwt_access=self._always_use_jwt_access,
            universe_domain=self._universe_domain,
        )
        return cred

    # No docstrings on methods wrapped by ``copy_docstring``: the decorator
    # supplies them from the base class.
    @_helpers.copy_docstring(credentials.Scoped)
    def with_scopes(self, scopes, default_scopes=None):
        cred = self._make_copy()
        cred._scopes = scopes
        cred._default_scopes = default_scopes
        return cred

    def with_always_use_jwt_access(self, always_use_jwt_access):
        """Create a copy of these credentials with the specified always_use_jwt_access value.

        Args:
            always_use_jwt_access (bool): Whether always use self signed JWT or not.

        Returns:
            google.oauth2.service_account.Credentials: A new credentials
                instance.
        Raises:
            google.auth.exceptions.InvalidValue: If the universe domain is not
                default and always_use_jwt_access is False.
        """
        cred = self._make_copy()
        if (
            cred._universe_domain != _DEFAULT_UNIVERSE_DOMAIN
            and not always_use_jwt_access
        ):
            raise exceptions.InvalidValue(
                "always_use_jwt_access should be True for non-default universe domain"
            )
        cred._always_use_jwt_access = always_use_jwt_access
        return cred

    def with_subject(self, subject):
        """Create a copy of these credentials with the specified subject.

        Args:
            subject (str): The subject claim.

        Returns:
            google.oauth2.service_account.Credentials: A new credentials
                instance.
        """
        cred = self._make_copy()
        cred._subject = subject
        return cred

    def with_claims(self, additional_claims):
        """Returns a copy of these credentials with modified claims.

        Args:
            additional_claims (Mapping[str, str]): Any additional claims for
                the JWT payload. This will be merged with the current
                additional claims.

        Returns:
            google.oauth2.service_account.Credentials: A new credentials
                instance.
        """
        # Deep-copy first so the merge never mutates this instance's claims.
        new_additional_claims = copy.deepcopy(self._additional_claims)
        new_additional_claims.update(additional_claims or {})
        cred = self._make_copy()
        cred._additional_claims = new_additional_claims
        return cred

    @_helpers.copy_docstring(credentials.CredentialsWithQuotaProject)
    def with_quota_project(self, quota_project_id):
        cred = self._make_copy()
        cred._quota_project_id = quota_project_id
        return cred

    @_helpers.copy_docstring(credentials.CredentialsWithTokenUri)
    def with_token_uri(self, token_uri):
        cred = self._make_copy()
        cred._token_uri = token_uri
        return cred

    def _make_authorization_grant_assertion(self):
        """Create the OAuth 2.0 assertion.

        This assertion is used during the OAuth 2.0 grant to acquire an
        access token.

        Returns:
            bytes: The authorization grant assertion.
        """
        now = _helpers.utcnow()
        lifetime = datetime.timedelta(seconds=_DEFAULT_TOKEN_LIFETIME_SECS)
        expiry = now + lifetime

        payload = {
            "iat": _helpers.datetime_to_secs(now),
            "exp": _helpers.datetime_to_secs(expiry),
            # The issuer must be the service account email.
            "iss": self._service_account_email,
            # The audience must be the auth token endpoint's URI
            "aud": _GOOGLE_OAUTH2_TOKEN_ENDPOINT,
            "scope": _helpers.scopes_to_string(self._scopes or ()),
        }

        # Additional claims may override any of the defaults above.
        payload.update(self._additional_claims)

        # The subject can be a user email for domain-wide delegation.
        # ``setdefault`` keeps a "sub" supplied through additional claims.
        if self._subject:
            payload.setdefault("sub", self._subject)

        token = jwt.encode(self._signer, payload)

        return token

    def _use_self_signed_jwt(self):
        """Return True when refresh should use the self-signed JWT.

        That is the case only when no subject is set and JWT credentials
        have already been created.
        """
        # Since domain wide delegation doesn't work with self signed JWT. If
        # subject exists, then we should not use self signed JWT.
        return self._subject is None and self._jwt_credentials is not None

    def _metric_header_for_usage(self):
        """Return the metrics credential-type constant for the refresh mode."""
        if self._use_self_signed_jwt():
            return metrics.CRED_TYPE_SA_JWT
        return metrics.CRED_TYPE_SA_ASSERTION

    @_helpers.copy_docstring(credentials.Credentials)
    def refresh(self, request):
        # Non-default universes can only refresh through a self-signed JWT,
        # which in turn rules out domain-wide delegation (a subject).
        if (
            self._universe_domain != _DEFAULT_UNIVERSE_DOMAIN
            and not self._jwt_credentials
        ):
            raise exceptions.RefreshError(
                "self._jwt_credentials is missing for non-default universe domain"
            )
        if self._universe_domain != _DEFAULT_UNIVERSE_DOMAIN and self._subject:
            raise exceptions.RefreshError(
                "domain wide delegation is not supported for non-default universe domain"
            )

        if self._use_self_signed_jwt():
            # The self-signed JWT itself is used as the bearer token.
            self._jwt_credentials.refresh(request)
            self.token = self._jwt_credentials.token.decode()
            self.expiry = self._jwt_credentials.expiry
        else:
            # Exchange a signed assertion for an access token at the token URI.
            assertion = self._make_authorization_grant_assertion()
            access_token, expiry, _ = _client.jwt_grant(
                request, self._token_uri, assertion
            )
            self.token = access_token
            self.expiry = expiry

    def _create_self_signed_jwt(self, audience):
        """Create a self-signed JWT from the credentials if requirements are met.

        With ``always_use_jwt_access`` set, the JWT credentials are built from
        the first available of: user scopes, ``audience``, default scopes, and
        are only rebuilt when that input changed. Otherwise JWT credentials
        are built only when there are no user scopes and an audience is given.

        Args:
            audience (str): The service URL. ``https://[API_ENDPOINT]/``
        """
        # https://google.aip.dev/auth/4111
        if self._always_use_jwt_access:
            if self._scopes:
                additional_claims = {"scope": " ".join(self._scopes)}
                if (
                    self._jwt_credentials is None
                    or self._jwt_credentials.additional_claims != additional_claims
                ):
                    self._jwt_credentials = jwt.Credentials.from_signing_credentials(
                        self, None, additional_claims=additional_claims
                    )
            elif audience:
                if (
                    self._jwt_credentials is None
                    or self._jwt_credentials._audience != audience
                ):

                    self._jwt_credentials = jwt.Credentials.from_signing_credentials(
                        self, audience
                    )
            elif self._default_scopes:
                additional_claims = {"scope": " ".join(self._default_scopes)}
                if (
                    self._jwt_credentials is None
                    or additional_claims != self._jwt_credentials.additional_claims
                ):
                    self._jwt_credentials = jwt.Credentials.from_signing_credentials(
                        self, None, additional_claims=additional_claims
                    )
        elif not self._scopes and audience:
            self._jwt_credentials = jwt.Credentials.from_signing_credentials(
                self, audience
            )

    @_helpers.copy_docstring(credentials.Signing)
    def sign_bytes(self, message):
        return self._signer.sign(message)

    @property  # type: ignore
    @_helpers.copy_docstring(credentials.Signing)
    def signer(self):
        return self._signer

    @property  # type: ignore
    @_helpers.copy_docstring(credentials.Signing)
    def signer_email(self):
        return self._service_account_email
class IDTokenCredentials(
    credentials.Signing,
    credentials.CredentialsWithQuotaProject,
    credentials.CredentialsWithTokenUri,
):
    """Open ID Connect ID Token-based service account credentials.

    These credentials are largely similar to :class:`.Credentials`, but instead
    of using an OAuth 2.0 Access Token as the bearer token, they use an Open
    ID Connect ID Token as the bearer token. These credentials are useful when
    communicating to services that require ID Tokens and can not accept access
    tokens.

    Usually, you'll create these credentials with one of the helper
    constructors. To create credentials using a Google service account
    private key JSON file::

        credentials = (
            service_account.IDTokenCredentials.from_service_account_file(
                'service-account.json',
                target_audience='https://example.com'))


    Or if you already have the service account file loaded::

        service_account_info = json.load(open('service_account.json'))
        credentials = (
            service_account.IDTokenCredentials.from_service_account_info(
                service_account_info,
                target_audience='https://example.com'))


    Both helper methods pass on arguments to the constructor, so you can
    specify additional claims and a quota project if necessary::

        credentials = (
            service_account.IDTokenCredentials.from_service_account_file(
                'service-account.json',
                target_audience='https://example.com',
                quota_project_id='myproject-123'))


    The credentials are considered immutable. If you want to modify the
    target audience, quota project or token URI, use
    :meth:`with_target_audience`, :meth:`with_quota_project` or
    :meth:`with_token_uri`::

        audience_credentials = credentials.with_target_audience(audience)

    """

    def __init__(
        self,
        signer,
        service_account_email,
        token_uri,
        target_audience,
        additional_claims=None,
        quota_project_id=None,
        universe_domain=_DEFAULT_UNIVERSE_DOMAIN,
    ):
        """
        Args:
            signer (google.auth.crypt.Signer): The signer used to sign JWTs.
            service_account_email (str): The service account's email.
            token_uri (str): The OAuth 2.0 Token URI.
            target_audience (str): The intended audience for these credentials,
                used when requesting the ID Token. The ID Token's ``aud`` claim
                will be set to this string.
            additional_claims (Mapping[str, str]): Any additional claims for
                the JWT assertion used in the authorization grant.
            quota_project_id (Optional[str]): The project ID used for quota and billing.
            universe_domain (str): The universe domain. A falsy value (``None``
                or ``""``) selects the default universe domain,
                googleapis.com. For any non-default universe domain the IAM
                ID token endpoint is used for token refresh. Note that
                iam.serviceAccountTokenCreator role is required to use the IAM
                endpoint.

        .. note:: Typically one of the helper constructors
            :meth:`from_service_account_file` or
            :meth:`from_service_account_info` are used instead of calling the
            constructor directly.
        """
        super(IDTokenCredentials, self).__init__()
        self._signer = signer
        self._service_account_email = service_account_email
        self._token_uri = token_uri
        self._target_audience = target_audience
        self._quota_project_id = quota_project_id
        self._use_iam_endpoint = False

        # A missing universe domain (None / "") means the default one.
        self._universe_domain = universe_domain or _DEFAULT_UNIVERSE_DOMAIN

        # Compare the *normalized* value: a falsy ``universe_domain`` resolves
        # to the default universe and must not force the IAM endpoint.
        if self._universe_domain != _DEFAULT_UNIVERSE_DOMAIN:
            self._use_iam_endpoint = True

        if additional_claims is not None:
            self._additional_claims = additional_claims
        else:
            self._additional_claims = {}

    @classmethod
    def _from_signer_and_info(cls, signer, info, **kwargs):
        """Creates a credentials instance from a signer and service account
        info.

        ``service_account_email`` and ``token_uri`` given in ``kwargs`` take
        precedence over the values in ``info``; a ``universe_domain`` present
        in ``info`` always overrides the one in ``kwargs``.

        Args:
            signer (google.auth.crypt.Signer): The signer used to sign JWTs.
            info (Mapping[str, str]): The service account info.
            kwargs: Additional arguments to pass to the constructor.

        Returns:
            google.oauth2.service_account.IDTokenCredentials: The constructed
                credentials.

        Raises:
            ValueError: If the info is not in the expected format.
        """
        kwargs.setdefault("service_account_email", info["client_email"])
        kwargs.setdefault("token_uri", info["token_uri"])
        if "universe_domain" in info:
            kwargs["universe_domain"] = info["universe_domain"]
        return cls(signer, **kwargs)

    @classmethod
    def from_service_account_info(cls, info, **kwargs):
        """Creates a credentials instance from parsed service account info.

        Args:
            info (Mapping[str, str]): The service account info in Google
                format.
            kwargs: Additional arguments to pass to the constructor.

        Returns:
            google.oauth2.service_account.IDTokenCredentials: The constructed
                credentials.

        Raises:
            ValueError: If the info is not in the expected format.
        """
        signer = _service_account_info.from_dict(
            info, require=["client_email", "token_uri"]
        )
        return cls._from_signer_and_info(signer, info, **kwargs)

    @classmethod
    def from_service_account_file(cls, filename, **kwargs):
        """Creates a credentials instance from a service account json file.

        Args:
            filename (str): The path to the service account json file.
            kwargs: Additional arguments to pass to the constructor.

        Returns:
            google.oauth2.service_account.IDTokenCredentials: The constructed
                credentials.
        """
        info, signer = _service_account_info.from_filename(
            filename, require=["client_email", "token_uri"]
        )
        return cls._from_signer_and_info(signer, info, **kwargs)

    def _make_copy(self):
        """Return a new instance of this class built from the current state.

        The additional claims are shallow-copied so the ``with_*`` helpers
        can modify the copy without touching this instance.
        """
        cred = self.__class__(
            self._signer,
            service_account_email=self._service_account_email,
            token_uri=self._token_uri,
            target_audience=self._target_audience,
            additional_claims=self._additional_claims.copy(),
            quota_project_id=self.quota_project_id,
            universe_domain=self._universe_domain,
        )
        # _use_iam_endpoint is not exposed in the constructor
        cred._use_iam_endpoint = self._use_iam_endpoint
        return cred

    def with_target_audience(self, target_audience):
        """Create a copy of these credentials with the specified target
        audience.

        Args:
            target_audience (str): The intended audience for these credentials,
            used when requesting the ID Token.

        Returns:
            google.oauth2.service_account.IDTokenCredentials: A new credentials
                instance.
        """
        cred = self._make_copy()
        cred._target_audience = target_audience
        return cred

    def _with_use_iam_endpoint(self, use_iam_endpoint):
        """Create a copy of these credentials with the use_iam_endpoint value.

        Args:
            use_iam_endpoint (bool): If True, IAM generateIdToken endpoint will
                be used instead of the token_uri. Note that
                iam.serviceAccountTokenCreator role is required to use the IAM
                endpoint. The default value is False. This feature is currently
                experimental and subject to change without notice.

        Returns:
            google.oauth2.service_account.IDTokenCredentials: A new credentials
                instance.
        Raises:
            google.auth.exceptions.InvalidValue: If the universe domain is not
                default and use_iam_endpoint is False.
        """
        cred = self._make_copy()
        if cred._universe_domain != _DEFAULT_UNIVERSE_DOMAIN and not use_iam_endpoint:
            raise exceptions.InvalidValue(
                "use_iam_endpoint should be True for non-default universe domain"
            )
        cred._use_iam_endpoint = use_iam_endpoint
        return cred

    # No docstrings on methods wrapped by ``copy_docstring``: the decorator
    # supplies them from the base class.
    @_helpers.copy_docstring(credentials.CredentialsWithQuotaProject)
    def with_quota_project(self, quota_project_id):
        cred = self._make_copy()
        cred._quota_project_id = quota_project_id
        return cred

    @_helpers.copy_docstring(credentials.CredentialsWithTokenUri)
    def with_token_uri(self, token_uri):
        cred = self._make_copy()
        cred._token_uri = token_uri
        return cred

    def _make_authorization_grant_assertion(self):
        """Create the OAuth 2.0 assertion.

        This assertion is used during the OAuth 2.0 grant to acquire an
        ID token.

        Returns:
            bytes: The authorization grant assertion.
        """
        now = _helpers.utcnow()
        lifetime = datetime.timedelta(seconds=_DEFAULT_TOKEN_LIFETIME_SECS)
        expiry = now + lifetime

        payload = {
            "iat": _helpers.datetime_to_secs(now),
            "exp": _helpers.datetime_to_secs(expiry),
            # The issuer must be the service account email.
            "iss": self.service_account_email,
            # The audience must be the auth token endpoint's URI
            "aud": _GOOGLE_OAUTH2_TOKEN_ENDPOINT,
            # The target audience specifies which service the ID token is
            # intended for.
            "target_audience": self._target_audience,
        }

        # Additional claims may override any of the defaults above.
        payload.update(self._additional_claims)

        token = jwt.encode(self._signer, payload)

        return token

    def _refresh_with_iam_endpoint(self, request):
        """Use IAM generateIdToken endpoint to obtain an ID token.

        It works as follows:

        1. First we create a self signed jwt with
        https://www.googleapis.com/auth/iam being the scope.

        2. Next we use the self signed jwt as the access token, and make a POST
        request to IAM generateIdToken endpoint. The request body is:
            {
                "audience": self._target_audience,
                "includeEmail": "true",
                "useEmailAzp": "true",
            }

        If the request is successful, it will return {"token":"the ID token"},
        and we can extract the ID token and compute its expiry.
        """
        jwt_credentials = jwt.Credentials.from_signing_credentials(
            self,
            None,
            additional_claims={"scope": "https://www.googleapis.com/auth/iam"},
        )
        jwt_credentials.refresh(request)
        self.token, self.expiry = _client.call_iam_generate_id_token_endpoint(
            request,
            self.signer_email,
            self._target_audience,
            jwt_credentials.token.decode(),
        )

    @_helpers.copy_docstring(credentials.Credentials)
    def refresh(self, request):
        if self._use_iam_endpoint:
            self._refresh_with_iam_endpoint(request)
        else:
            # Exchange a signed assertion for an ID token at the token URI.
            assertion = self._make_authorization_grant_assertion()
            access_token, expiry, _ = _client.id_token_jwt_grant(
                request, self._token_uri, assertion
            )
            self.token = access_token
            self.expiry = expiry

    @property
    def service_account_email(self):
        """The service account email."""
        return self._service_account_email

    @_helpers.copy_docstring(credentials.Signing)
    def sign_bytes(self, message):
        return self._signer.sign(message)

    @property  # type: ignore
    @_helpers.copy_docstring(credentials.Signing)
    def signer(self):
        return self._signer

    @property  # type: ignore
    @_helpers.copy_docstring(credentials.Signing)
    def signer_email(self):
        return self._service_account_email