Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/google/oauth2/service_account.py: 34%
235 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-06 06:03 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-06 06:03 +0000
1# Copyright 2016 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
15"""Service Accounts: JSON Web Token (JWT) Profile for OAuth 2.0
17This module implements the JWT Profile for OAuth 2.0 Authorization Grants
18as defined by `RFC 7523`_ with particular support for how this RFC is
19implemented in Google's infrastructure. Google refers to these credentials
20as *Service Accounts*.
22Service accounts are used for server-to-server communication, such as
23interactions between a web application server and a Google service. The
24service account belongs to your application instead of to an individual end
25user. In contrast to other OAuth 2.0 profiles, no users are involved and your
26application "acts" as the service account.
28Typically an application uses a service account when the application uses
29Google APIs to work with its own data rather than a user's data. For example,
30an application that uses Google Cloud Datastore for data persistence would use
31a service account to authenticate its calls to the Google Cloud Datastore API.
32However, an application that needs to access a user's Drive documents would
33use the normal OAuth 2.0 profile.
35Additionally, Google Apps domain administrators can grant service accounts
36`domain-wide delegation`_ authority to access user data on behalf of users in
37the domain.
39This profile uses a JWT to acquire an OAuth 2.0 access token. The JWT is used
40in place of the usual authorization token returned during the standard
41OAuth 2.0 Authorization Code grant. The JWT is only used for this purpose, as
42the acquired access token is used as the bearer token when making requests
43using these credentials.
45This profile differs from normal OAuth 2.0 profile because no user consent
46step is required. The use of the private key allows this profile to assert
47identity directly.
49This profile also differs from the :mod:`google.auth.jwt` authentication
50because the JWT credentials use the JWT directly as the bearer token. This
51profile instead only uses the JWT to obtain an OAuth 2.0 access token. The
52obtained OAuth 2.0 access token is used as the bearer token.
54Domain-wide delegation
55----------------------
57Domain-wide delegation allows a service account to access user data on
58behalf of any user in a Google Apps domain without consent from the user.
59For example, an application that uses the Google Calendar API to add events to
60the calendars of all users in a Google Apps domain would use a service account
61to access the Google Calendar API on behalf of users.
63The Google Apps administrator must explicitly authorize the service account to
64do this. This authorization step is referred to as "delegating domain-wide
65authority" to a service account.
67You can use domain-wide delegation by creating a set of credentials with a
68specific subject using :meth:`~Credentials.with_subject`.
70.. _RFC 7523: https://tools.ietf.org/html/rfc7523
71"""
73import copy
74import datetime
76from google.auth import _helpers
77from google.auth import _service_account_info
78from google.auth import credentials
79from google.auth import exceptions
80from google.auth import jwt
81from google.auth import metrics
82from google.oauth2 import _client
# Lifetime, in seconds, of the signed authorization-grant assertions built in
# this module (the gap between their "iat" and "exp" claims): one hour.
_DEFAULT_TOKEN_LIFETIME_SECS = 60 * 60
# Universe domain assumed when the caller or the service account info does
# not supply one.
_DEFAULT_UNIVERSE_DOMAIN = "googleapis.com"
# Value of the "aud" claim placed in the authorization-grant assertions.
_GOOGLE_OAUTH2_TOKEN_ENDPOINT = "https://oauth2.googleapis.com/token"
class Credentials(
    credentials.Signing,
    credentials.Scoped,
    credentials.CredentialsWithQuotaProject,
    credentials.CredentialsWithTokenUri,
):
    """Service account credentials

    Usually, you'll create these credentials with one of the helper
    constructors. To create credentials using a Google service account
    private key JSON file::

        credentials = service_account.Credentials.from_service_account_file(
            'service-account.json')

    Or if you already have the service account file loaded::

        service_account_info = json.load(open('service_account.json'))
        credentials = service_account.Credentials.from_service_account_info(
            service_account_info)

    Both helper methods pass on arguments to the constructor, so you can
    specify additional scopes and a subject if necessary::

        credentials = service_account.Credentials.from_service_account_file(
            'service-account.json',
            scopes=['email'],
            subject='user@example.com')

    The credentials are considered immutable. If you want to modify the scopes
    or the subject used for delegation, use :meth:`with_scopes` or
    :meth:`with_subject`::

        scoped_credentials = credentials.with_scopes(['email'])
        delegated_credentials = credentials.with_subject(subject)

    To add a quota project, use :meth:`with_quota_project`::

        credentials = credentials.with_quota_project('myproject-123')
    """

    def __init__(
        self,
        signer,
        service_account_email,
        token_uri,
        scopes=None,
        default_scopes=None,
        subject=None,
        project_id=None,
        quota_project_id=None,
        additional_claims=None,
        always_use_jwt_access=False,
        universe_domain=_DEFAULT_UNIVERSE_DOMAIN,
    ):
        """
        Args:
            signer (google.auth.crypt.Signer): The signer used to sign JWTs.
            service_account_email (str): The service account's email.
            token_uri (str): The OAuth 2.0 Token URI.
            scopes (Sequence[str]): User-defined scopes to request during the
                authorization grant.
            default_scopes (Sequence[str]): Default scopes passed by a
                Google client library. Use 'scopes' for user-defined scopes.
            subject (str): For domain-wide delegation, the email address of the
                user for which to request delegated access.
            project_id (str): Project ID associated with the service account
                credential.
            quota_project_id (Optional[str]): The project ID used for quota and
                billing.
            additional_claims (Mapping[str, str]): Any additional claims for
                the JWT assertion used in the authorization grant.
            always_use_jwt_access (Optional[bool]): Whether self signed JWT should
                be always used.
            universe_domain (str): The universe domain. The default
                universe domain is googleapis.com; a falsy value (``None``
                or ``""``) is treated as the default. For a non-default
                universe domain ``always_use_jwt_access`` is forced to
                ``True``.

        .. note:: Typically one of the helper constructors
            :meth:`from_service_account_file` or
            :meth:`from_service_account_info` are used instead of calling the
            constructor directly.
        """
        super().__init__()

        self._scopes = scopes
        self._default_scopes = default_scopes
        self._signer = signer
        self._service_account_email = service_account_email
        self._subject = subject
        self._project_id = project_id
        self._quota_project_id = quota_project_id
        self._token_uri = token_uri

        # Normalize first, then decide on JWT access from the *normalized*
        # value. Comparing the raw argument would make ``None``/``""`` look
        # like a non-default universe and silently force JWT access, which
        # disagrees with the checks in ``refresh`` and
        # ``with_always_use_jwt_access``.
        self._universe_domain = universe_domain or _DEFAULT_UNIVERSE_DOMAIN
        self._always_use_jwt_access = always_use_jwt_access
        if self._universe_domain != _DEFAULT_UNIVERSE_DOMAIN:
            self._always_use_jwt_access = True

        # Populated lazily by ``_create_self_signed_jwt``.
        self._jwt_credentials = None

        # ``is not None`` (rather than ``or``) so that an empty mapping
        # supplied by the caller is kept as-is.
        if additional_claims is not None:
            self._additional_claims = additional_claims
        else:
            self._additional_claims = {}

    @classmethod
    def _from_signer_and_info(cls, signer, info, **kwargs):
        """Creates a Credentials instance from a signer and service account
        info.

        Args:
            signer (google.auth.crypt.Signer): The signer used to sign JWTs.
            info (Mapping[str, str]): The service account info. The
                ``client_email`` and ``token_uri`` keys are read
                unconditionally; ``project_id`` and ``universe_domain`` are
                optional.
            kwargs: Additional arguments to pass to the constructor.

        Returns:
            google.auth.service_account.Credentials: The constructed
                credentials.

        Raises:
            KeyError: If ``info`` lacks ``client_email`` or ``token_uri``.
        """
        return cls(
            signer,
            service_account_email=info["client_email"],
            token_uri=info["token_uri"],
            project_id=info.get("project_id"),
            universe_domain=info.get("universe_domain", _DEFAULT_UNIVERSE_DOMAIN),
            **kwargs
        )

    @classmethod
    def from_service_account_info(cls, info, **kwargs):
        """Creates a Credentials instance from parsed service account info.

        Args:
            info (Mapping[str, str]): The service account info in Google
                format.
            kwargs: Additional arguments to pass to the constructor.

        Returns:
            google.auth.service_account.Credentials: The constructed
                credentials.

        Raises:
            ValueError: If the info is not in the expected format.
        """
        signer = _service_account_info.from_dict(
            info, require=["client_email", "token_uri"]
        )
        return cls._from_signer_and_info(signer, info, **kwargs)

    @classmethod
    def from_service_account_file(cls, filename, **kwargs):
        """Creates a Credentials instance from a service account json file.

        Args:
            filename (str): The path to the service account json file.
            kwargs: Additional arguments to pass to the constructor.

        Returns:
            google.auth.service_account.Credentials: The constructed
                credentials.
        """
        info, signer = _service_account_info.from_filename(
            filename, require=["client_email", "token_uri"]
        )
        return cls._from_signer_and_info(signer, info, **kwargs)

    @property
    def service_account_email(self):
        """The service account email."""
        return self._service_account_email

    @property
    def project_id(self):
        """Project ID associated with this credential."""
        return self._project_id

    @property
    def requires_scopes(self):
        """Checks if the credentials requires scopes.

        Returns:
            bool: True if there are no scopes set otherwise False.
        """
        return not self._scopes

    def _make_copy(self):
        """Build a new instance of the same class from this one's settings.

        Scopes, default scopes and additional claims are shallow-copied so
        the new instance does not share those containers with ``self``. The
        cached ``_jwt_credentials`` is not carried over.
        """
        cred = self.__class__(
            self._signer,
            service_account_email=self._service_account_email,
            scopes=copy.copy(self._scopes),
            default_scopes=copy.copy(self._default_scopes),
            token_uri=self._token_uri,
            subject=self._subject,
            project_id=self._project_id,
            quota_project_id=self._quota_project_id,
            additional_claims=self._additional_claims.copy(),
            always_use_jwt_access=self._always_use_jwt_access,
            universe_domain=self._universe_domain,
        )
        return cred

    # No docstring here on purpose: it is supplied by ``copy_docstring``.
    @_helpers.copy_docstring(credentials.Scoped)
    def with_scopes(self, scopes, default_scopes=None):
        cred = self._make_copy()
        cred._scopes = scopes
        cred._default_scopes = default_scopes
        return cred

    def with_always_use_jwt_access(self, always_use_jwt_access):
        """Create a copy of these credentials with the specified always_use_jwt_access value.

        Args:
            always_use_jwt_access (bool): Whether always use self signed JWT or not.

        Returns:
            google.auth.service_account.Credentials: A new credentials
                instance.
        Raises:
            google.auth.exceptions.InvalidValue: If the universe domain is not
                default and always_use_jwt_access is False.
        """
        # Validate before copying; the copy would carry the same universe
        # domain, so checking ``self`` is equivalent and avoids wasted work.
        if (
            self._universe_domain != _DEFAULT_UNIVERSE_DOMAIN
            and not always_use_jwt_access
        ):
            raise exceptions.InvalidValue(
                "always_use_jwt_access should be True for non-default universe domain"
            )
        cred = self._make_copy()
        cred._always_use_jwt_access = always_use_jwt_access
        return cred

    def with_subject(self, subject):
        """Create a copy of these credentials with the specified subject.

        Args:
            subject (str): The subject claim.

        Returns:
            google.auth.service_account.Credentials: A new credentials
                instance.
        """
        cred = self._make_copy()
        cred._subject = subject
        return cred

    def with_claims(self, additional_claims):
        """Returns a copy of these credentials with modified claims.

        Args:
            additional_claims (Mapping[str, str]): Any additional claims for
                the JWT payload. This will be merged with the current
                additional claims.

        Returns:
            google.auth.service_account.Credentials: A new credentials
                instance.
        """
        # Deep-copy so nested claim values are not shared with ``self``.
        new_additional_claims = copy.deepcopy(self._additional_claims)
        new_additional_claims.update(additional_claims or {})
        cred = self._make_copy()
        cred._additional_claims = new_additional_claims
        return cred

    # No docstring here on purpose: it is supplied by ``copy_docstring``.
    @_helpers.copy_docstring(credentials.CredentialsWithQuotaProject)
    def with_quota_project(self, quota_project_id):
        cred = self._make_copy()
        cred._quota_project_id = quota_project_id
        return cred

    # No docstring here on purpose: it is supplied by ``copy_docstring``.
    @_helpers.copy_docstring(credentials.CredentialsWithTokenUri)
    def with_token_uri(self, token_uri):
        cred = self._make_copy()
        cred._token_uri = token_uri
        return cred

    def _make_authorization_grant_assertion(self):
        """Create the OAuth 2.0 assertion.

        This assertion is used during the OAuth 2.0 grant to acquire an
        access token.

        Returns:
            bytes: The authorization grant assertion.
        """
        now = _helpers.utcnow()
        lifetime = datetime.timedelta(seconds=_DEFAULT_TOKEN_LIFETIME_SECS)
        expiry = now + lifetime

        payload = {
            "iat": _helpers.datetime_to_secs(now),
            "exp": _helpers.datetime_to_secs(expiry),
            # The issuer must be the service account email.
            "iss": self._service_account_email,
            # The audience must be the auth token endpoint's URI
            "aud": _GOOGLE_OAUTH2_TOKEN_ENDPOINT,
            "scope": _helpers.scopes_to_string(self._scopes or ()),
        }

        # Additional claims are applied on top of the standard ones, so they
        # may override any of the keys above.
        payload.update(self._additional_claims)

        # The subject can be a user email for domain-wide delegation.
        # ``setdefault`` keeps a "sub" that came from the additional claims.
        if self._subject:
            payload.setdefault("sub", self._subject)

        token = jwt.encode(self._signer, payload)

        return token

    def _use_self_signed_jwt(self):
        """Return True when ``refresh`` should use the self-signed JWT.

        That is the case only when no subject is set and
        ``_jwt_credentials`` has been created.
        """
        # Since domain wide delegation doesn't work with self signed JWT. If
        # subject exists, then we should not use self signed JWT.
        return self._subject is None and self._jwt_credentials is not None

    def _metric_header_for_usage(self):
        """Return the metrics credential-type constant for the token flow in use."""
        if self._use_self_signed_jwt():
            return metrics.CRED_TYPE_SA_JWT
        return metrics.CRED_TYPE_SA_ASSERTION

    # No docstring here on purpose: it is supplied by ``copy_docstring``.
    @_helpers.copy_docstring(credentials.Credentials)
    def refresh(self, request):
        # A non-default universe domain supports neither the token-endpoint
        # grant nor domain-wide delegation, so both preconditions are
        # checked up front (in this order, to keep the error precedence).
        if self._universe_domain != _DEFAULT_UNIVERSE_DOMAIN:
            if not self._jwt_credentials:
                raise exceptions.RefreshError(
                    "self._jwt_credentials is missing for non-default universe domain"
                )
            if self._subject:
                raise exceptions.RefreshError(
                    "domain wide delegation is not supported for non-default universe domain"
                )

        if self._use_self_signed_jwt():
            # The self-signed JWT itself serves as the bearer token.
            self._jwt_credentials.refresh(request)
            self.token = self._jwt_credentials.token.decode()
            self.expiry = self._jwt_credentials.expiry
        else:
            # Exchange a signed assertion for an access token at the token URI.
            assertion = self._make_authorization_grant_assertion()
            access_token, expiry, _ = _client.jwt_grant(
                request, self._token_uri, assertion
            )
            self.token = access_token
            self.expiry = expiry

    def _create_self_signed_jwt(self, audience):
        """Create a self-signed JWT from the credentials if requirements are met.

        With ``always_use_jwt_access`` enabled, the first available of
        user scopes, ``audience`` and default scopes decides how the JWT
        credentials are built, and existing JWT credentials are reused when
        the corresponding scope claim or audience is unchanged. Otherwise,
        JWT credentials are (re)built only when no user scopes are set and an
        ``audience`` is given.

        Args:
            audience (str): The service URL. ``https://[API_ENDPOINT]/``
        """
        # https://google.aip.dev/auth/4111
        if self._always_use_jwt_access:
            if self._scopes:
                # User scopes win: scope-claim JWT, no audience.
                additional_claims = {"scope": " ".join(self._scopes)}
                if (
                    self._jwt_credentials is None
                    or self._jwt_credentials.additional_claims != additional_claims
                ):
                    self._jwt_credentials = jwt.Credentials.from_signing_credentials(
                        self, None, additional_claims=additional_claims
                    )
            elif audience:
                # No user scopes: fall back to an audience-bound JWT.
                if (
                    self._jwt_credentials is None
                    or self._jwt_credentials._audience != audience
                ):
                    self._jwt_credentials = jwt.Credentials.from_signing_credentials(
                        self, audience
                    )
            elif self._default_scopes:
                # Last resort: scope-claim JWT built from the default scopes.
                additional_claims = {"scope": " ".join(self._default_scopes)}
                if (
                    self._jwt_credentials is None
                    or additional_claims != self._jwt_credentials.additional_claims
                ):
                    self._jwt_credentials = jwt.Credentials.from_signing_credentials(
                        self, None, additional_claims=additional_claims
                    )
        elif not self._scopes and audience:
            self._jwt_credentials = jwt.Credentials.from_signing_credentials(
                self, audience
            )

    # No docstring here on purpose: it is supplied by ``copy_docstring``.
    @_helpers.copy_docstring(credentials.Signing)
    def sign_bytes(self, message):
        return self._signer.sign(message)

    # No docstring here on purpose: it is supplied by ``copy_docstring``.
    @property  # type: ignore
    @_helpers.copy_docstring(credentials.Signing)
    def signer(self):
        return self._signer

    # No docstring here on purpose: it is supplied by ``copy_docstring``.
    @property  # type: ignore
    @_helpers.copy_docstring(credentials.Signing)
    def signer_email(self):
        return self._service_account_email
