1# Copyright 2016 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15"""Service Accounts: JSON Web Token (JWT) Profile for OAuth 2.0
16
17This module implements the JWT Profile for OAuth 2.0 Authorization Grants
18as defined by `RFC 7523`_ with particular support for how this RFC is
19implemented in Google's infrastructure. Google refers to these credentials
20as *Service Accounts*.
21
22Service accounts are used for server-to-server communication, such as
23interactions between a web application server and a Google service. The
24service account belongs to your application instead of to an individual end
25user. In contrast to other OAuth 2.0 profiles, no users are involved and your
26application "acts" as the service account.
27
28Typically an application uses a service account when the application uses
29Google APIs to work with its own data rather than a user's data. For example,
30an application that uses Google Cloud Datastore for data persistence would use
31a service account to authenticate its calls to the Google Cloud Datastore API.
32However, an application that needs to access a user's Drive documents would
33use the normal OAuth 2.0 profile.
34
35Additionally, Google Apps domain administrators can grant service accounts
36`domain-wide delegation`_ authority to access user data on behalf of users in
37the domain.
38
39This profile uses a JWT to acquire an OAuth 2.0 access token. The JWT is used
40in place of the usual authorization token returned during the standard
41OAuth 2.0 Authorization Code grant. The JWT is only used for this purpose, as
42the acquired access token is used as the bearer token when making requests
43using these credentials.
44
45This profile differs from normal OAuth 2.0 profile because no user consent
46step is required. The use of the private key allows this profile to assert
47identity directly.
48
49This profile also differs from the :mod:`google.auth.jwt` authentication
50because the JWT credentials use the JWT directly as the bearer token. This
51profile instead only uses the JWT to obtain an OAuth 2.0 access token. The
52obtained OAuth 2.0 access token is used as the bearer token.
53
54Domain-wide delegation
55----------------------
56
57Domain-wide delegation allows a service account to access user data on
58behalf of any user in a Google Apps domain without consent from the user.
59For example, an application that uses the Google Calendar API to add events to
60the calendars of all users in a Google Apps domain would use a service account
61to access the Google Calendar API on behalf of users.
62
63The Google Apps administrator must explicitly authorize the service account to
64do this. This authorization step is referred to as "delegating domain-wide
65authority" to a service account.
66
You can use domain-wide delegation by creating a set of credentials with a
specific subject using :meth:`~Credentials.with_subject`.
69
70.. _RFC 7523: https://tools.ietf.org/html/rfc7523
71"""
72
73import copy
74import datetime
75
76from google.auth import _helpers
77from google.auth import _service_account_info
78from google.auth import credentials
79from google.auth import exceptions
80from google.auth import iam
81from google.auth import jwt
82from google.auth import metrics
83from google.oauth2 import _client
84
# Lifetime, in seconds, given to the signed authorization grant assertions
# built by the credentials in this module ("exp" is set to "iat" plus this).
_DEFAULT_TOKEN_LIFETIME_SECS = 3600  # 1 hour in seconds
# Value placed in the "aud" claim of the authorization grant assertions.
_GOOGLE_OAUTH2_TOKEN_ENDPOINT = "https://oauth2.googleapis.com/token"
87
88
class Credentials(
    credentials.Signing,
    credentials.Scoped,
    credentials.CredentialsWithQuotaProject,
    credentials.CredentialsWithTokenUri,
):
    """Service account credentials

    Usually, you'll create these credentials with one of the helper
    constructors. To create credentials using a Google service account
    private key JSON file::

        credentials = service_account.Credentials.from_service_account_file(
            'service-account.json')

    Or if you already have the service account file loaded::

        service_account_info = json.load(open('service_account.json'))
        credentials = service_account.Credentials.from_service_account_info(
            service_account_info)

    Both helper methods pass on arguments to the constructor, so you can
    specify additional scopes and a subject if necessary::

        credentials = service_account.Credentials.from_service_account_file(
            'service-account.json',
            scopes=['email'],
            subject='user@example.com')

    The credentials are considered immutable. If you want to modify the scopes
    or the subject used for delegation, use :meth:`with_scopes` or
    :meth:`with_subject`::

        scoped_credentials = credentials.with_scopes(['email'])
        delegated_credentials = credentials.with_subject(subject)

    To add a quota project, use :meth:`with_quota_project`::

        credentials = credentials.with_quota_project('myproject-123')
    """

    def __init__(
        self,
        signer,
        service_account_email,
        token_uri,
        scopes=None,
        default_scopes=None,
        subject=None,
        project_id=None,
        quota_project_id=None,
        additional_claims=None,
        always_use_jwt_access=False,
        universe_domain=credentials.DEFAULT_UNIVERSE_DOMAIN,
        trust_boundary=None,
    ):
        """
        Args:
            signer (google.auth.crypt.Signer): The signer used to sign JWTs.
            service_account_email (str): The service account's email.
            token_uri (str): The OAuth 2.0 Token URI.
            scopes (Sequence[str]): User-defined scopes to request during the
                authorization grant.
            default_scopes (Sequence[str]): Default scopes passed by a
                Google client library. Use 'scopes' for user-defined scopes.
            subject (str): For domain-wide delegation, the email address of the
                user for which to request delegated access.
            project_id (str): Project ID associated with the service account
                credential.
            quota_project_id (Optional[str]): The project ID used for quota and
                billing.
            additional_claims (Mapping[str, str]): Any additional claims for
                the JWT assertion used in the authorization grant.
            always_use_jwt_access (Optional[bool]): Whether self signed JWT should
                be always used.
            universe_domain (str): The universe domain. A falsy value (``None``
                or ``""``) is treated as the default universe domain. For a
                non-default universe domain, self signed JWT is always used
                for token refresh regardless of ``always_use_jwt_access``.
            trust_boundary (str): String representation of trust boundary meta.
                This argument is currently accepted but not consulted; a fixed
                placeholder value is stored instead.

        .. note:: Typically one of the helper constructors
            :meth:`from_service_account_file` or
            :meth:`from_service_account_info` are used instead of calling the
            constructor directly.
        """
        super(Credentials, self).__init__()

        self._scopes = scopes
        self._default_scopes = default_scopes
        self._signer = signer
        self._service_account_email = service_account_email
        self._subject = subject
        self._project_id = project_id
        self._quota_project_id = quota_project_id
        self._token_uri = token_uri
        self._always_use_jwt_access = always_use_jwt_access
        # Falsy values (None, "") fall back to the default universe domain.
        self._universe_domain = universe_domain or credentials.DEFAULT_UNIVERSE_DOMAIN

        # Compare the *normalized* domain: comparing the raw argument would
        # treat None/"" as a non-default universe and wrongly force self
        # signed JWT even though the default domain is in effect.
        if self._universe_domain != credentials.DEFAULT_UNIVERSE_DOMAIN:
            self._always_use_jwt_access = True

        # Lazily populated by _create_self_signed_jwt().
        self._jwt_credentials = None

        if additional_claims is not None:
            self._additional_claims = additional_claims
        else:
            self._additional_claims = {}
        # Fixed placeholder; the ``trust_boundary`` argument is not used here.
        self._trust_boundary = {"locations": [], "encoded_locations": "0x0"}

    @classmethod
    def _from_signer_and_info(cls, signer, info, **kwargs):
        """Creates a Credentials instance from a signer and service account
        info.

        Args:
            signer (google.auth.crypt.Signer): The signer used to sign JWTs.
            info (Mapping[str, str]): The service account info. Must contain
                ``client_email`` and ``token_uri``; ``project_id``,
                ``universe_domain`` and ``trust_boundary`` are read if present.
            kwargs: Additional arguments to pass to the constructor.

        Returns:
            google.auth.service_account.Credentials: The constructed
                credentials.

        Raises:
            KeyError: If ``info`` lacks ``client_email`` or ``token_uri``.
        """
        return cls(
            signer,
            service_account_email=info["client_email"],
            token_uri=info["token_uri"],
            project_id=info.get("project_id"),
            universe_domain=info.get(
                "universe_domain", credentials.DEFAULT_UNIVERSE_DOMAIN
            ),
            trust_boundary=info.get("trust_boundary"),
            **kwargs
        )

    @classmethod
    def from_service_account_info(cls, info, **kwargs):
        """Creates a Credentials instance from parsed service account info.

        Args:
            info (Mapping[str, str]): The service account info in Google
                format.
            kwargs: Additional arguments to pass to the constructor.

        Returns:
            google.auth.service_account.Credentials: The constructed
                credentials.

        Raises:
            ValueError: If the info is not in the expected format.
        """
        signer = _service_account_info.from_dict(
            info, require=["client_email", "token_uri"]
        )
        return cls._from_signer_and_info(signer, info, **kwargs)

    @classmethod
    def from_service_account_file(cls, filename, **kwargs):
        """Creates a Credentials instance from a service account json file.

        Args:
            filename (str): The path to the service account json file.
            kwargs: Additional arguments to pass to the constructor.

        Returns:
            google.auth.service_account.Credentials: The constructed
                credentials.
        """
        info, signer = _service_account_info.from_filename(
            filename, require=["client_email", "token_uri"]
        )
        return cls._from_signer_and_info(signer, info, **kwargs)

    @property
    def service_account_email(self):
        """The service account email."""
        return self._service_account_email

    @property
    def project_id(self):
        """Project ID associated with this credential."""
        return self._project_id

    @property
    def requires_scopes(self):
        """Checks if the credentials requires scopes.

        Returns:
            bool: True if there are no scopes set otherwise False.
        """
        return not self._scopes

    def _make_copy(self):
        """Build a new instance of the same class from this instance's state.

        Scopes, default scopes and additional claims are shallow-copied so the
        copy does not share those containers with the original. The cached
        self signed JWT credentials are not carried over; the constructor
        resets them to ``None``.

        Returns:
            google.auth.service_account.Credentials: The new instance.
        """
        cred = self.__class__(
            self._signer,
            service_account_email=self._service_account_email,
            scopes=copy.copy(self._scopes),
            default_scopes=copy.copy(self._default_scopes),
            token_uri=self._token_uri,
            subject=self._subject,
            project_id=self._project_id,
            quota_project_id=self._quota_project_id,
            additional_claims=self._additional_claims.copy(),
            always_use_jwt_access=self._always_use_jwt_access,
            universe_domain=self._universe_domain,
        )
        return cred

    @_helpers.copy_docstring(credentials.Scoped)
    def with_scopes(self, scopes, default_scopes=None):
        cred = self._make_copy()
        cred._scopes = scopes
        cred._default_scopes = default_scopes
        return cred

    def with_always_use_jwt_access(self, always_use_jwt_access):
        """Create a copy of these credentials with the specified always_use_jwt_access value.

        Args:
            always_use_jwt_access (bool): Whether always use self signed JWT or not.

        Returns:
            google.auth.service_account.Credentials: A new credentials
                instance.
        Raises:
            google.auth.exceptions.InvalidValue: If the universe domain is not
                default and always_use_jwt_access is False.
        """
        cred = self._make_copy()
        if (
            cred._universe_domain != credentials.DEFAULT_UNIVERSE_DOMAIN
            and not always_use_jwt_access
        ):
            raise exceptions.InvalidValue(
                "always_use_jwt_access should be True for non-default universe domain"
            )
        cred._always_use_jwt_access = always_use_jwt_access
        return cred

    @_helpers.copy_docstring(credentials.CredentialsWithUniverseDomain)
    def with_universe_domain(self, universe_domain):
        cred = self._make_copy()
        # Normalize falsy values exactly as the constructor does, so that
        # None/"" means "default universe domain" here as well.
        cred._universe_domain = universe_domain or credentials.DEFAULT_UNIVERSE_DOMAIN
        if cred._universe_domain != credentials.DEFAULT_UNIVERSE_DOMAIN:
            # Non-default universe domains always use self signed JWT.
            cred._always_use_jwt_access = True
        return cred

    def with_subject(self, subject):
        """Create a copy of these credentials with the specified subject.

        Args:
            subject (str): The subject claim.

        Returns:
            google.auth.service_account.Credentials: A new credentials
                instance.
        """
        cred = self._make_copy()
        cred._subject = subject
        return cred

    def with_claims(self, additional_claims):
        """Returns a copy of these credentials with modified claims.

        Args:
            additional_claims (Mapping[str, str]): Any additional claims for
                the JWT payload. This will be merged with the current
                additional claims.

        Returns:
            google.auth.service_account.Credentials: A new credentials
                instance.
        """
        # Deep copy so nested claim values are not shared with this instance.
        new_additional_claims = copy.deepcopy(self._additional_claims)
        new_additional_claims.update(additional_claims or {})
        cred = self._make_copy()
        cred._additional_claims = new_additional_claims
        return cred

    @_helpers.copy_docstring(credentials.CredentialsWithQuotaProject)
    def with_quota_project(self, quota_project_id):
        cred = self._make_copy()
        cred._quota_project_id = quota_project_id
        return cred

    @_helpers.copy_docstring(credentials.CredentialsWithTokenUri)
    def with_token_uri(self, token_uri):
        cred = self._make_copy()
        cred._token_uri = token_uri
        return cred

    def _make_authorization_grant_assertion(self):
        """Create the OAuth 2.0 assertion.

        This assertion is used during the OAuth 2.0 grant to acquire an
        access token.

        Returns:
            bytes: The authorization grant assertion.
        """
        now = _helpers.utcnow()
        lifetime = datetime.timedelta(seconds=_DEFAULT_TOKEN_LIFETIME_SECS)
        expiry = now + lifetime

        payload = {
            "iat": _helpers.datetime_to_secs(now),
            "exp": _helpers.datetime_to_secs(expiry),
            # The issuer must be the service account email.
            "iss": self._service_account_email,
            # The audience must be the auth token endpoint's URI
            "aud": _GOOGLE_OAUTH2_TOKEN_ENDPOINT,
            "scope": _helpers.scopes_to_string(self._scopes or ()),
        }

        # Additional claims may override any of the standard claims above.
        payload.update(self._additional_claims)

        # The subject can be a user email for domain-wide delegation. A "sub"
        # supplied through the additional claims takes precedence.
        if self._subject:
            payload.setdefault("sub", self._subject)

        token = jwt.encode(self._signer, payload)

        return token

    def _use_self_signed_jwt(self):
        """Whether refresh should use the self signed JWT credentials.

        Returns:
            bool: True if no subject is set and self signed JWT credentials
                have been created.
        """
        # Since domain wide delegation doesn't work with self signed JWT. If
        # subject exists, then we should not use self signed JWT.
        return self._subject is None and self._jwt_credentials is not None

    def _metric_header_for_usage(self):
        """Return the metrics credential-type value for the active flow."""
        if self._use_self_signed_jwt():
            return metrics.CRED_TYPE_SA_JWT
        return metrics.CRED_TYPE_SA_ASSERTION

    @_helpers.copy_docstring(credentials.Credentials)
    def refresh(self, request):
        if self._always_use_jwt_access and not self._jwt_credentials:
            # If self signed jwt should be used but jwt credential is not
            # created, try to create one with scopes
            self._create_self_signed_jwt(None)

        if (
            self._universe_domain != credentials.DEFAULT_UNIVERSE_DOMAIN
            and self._subject
        ):
            raise exceptions.RefreshError(
                "domain wide delegation is not supported for non-default universe domain"
            )

        if self._use_self_signed_jwt():
            # The self signed JWT itself is used as the bearer token.
            self._jwt_credentials.refresh(request)
            self.token = self._jwt_credentials.token.decode()
            self.expiry = self._jwt_credentials.expiry
        else:
            # Exchange a signed assertion for an access token at the token URI.
            assertion = self._make_authorization_grant_assertion()
            access_token, expiry, _ = _client.jwt_grant(
                request, self._token_uri, assertion
            )
            self.token = access_token
            self.expiry = expiry

    def _create_self_signed_jwt(self, audience):
        """Create a self-signed JWT from the credentials if requirements are met.

        When ``always_use_jwt_access`` is set, the JWT credentials are built
        from, in order of preference, the user scopes, the given audience, or
        the default scopes, and are only rebuilt when that input changed.
        Otherwise JWT credentials are built only when no user scopes are set
        and an audience is given.

        Args:
            audience (str): The service URL. ``https://[API_ENDPOINT]/``
        """
        # https://google.aip.dev/auth/4111
        if self._always_use_jwt_access:
            if self._scopes:
                additional_claims = {"scope": " ".join(self._scopes)}
                if (
                    self._jwt_credentials is None
                    or self._jwt_credentials.additional_claims != additional_claims
                ):
                    self._jwt_credentials = jwt.Credentials.from_signing_credentials(
                        self, None, additional_claims=additional_claims
                    )
            elif audience:
                if (
                    self._jwt_credentials is None
                    or self._jwt_credentials._audience != audience
                ):
                    self._jwt_credentials = jwt.Credentials.from_signing_credentials(
                        self, audience
                    )
            elif self._default_scopes:
                additional_claims = {"scope": " ".join(self._default_scopes)}
                if (
                    self._jwt_credentials is None
                    or additional_claims != self._jwt_credentials.additional_claims
                ):
                    self._jwt_credentials = jwt.Credentials.from_signing_credentials(
                        self, None, additional_claims=additional_claims
                    )
        elif not self._scopes and audience:
            self._jwt_credentials = jwt.Credentials.from_signing_credentials(
                self, audience
            )

    @_helpers.copy_docstring(credentials.Signing)
    def sign_bytes(self, message):
        return self._signer.sign(message)

    @property  # type: ignore
    @_helpers.copy_docstring(credentials.Signing)
    def signer(self):
        return self._signer

    @property  # type: ignore
    @_helpers.copy_docstring(credentials.Signing)
    def signer_email(self):
        return self._service_account_email
505
506
class IDTokenCredentials(
    credentials.Signing,
    credentials.CredentialsWithQuotaProject,
    credentials.CredentialsWithTokenUri,
):
    """Open ID Connect ID Token-based service account credentials.

    These credentials are largely similar to :class:`.Credentials`, but instead
    of using an OAuth 2.0 Access Token as the bearer token, they use an Open
    ID Connect ID Token as the bearer token. These credentials are useful when
    communicating to services that require ID Tokens and can not accept access
    tokens.

    Usually, you'll create these credentials with one of the helper
    constructors. Both helpers pass extra keyword arguments on to the
    constructor, and ``target_audience`` is required by the constructor. To
    create credentials using a Google service account private key JSON file::

        credentials = (
            service_account.IDTokenCredentials.from_service_account_file(
                'service-account.json',
                target_audience='https://example.com'))


    Or if you already have the service account file loaded::

        service_account_info = json.load(open('service_account.json'))
        credentials = (
            service_account.IDTokenCredentials.from_service_account_info(
                service_account_info,
                target_audience='https://example.com'))


    You can also specify additional claims and a quota project if necessary::

        credentials = (
            service_account.IDTokenCredentials.from_service_account_file(
                'service-account.json',
                target_audience='https://example.com',
                additional_claims={'claim': 'value'},
                quota_project_id='myproject-123'))


    The credentials are considered immutable. If you want to modify the target
    audience, use :meth:`with_target_audience`::

        audience_credentials = credentials.with_target_audience(
            'https://other.example.com')

    """

    def __init__(
        self,
        signer,
        service_account_email,
        token_uri,
        target_audience,
        additional_claims=None,
        quota_project_id=None,
        universe_domain=credentials.DEFAULT_UNIVERSE_DOMAIN,
    ):
        """
        Args:
            signer (google.auth.crypt.Signer): The signer used to sign JWTs.
            service_account_email (str): The service account's email.
            token_uri (str): The OAuth 2.0 Token URI.
            target_audience (str): The intended audience for these credentials,
                used when requesting the ID Token. The ID Token's ``aud`` claim
                will be set to this string.
            additional_claims (Mapping[str, str]): Any additional claims for
                the JWT assertion used in the authorization grant.
            quota_project_id (Optional[str]): The project ID used for quota and billing.
            universe_domain (str): The universe domain. The default
                universe domain is googleapis.com; a falsy value is treated as
                the default. For a non-default universe domain the IAM ID
                token endpoint is used for token refresh. Note that
                iam.serviceAccountTokenCreator role is required to use the IAM
                endpoint.

        .. note:: Typically one of the helper constructors
            :meth:`from_service_account_file` or
            :meth:`from_service_account_info` are used instead of calling the
            constructor directly.
        """
        super(IDTokenCredentials, self).__init__()
        self._signer = signer
        self._service_account_email = service_account_email
        self._token_uri = token_uri
        self._target_audience = target_audience
        self._quota_project_id = quota_project_id
        self._use_iam_endpoint = False

        if not universe_domain:
            self._universe_domain = credentials.DEFAULT_UNIVERSE_DOMAIN
        else:
            self._universe_domain = universe_domain
        # Derive the IAM endpoint for this universe by substituting the domain.
        self._iam_id_token_endpoint = iam._IAM_IDTOKEN_ENDPOINT.replace(
            "googleapis.com", self._universe_domain
        )

        # Non-default universe domains always refresh through the IAM endpoint.
        if self._universe_domain != credentials.DEFAULT_UNIVERSE_DOMAIN:
            self._use_iam_endpoint = True

        if additional_claims is not None:
            self._additional_claims = additional_claims
        else:
            self._additional_claims = {}

    @classmethod
    def _from_signer_and_info(cls, signer, info, **kwargs):
        """Creates a credentials instance from a signer and service account
        info.

        ``service_account_email`` and ``token_uri`` are taken from ``info``
        unless already present in ``kwargs``. A ``universe_domain`` in ``info``
        always replaces any value given in ``kwargs``.

        Args:
            signer (google.auth.crypt.Signer): The signer used to sign JWTs.
            info (Mapping[str, str]): The service account info.
            kwargs: Additional arguments to pass to the constructor.

        Returns:
            google.auth.service_account.IDTokenCredentials: The constructed
                credentials.

        Raises:
            KeyError: If ``info`` lacks ``client_email`` or ``token_uri``.
        """
        kwargs.setdefault("service_account_email", info["client_email"])
        kwargs.setdefault("token_uri", info["token_uri"])
        if "universe_domain" in info:
            kwargs["universe_domain"] = info["universe_domain"]
        return cls(signer, **kwargs)

    @classmethod
    def from_service_account_info(cls, info, **kwargs):
        """Creates a credentials instance from parsed service account info.

        Args:
            info (Mapping[str, str]): The service account info in Google
                format.
            kwargs: Additional arguments to pass to the constructor.

        Returns:
            google.auth.service_account.IDTokenCredentials: The constructed
                credentials.

        Raises:
            ValueError: If the info is not in the expected format.
        """
        signer = _service_account_info.from_dict(
            info, require=["client_email", "token_uri"]
        )
        return cls._from_signer_and_info(signer, info, **kwargs)

    @classmethod
    def from_service_account_file(cls, filename, **kwargs):
        """Creates a credentials instance from a service account json file.

        Args:
            filename (str): The path to the service account json file.
            kwargs: Additional arguments to pass to the constructor.

        Returns:
            google.auth.service_account.IDTokenCredentials: The constructed
                credentials.
        """
        info, signer = _service_account_info.from_filename(
            filename, require=["client_email", "token_uri"]
        )
        return cls._from_signer_and_info(signer, info, **kwargs)

    def _make_copy(self):
        """Build a new instance of the same class from this instance's state.

        Returns:
            google.auth.service_account.IDTokenCredentials: The new instance.
        """
        cred = self.__class__(
            self._signer,
            service_account_email=self._service_account_email,
            token_uri=self._token_uri,
            target_audience=self._target_audience,
            additional_claims=self._additional_claims.copy(),
            quota_project_id=self.quota_project_id,
            universe_domain=self._universe_domain,
        )
        # _use_iam_endpoint is not exposed in the constructor
        cred._use_iam_endpoint = self._use_iam_endpoint
        return cred

    def with_target_audience(self, target_audience):
        """Create a copy of these credentials with the specified target
        audience.

        Args:
            target_audience (str): The intended audience for these credentials,
                used when requesting the ID Token.

        Returns:
            google.auth.service_account.IDTokenCredentials: A new credentials
                instance.
        """
        cred = self._make_copy()
        cred._target_audience = target_audience
        return cred

    def _with_use_iam_endpoint(self, use_iam_endpoint):
        """Create a copy of these credentials with the use_iam_endpoint value.

        Args:
            use_iam_endpoint (bool): If True, IAM generateIdToken endpoint will
                be used instead of the token_uri. Note that
                iam.serviceAccountTokenCreator role is required to use the IAM
                endpoint. The default value is False. This feature is currently
                experimental and subject to change without notice.

        Returns:
            google.auth.service_account.IDTokenCredentials: A new credentials
                instance.
        Raises:
            google.auth.exceptions.InvalidValue: If the universe domain is not
                default and use_iam_endpoint is False.
        """
        # Validate first so no copy is built only to be discarded on error.
        if (
            self._universe_domain != credentials.DEFAULT_UNIVERSE_DOMAIN
            and not use_iam_endpoint
        ):
            raise exceptions.InvalidValue(
                "use_iam_endpoint should be True for non-default universe domain"
            )
        cred = self._make_copy()
        cred._use_iam_endpoint = use_iam_endpoint
        return cred

    @_helpers.copy_docstring(credentials.CredentialsWithQuotaProject)
    def with_quota_project(self, quota_project_id):
        cred = self._make_copy()
        cred._quota_project_id = quota_project_id
        return cred

    @_helpers.copy_docstring(credentials.CredentialsWithTokenUri)
    def with_token_uri(self, token_uri):
        cred = self._make_copy()
        cred._token_uri = token_uri
        return cred

    def _make_authorization_grant_assertion(self):
        """Create the OAuth 2.0 assertion.

        This assertion is used during the OAuth 2.0 grant to acquire an
        ID token.

        Returns:
            bytes: The authorization grant assertion.
        """
        now = _helpers.utcnow()
        lifetime = datetime.timedelta(seconds=_DEFAULT_TOKEN_LIFETIME_SECS)
        expiry = now + lifetime

        payload = {
            "iat": _helpers.datetime_to_secs(now),
            "exp": _helpers.datetime_to_secs(expiry),
            # The issuer must be the service account email.
            "iss": self.service_account_email,
            # The audience must be the auth token endpoint's URI
            "aud": _GOOGLE_OAUTH2_TOKEN_ENDPOINT,
            # The target audience specifies which service the ID token is
            # intended for.
            "target_audience": self._target_audience,
        }

        # Additional claims may override any of the standard claims above.
        payload.update(self._additional_claims)

        token = jwt.encode(self._signer, payload)

        return token

    def _refresh_with_iam_endpoint(self, request):
        """Use IAM generateIdToken endpoint to obtain an ID token.

        It works as follows:

        1. First we create a self signed jwt with
        https://www.googleapis.com/auth/iam being the scope.

        2. Next we use the self signed jwt as the access token, and make a POST
        request to IAM generateIdToken endpoint. The request body is:
            {
                "audience": self._target_audience,
                "includeEmail": "true",
                "useEmailAzp": "true",
            }

        If the request is successful, it will return {"token":"the ID token"},
        and we can extract the ID token and compute its expiry.
        """
        jwt_credentials = jwt.Credentials.from_signing_credentials(
            self,
            None,
            additional_claims={"scope": "https://www.googleapis.com/auth/iam"},
        )
        jwt_credentials.refresh(request)
        self.token, self.expiry = _client.call_iam_generate_id_token_endpoint(
            request,
            self._iam_id_token_endpoint,
            self.signer_email,
            self._target_audience,
            jwt_credentials.token.decode(),
        )

    @_helpers.copy_docstring(credentials.Credentials)
    def refresh(self, request):
        if self._use_iam_endpoint:
            self._refresh_with_iam_endpoint(request)
        else:
            # Exchange a signed assertion for an ID token at the token URI.
            assertion = self._make_authorization_grant_assertion()
            access_token, expiry, _ = _client.id_token_jwt_grant(
                request, self._token_uri, assertion
            )
            self.token = access_token
            self.expiry = expiry

    @property
    def service_account_email(self):
        """The service account email."""
        return self._service_account_email

    @_helpers.copy_docstring(credentials.Signing)
    def sign_bytes(self, message):
        return self._signer.sign(message)

    @property  # type: ignore
    @_helpers.copy_docstring(credentials.Signing)
    def signer(self):
        return self._signer

    @property  # type: ignore
    @_helpers.copy_docstring(credentials.Signing)
    def signer_email(self):
        return self._service_account_email