class IDTokenCredentials(
    credentials.Signing,
    credentials.CredentialsWithQuotaProject,
    credentials.CredentialsWithTokenUri,
):
    """Open ID Connect ID Token-based service account credentials.

    These credentials are largely similar to :class:`.Credentials`, but instead
    of using an OAuth 2.0 Access Token as the bearer token, they use an Open
    ID Connect ID Token as the bearer token. These credentials are useful when
    communicating to services that require ID Tokens and can not accept access
    tokens.

    Usually, you'll create these credentials with one of the helper
    constructors. To create credentials using a Google service account
    private key JSON file::

        credentials = (
            service_account.IDTokenCredentials.from_service_account_file(
                'service-account.json',
                target_audience='https://example.com'))

    Or if you already have the service account file loaded::

        service_account_info = json.load(open('service_account.json'))
        credentials = (
            service_account.IDTokenCredentials.from_service_account_info(
                service_account_info,
                target_audience='https://example.com'))

    Both helper methods pass on arguments to the constructor, so the
    required ``target_audience`` and the optional ``additional_claims`` and
    ``quota_project_id`` are supplied as keyword arguments.

    The credentials are considered immutable. If you want to modify the
    target audience, use :meth:`with_target_audience`::

        audience_credentials = credentials.with_target_audience(audience)
    """

    def __init__(
        self,
        signer,
        service_account_email,
        token_uri,
        target_audience,
        additional_claims=None,
        quota_project_id=None,
        universe_domain=_DEFAULT_UNIVERSE_DOMAIN,
    ):
        """
        Args:
            signer (google.auth.crypt.Signer): The signer used to sign JWTs.
            service_account_email (str): The service account's email.
            token_uri (str): The OAuth 2.0 Token URI.
            target_audience (str): The intended audience for these credentials,
                used when requesting the ID Token. The ID Token's ``aud`` claim
                will be set to this string.
            additional_claims (Mapping[str, str]): Any additional claims for
                the JWT assertion used in the authorization grant.
            quota_project_id (Optional[str]): The project ID used for quota and billing.
            universe_domain (str): The universe domain. The default
                universe domain is googleapis.com; a falsy value (``None``
                or ``""``) is treated as the default. For a non-default
                universe domain the IAM ID token endpoint is used for token
                refresh. Note that iam.serviceAccountTokenCreator role is
                required to use the IAM endpoint.

        .. note:: Typically one of the helper constructors
            :meth:`from_service_account_file` or
            :meth:`from_service_account_info` are used instead of calling the
            constructor directly.
        """
        super().__init__()
        self._signer = signer
        self._service_account_email = service_account_email
        self._token_uri = token_uri
        self._target_audience = target_audience
        self._quota_project_id = quota_project_id

        # Normalize first, then pick the endpoint from the *normalized*
        # value. Comparing the raw argument would make ``None``/``""`` look
        # like a non-default universe and silently switch to the IAM
        # endpoint, which disagrees with ``_with_use_iam_endpoint``.
        self._universe_domain = universe_domain or _DEFAULT_UNIVERSE_DOMAIN
        self._use_iam_endpoint = self._universe_domain != _DEFAULT_UNIVERSE_DOMAIN

        # ``is not None`` (rather than ``or``) so that an empty mapping
        # supplied by the caller is kept as-is.
        if additional_claims is not None:
            self._additional_claims = additional_claims
        else:
            self._additional_claims = {}

    @classmethod
    def _from_signer_and_info(cls, signer, info, **kwargs):
        """Creates a credentials instance from a signer and service account
        info.

        ``service_account_email`` and ``token_uri`` are taken from ``info``
        only when not already present in ``kwargs``. A ``universe_domain``
        present in ``info`` always replaces one given in ``kwargs``.

        Args:
            signer (google.auth.crypt.Signer): The signer used to sign JWTs.
            info (Mapping[str, str]): The service account info.
            kwargs: Additional arguments to pass to the constructor.

        Returns:
            google.auth.service_account.IDTokenCredentials: The constructed
                credentials.

        Raises:
            KeyError: If ``info`` lacks ``client_email`` or ``token_uri``.
        """
        kwargs.setdefault("service_account_email", info["client_email"])
        kwargs.setdefault("token_uri", info["token_uri"])
        if "universe_domain" in info:
            kwargs["universe_domain"] = info["universe_domain"]
        return cls(signer, **kwargs)

    @classmethod
    def from_service_account_info(cls, info, **kwargs):
        """Creates a credentials instance from parsed service account info.

        Args:
            info (Mapping[str, str]): The service account info in Google
                format.
            kwargs: Additional arguments to pass to the constructor.

        Returns:
            google.auth.service_account.IDTokenCredentials: The constructed
                credentials.

        Raises:
            ValueError: If the info is not in the expected format.
        """
        signer = _service_account_info.from_dict(
            info, require=["client_email", "token_uri"]
        )
        return cls._from_signer_and_info(signer, info, **kwargs)

    @classmethod
    def from_service_account_file(cls, filename, **kwargs):
        """Creates a credentials instance from a service account json file.

        Args:
            filename (str): The path to the service account json file.
            kwargs: Additional arguments to pass to the constructor.

        Returns:
            google.auth.service_account.IDTokenCredentials: The constructed
                credentials.
        """
        info, signer = _service_account_info.from_filename(
            filename, require=["client_email", "token_uri"]
        )
        return cls._from_signer_and_info(signer, info, **kwargs)

    def _make_copy(self):
        """Build a new instance of the same class from this one's settings.

        The additional claims are shallow-copied so the new instance does
        not share that mapping with ``self``.
        """
        cred = self.__class__(
            self._signer,
            service_account_email=self._service_account_email,
            token_uri=self._token_uri,
            target_audience=self._target_audience,
            additional_claims=self._additional_claims.copy(),
            quota_project_id=self.quota_project_id,
            universe_domain=self._universe_domain,
        )
        # _use_iam_endpoint is not exposed in the constructor
        cred._use_iam_endpoint = self._use_iam_endpoint
        return cred

    def with_target_audience(self, target_audience):
        """Create a copy of these credentials with the specified target
        audience.

        Args:
            target_audience (str): The intended audience for these credentials,
            used when requesting the ID Token.

        Returns:
            google.auth.service_account.IDTokenCredentials: A new credentials
                instance.
        """
        cred = self._make_copy()
        cred._target_audience = target_audience
        return cred

    def _with_use_iam_endpoint(self, use_iam_endpoint):
        """Create a copy of these credentials with the use_iam_endpoint value.

        Args:
            use_iam_endpoint (bool): If True, IAM generateIdToken endpoint will
                be used instead of the token_uri. Note that
                iam.serviceAccountTokenCreator role is required to use the IAM
                endpoint. The default value is False. This feature is currently
                experimental and subject to change without notice.

        Returns:
            google.auth.service_account.IDTokenCredentials: A new credentials
                instance.
        Raises:
            google.auth.exceptions.InvalidValue: If the universe domain is not
                default and use_iam_endpoint is False.
        """
        # Validate before copying; the copy would carry the same universe
        # domain, so checking ``self`` is equivalent and avoids wasted work.
        if self._universe_domain != _DEFAULT_UNIVERSE_DOMAIN and not use_iam_endpoint:
            raise exceptions.InvalidValue(
                "use_iam_endpoint should be True for non-default universe domain"
            )
        cred = self._make_copy()
        cred._use_iam_endpoint = use_iam_endpoint
        return cred

    # No docstring here on purpose: it is supplied by ``copy_docstring``.
    @_helpers.copy_docstring(credentials.CredentialsWithQuotaProject)
    def with_quota_project(self, quota_project_id):
        cred = self._make_copy()
        cred._quota_project_id = quota_project_id
        return cred

    # No docstring here on purpose: it is supplied by ``copy_docstring``.
    @_helpers.copy_docstring(credentials.CredentialsWithTokenUri)
    def with_token_uri(self, token_uri):
        cred = self._make_copy()
        cred._token_uri = token_uri
        return cred

    def _make_authorization_grant_assertion(self):
        """Create the OAuth 2.0 assertion.

        This assertion is used during the OAuth 2.0 grant to acquire an
        ID token.

        Returns:
            bytes: The authorization grant assertion.
        """
        now = _helpers.utcnow()
        lifetime = datetime.timedelta(seconds=_DEFAULT_TOKEN_LIFETIME_SECS)
        expiry = now + lifetime

        payload = {
            "iat": _helpers.datetime_to_secs(now),
            "exp": _helpers.datetime_to_secs(expiry),
            # The issuer must be the service account email.
            "iss": self.service_account_email,
            # The audience must be the auth token endpoint's URI
            "aud": _GOOGLE_OAUTH2_TOKEN_ENDPOINT,
            # The target audience specifies which service the ID token is
            # intended for.
            "target_audience": self._target_audience,
        }

        # Additional claims are applied on top of the standard ones, so they
        # may override any of the keys above.
        payload.update(self._additional_claims)

        token = jwt.encode(self._signer, payload)

        return token

    def _refresh_with_iam_endpoint(self, request):
        """Use IAM generateIdToken endpoint to obtain an ID token.

        It works as follows:

        1. First we create a self signed jwt with
        https://www.googleapis.com/auth/iam being the scope.

        2. Next we use the self signed jwt as the access token, and make a POST
        request to IAM generateIdToken endpoint. The request body is:
            {
                "audience": self._target_audience,
                "includeEmail": "true",
                "useEmailAzp": "true",
            }

        If the request is successful, it will return {"token":"the ID token"},
        and we can extract the ID token and compute its expiry.
        """
        jwt_credentials = jwt.Credentials.from_signing_credentials(
            self,
            None,
            additional_claims={"scope": "https://www.googleapis.com/auth/iam"},
        )
        jwt_credentials.refresh(request)
        self.token, self.expiry = _client.call_iam_generate_id_token_endpoint(
            request,
            self.signer_email,
            self._target_audience,
            jwt_credentials.token.decode(),
        )

    # No docstring here on purpose: it is supplied by ``copy_docstring``.
    @_helpers.copy_docstring(credentials.Credentials)
    def refresh(self, request):
        if self._use_iam_endpoint:
            self._refresh_with_iam_endpoint(request)
        else:
            # Exchange a signed assertion for an ID token at the token URI.
            assertion = self._make_authorization_grant_assertion()
            access_token, expiry, _ = _client.id_token_jwt_grant(
                request, self._token_uri, assertion
            )
            self.token = access_token
            self.expiry = expiry

    @property
    def service_account_email(self):
        """The service account email."""
        return self._service_account_email

    # No docstring here on purpose: it is supplied by ``copy_docstring``.
    @_helpers.copy_docstring(credentials.Signing)
    def sign_bytes(self, message):
        return self._signer.sign(message)

    # No docstring here on purpose: it is supplied by ``copy_docstring``.
    @property  # type: ignore
    @_helpers.copy_docstring(credentials.Signing)
    def signer(self):
        return self._signer

    # No docstring here on purpose: it is supplied by ``copy_docstring``.
    @property  # type: ignore
    @_helpers.copy_docstring(credentials.Signing)
    def signer_email(self):
        return self._service_account_email