1# Copyright 2016 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15"""Service Accounts: JSON Web Token (JWT) Profile for OAuth 2.0
16
17This module implements the JWT Profile for OAuth 2.0 Authorization Grants
18as defined by `RFC 7523`_ with particular support for how this RFC is
19implemented in Google's infrastructure. Google refers to these credentials
20as *Service Accounts*.
21
22Service accounts are used for server-to-server communication, such as
23interactions between a web application server and a Google service. The
24service account belongs to your application instead of to an individual end
25user. In contrast to other OAuth 2.0 profiles, no users are involved and your
26application "acts" as the service account.
27
28Typically an application uses a service account when the application uses
29Google APIs to work with its own data rather than a user's data. For example,
30an application that uses Google Cloud Datastore for data persistence would use
31a service account to authenticate its calls to the Google Cloud Datastore API.
32However, an application that needs to access a user's Drive documents would
33use the normal OAuth 2.0 profile.
34
35Additionally, Google Apps domain administrators can grant service accounts
36`domain-wide delegation`_ authority to access user data on behalf of users in
37the domain.
38
39This profile uses a JWT to acquire an OAuth 2.0 access token. The JWT is used
40in place of the usual authorization token returned during the standard
41OAuth 2.0 Authorization Code grant. The JWT is only used for this purpose, as
42the acquired access token is used as the bearer token when making requests
43using these credentials.
44
45This profile differs from normal OAuth 2.0 profile because no user consent
46step is required. The use of the private key allows this profile to assert
47identity directly.
48
49This profile also differs from the :mod:`google.auth.jwt` authentication
50because the JWT credentials use the JWT directly as the bearer token. This
51profile instead only uses the JWT to obtain an OAuth 2.0 access token. The
52obtained OAuth 2.0 access token is used as the bearer token.
53
54Domain-wide delegation
55----------------------
56
57Domain-wide delegation allows a service account to access user data on
58behalf of any user in a Google Apps domain without consent from the user.
59For example, an application that uses the Google Calendar API to add events to
60the calendars of all users in a Google Apps domain would use a service account
61to access the Google Calendar API on behalf of users.
62
63The Google Apps administrator must explicitly authorize the service account to
64do this. This authorization step is referred to as "delegating domain-wide
65authority" to a service account.
66
67You can use domain-wise delegation by creating a set of credentials with a
68specific subject using :meth:`~Credentials.with_subject`.
69
70.. _RFC 7523: https://tools.ietf.org/html/rfc7523
71"""
72
73import copy
74import datetime
75
76from google.auth import _helpers
77from google.auth import _service_account_info
78from google.auth import credentials
79from google.auth import exceptions
80from google.auth import jwt
81from google.auth import metrics
82from google.oauth2 import _client
83
84_DEFAULT_TOKEN_LIFETIME_SECS = 3600 # 1 hour in seconds
85_GOOGLE_OAUTH2_TOKEN_ENDPOINT = "https://oauth2.googleapis.com/token"
86
87
88class Credentials(
89 credentials.Signing,
90 credentials.Scoped,
91 credentials.CredentialsWithQuotaProject,
92 credentials.CredentialsWithTokenUri,
93):
94 """Service account credentials
95
96 Usually, you'll create these credentials with one of the helper
97 constructors. To create credentials using a Google service account
98 private key JSON file::
99
100 credentials = service_account.Credentials.from_service_account_file(
101 'service-account.json')
102
103 Or if you already have the service account file loaded::
104
105 service_account_info = json.load(open('service_account.json'))
106 credentials = service_account.Credentials.from_service_account_info(
107 service_account_info)
108
109 Both helper methods pass on arguments to the constructor, so you can
110 specify additional scopes and a subject if necessary::
111
112 credentials = service_account.Credentials.from_service_account_file(
113 'service-account.json',
114 scopes=['email'],
115 subject='user@example.com')
116
117 The credentials are considered immutable. If you want to modify the scopes
118 or the subject used for delegation, use :meth:`with_scopes` or
119 :meth:`with_subject`::
120
121 scoped_credentials = credentials.with_scopes(['email'])
122 delegated_credentials = credentials.with_subject(subject)
123
124 To add a quota project, use :meth:`with_quota_project`::
125
126 credentials = credentials.with_quota_project('myproject-123')
127 """
128
129 def __init__(
130 self,
131 signer,
132 service_account_email,
133 token_uri,
134 scopes=None,
135 default_scopes=None,
136 subject=None,
137 project_id=None,
138 quota_project_id=None,
139 additional_claims=None,
140 always_use_jwt_access=False,
141 universe_domain=credentials.DEFAULT_UNIVERSE_DOMAIN,
142 trust_boundary=None,
143 ):
144 """
145 Args:
146 signer (google.auth.crypt.Signer): The signer used to sign JWTs.
147 service_account_email (str): The service account's email.
148 scopes (Sequence[str]): User-defined scopes to request during the
149 authorization grant.
150 default_scopes (Sequence[str]): Default scopes passed by a
151 Google client library. Use 'scopes' for user-defined scopes.
152 token_uri (str): The OAuth 2.0 Token URI.
153 subject (str): For domain-wide delegation, the email address of the
154 user to for which to request delegated access.
155 project_id (str): Project ID associated with the service account
156 credential.
157 quota_project_id (Optional[str]): The project ID used for quota and
158 billing.
159 additional_claims (Mapping[str, str]): Any additional claims for
160 the JWT assertion used in the authorization grant.
161 always_use_jwt_access (Optional[bool]): Whether self signed JWT should
162 be always used.
163 universe_domain (str): The universe domain. The default
164 universe domain is googleapis.com. For default value self
165 signed jwt is used for token refresh.
166 trust_boundary (str): String representation of trust boundary meta.
167
168 .. note:: Typically one of the helper constructors
169 :meth:`from_service_account_file` or
170 :meth:`from_service_account_info` are used instead of calling the
171 constructor directly.
172 """
173 super(Credentials, self).__init__()
174
175 self._scopes = scopes
176 self._default_scopes = default_scopes
177 self._signer = signer
178 self._service_account_email = service_account_email
179 self._subject = subject
180 self._project_id = project_id
181 self._quota_project_id = quota_project_id
182 self._token_uri = token_uri
183 self._always_use_jwt_access = always_use_jwt_access
184 self._universe_domain = universe_domain or credentials.DEFAULT_UNIVERSE_DOMAIN
185
186 if universe_domain != credentials.DEFAULT_UNIVERSE_DOMAIN:
187 self._always_use_jwt_access = True
188
189 self._jwt_credentials = None
190
191 if additional_claims is not None:
192 self._additional_claims = additional_claims
193 else:
194 self._additional_claims = {}
195 self._trust_boundary = {"locations": [], "encoded_locations": "0x0"}
196
197 @classmethod
198 def _from_signer_and_info(cls, signer, info, **kwargs):
199 """Creates a Credentials instance from a signer and service account
200 info.
201
202 Args:
203 signer (google.auth.crypt.Signer): The signer used to sign JWTs.
204 info (Mapping[str, str]): The service account info.
205 kwargs: Additional arguments to pass to the constructor.
206
207 Returns:
208 google.auth.jwt.Credentials: The constructed credentials.
209
210 Raises:
211 ValueError: If the info is not in the expected format.
212 """
213 return cls(
214 signer,
215 service_account_email=info["client_email"],
216 token_uri=info["token_uri"],
217 project_id=info.get("project_id"),
218 universe_domain=info.get(
219 "universe_domain", credentials.DEFAULT_UNIVERSE_DOMAIN
220 ),
221 trust_boundary=info.get("trust_boundary"),
222 **kwargs
223 )
224
225 @classmethod
226 def from_service_account_info(cls, info, **kwargs):
227 """Creates a Credentials instance from parsed service account info.
228
229 Args:
230 info (Mapping[str, str]): The service account info in Google
231 format.
232 kwargs: Additional arguments to pass to the constructor.
233
234 Returns:
235 google.auth.service_account.Credentials: The constructed
236 credentials.
237
238 Raises:
239 ValueError: If the info is not in the expected format.
240 """
241 signer = _service_account_info.from_dict(
242 info, require=["client_email", "token_uri"]
243 )
244 return cls._from_signer_and_info(signer, info, **kwargs)
245
246 @classmethod
247 def from_service_account_file(cls, filename, **kwargs):
248 """Creates a Credentials instance from a service account json file.
249
250 Args:
251 filename (str): The path to the service account json file.
252 kwargs: Additional arguments to pass to the constructor.
253
254 Returns:
255 google.auth.service_account.Credentials: The constructed
256 credentials.
257 """
258 info, signer = _service_account_info.from_filename(
259 filename, require=["client_email", "token_uri"]
260 )
261 return cls._from_signer_and_info(signer, info, **kwargs)
262
263 @property
264 def service_account_email(self):
265 """The service account email."""
266 return self._service_account_email
267
268 @property
269 def project_id(self):
270 """Project ID associated with this credential."""
271 return self._project_id
272
273 @property
274 def requires_scopes(self):
275 """Checks if the credentials requires scopes.
276
277 Returns:
278 bool: True if there are no scopes set otherwise False.
279 """
280 return True if not self._scopes else False
281
282 def _make_copy(self):
283 cred = self.__class__(
284 self._signer,
285 service_account_email=self._service_account_email,
286 scopes=copy.copy(self._scopes),
287 default_scopes=copy.copy(self._default_scopes),
288 token_uri=self._token_uri,
289 subject=self._subject,
290 project_id=self._project_id,
291 quota_project_id=self._quota_project_id,
292 additional_claims=self._additional_claims.copy(),
293 always_use_jwt_access=self._always_use_jwt_access,
294 universe_domain=self._universe_domain,
295 )
296 return cred
297
298 @_helpers.copy_docstring(credentials.Scoped)
299 def with_scopes(self, scopes, default_scopes=None):
300 cred = self._make_copy()
301 cred._scopes = scopes
302 cred._default_scopes = default_scopes
303 return cred
304
305 def with_always_use_jwt_access(self, always_use_jwt_access):
306 """Create a copy of these credentials with the specified always_use_jwt_access value.
307
308 Args:
309 always_use_jwt_access (bool): Whether always use self signed JWT or not.
310
311 Returns:
312 google.auth.service_account.Credentials: A new credentials
313 instance.
314 Raises:
315 google.auth.exceptions.InvalidValue: If the universe domain is not
316 default and always_use_jwt_access is False.
317 """
318 cred = self._make_copy()
319 if (
320 cred._universe_domain != credentials.DEFAULT_UNIVERSE_DOMAIN
321 and not always_use_jwt_access
322 ):
323 raise exceptions.InvalidValue(
324 "always_use_jwt_access should be True for non-default universe domain"
325 )
326 cred._always_use_jwt_access = always_use_jwt_access
327 return cred
328
329 @_helpers.copy_docstring(credentials.CredentialsWithUniverseDomain)
330 def with_universe_domain(self, universe_domain):
331 cred = self._make_copy()
332 cred._universe_domain = universe_domain
333 if universe_domain != credentials.DEFAULT_UNIVERSE_DOMAIN:
334 cred._always_use_jwt_access = True
335 return cred
336
337 def with_subject(self, subject):
338 """Create a copy of these credentials with the specified subject.
339
340 Args:
341 subject (str): The subject claim.
342
343 Returns:
344 google.auth.service_account.Credentials: A new credentials
345 instance.
346 """
347 cred = self._make_copy()
348 cred._subject = subject
349 return cred
350
351 def with_claims(self, additional_claims):
352 """Returns a copy of these credentials with modified claims.
353
354 Args:
355 additional_claims (Mapping[str, str]): Any additional claims for
356 the JWT payload. This will be merged with the current
357 additional claims.
358
359 Returns:
360 google.auth.service_account.Credentials: A new credentials
361 instance.
362 """
363 new_additional_claims = copy.deepcopy(self._additional_claims)
364 new_additional_claims.update(additional_claims or {})
365 cred = self._make_copy()
366 cred._additional_claims = new_additional_claims
367 return cred
368
369 @_helpers.copy_docstring(credentials.CredentialsWithQuotaProject)
370 def with_quota_project(self, quota_project_id):
371 cred = self._make_copy()
372 cred._quota_project_id = quota_project_id
373 return cred
374
375 @_helpers.copy_docstring(credentials.CredentialsWithTokenUri)
376 def with_token_uri(self, token_uri):
377 cred = self._make_copy()
378 cred._token_uri = token_uri
379 return cred
380
381 def _make_authorization_grant_assertion(self):
382 """Create the OAuth 2.0 assertion.
383
384 This assertion is used during the OAuth 2.0 grant to acquire an
385 access token.
386
387 Returns:
388 bytes: The authorization grant assertion.
389 """
390 now = _helpers.utcnow()
391 lifetime = datetime.timedelta(seconds=_DEFAULT_TOKEN_LIFETIME_SECS)
392 expiry = now + lifetime
393
394 payload = {
395 "iat": _helpers.datetime_to_secs(now),
396 "exp": _helpers.datetime_to_secs(expiry),
397 # The issuer must be the service account email.
398 "iss": self._service_account_email,
399 # The audience must be the auth token endpoint's URI
400 "aud": _GOOGLE_OAUTH2_TOKEN_ENDPOINT,
401 "scope": _helpers.scopes_to_string(self._scopes or ()),
402 }
403
404 payload.update(self._additional_claims)
405
406 # The subject can be a user email for domain-wide delegation.
407 if self._subject:
408 payload.setdefault("sub", self._subject)
409
410 token = jwt.encode(self._signer, payload)
411
412 return token
413
414 def _use_self_signed_jwt(self):
415 # Since domain wide delegation doesn't work with self signed JWT. If
416 # subject exists, then we should not use self signed JWT.
417 return self._subject is None and self._jwt_credentials is not None
418
419 def _metric_header_for_usage(self):
420 if self._use_self_signed_jwt():
421 return metrics.CRED_TYPE_SA_JWT
422 return metrics.CRED_TYPE_SA_ASSERTION
423
424 @_helpers.copy_docstring(credentials.Credentials)
425 def refresh(self, request):
426 if self._always_use_jwt_access and not self._jwt_credentials:
427 # If self signed jwt should be used but jwt credential is not
428 # created, try to create one with scopes
429 self._create_self_signed_jwt(None)
430
431 if (
432 self._universe_domain != credentials.DEFAULT_UNIVERSE_DOMAIN
433 and self._subject
434 ):
435 raise exceptions.RefreshError(
436 "domain wide delegation is not supported for non-default universe domain"
437 )
438
439 if self._use_self_signed_jwt():
440 self._jwt_credentials.refresh(request)
441 self.token = self._jwt_credentials.token.decode()
442 self.expiry = self._jwt_credentials.expiry
443 else:
444 assertion = self._make_authorization_grant_assertion()
445 access_token, expiry, _ = _client.jwt_grant(
446 request, self._token_uri, assertion
447 )
448 self.token = access_token
449 self.expiry = expiry
450
451 def _create_self_signed_jwt(self, audience):
452 """Create a self-signed JWT from the credentials if requirements are met.
453
454 Args:
455 audience (str): The service URL. ``https://[API_ENDPOINT]/``
456 """
457 # https://google.aip.dev/auth/4111
458 if self._always_use_jwt_access:
459 if self._scopes:
460 additional_claims = {"scope": " ".join(self._scopes)}
461 if (
462 self._jwt_credentials is None
463 or self._jwt_credentials.additional_claims != additional_claims
464 ):
465 self._jwt_credentials = jwt.Credentials.from_signing_credentials(
466 self, None, additional_claims=additional_claims
467 )
468 elif audience:
469 if (
470 self._jwt_credentials is None
471 or self._jwt_credentials._audience != audience
472 ):
473
474 self._jwt_credentials = jwt.Credentials.from_signing_credentials(
475 self, audience
476 )
477 elif self._default_scopes:
478 additional_claims = {"scope": " ".join(self._default_scopes)}
479 if (
480 self._jwt_credentials is None
481 or additional_claims != self._jwt_credentials.additional_claims
482 ):
483 self._jwt_credentials = jwt.Credentials.from_signing_credentials(
484 self, None, additional_claims=additional_claims
485 )
486 elif not self._scopes and audience:
487 self._jwt_credentials = jwt.Credentials.from_signing_credentials(
488 self, audience
489 )
490
491 @_helpers.copy_docstring(credentials.Signing)
492 def sign_bytes(self, message):
493 return self._signer.sign(message)
494
495 @property # type: ignore
496 @_helpers.copy_docstring(credentials.Signing)
497 def signer(self):
498 return self._signer
499
500 @property # type: ignore
501 @_helpers.copy_docstring(credentials.Signing)
502 def signer_email(self):
503 return self._service_account_email
504
505
506class IDTokenCredentials(
507 credentials.Signing,
508 credentials.CredentialsWithQuotaProject,
509 credentials.CredentialsWithTokenUri,
510):
511 """Open ID Connect ID Token-based service account credentials.
512
513 These credentials are largely similar to :class:`.Credentials`, but instead
514 of using an OAuth 2.0 Access Token as the bearer token, they use an Open
515 ID Connect ID Token as the bearer token. These credentials are useful when
516 communicating to services that require ID Tokens and can not accept access
517 tokens.
518
519 Usually, you'll create these credentials with one of the helper
520 constructors. To create credentials using a Google service account
521 private key JSON file::
522
523 credentials = (
524 service_account.IDTokenCredentials.from_service_account_file(
525 'service-account.json'))
526
527
528 Or if you already have the service account file loaded::
529
530 service_account_info = json.load(open('service_account.json'))
531 credentials = (
532 service_account.IDTokenCredentials.from_service_account_info(
533 service_account_info))
534
535
536 Both helper methods pass on arguments to the constructor, so you can
537 specify additional scopes and a subject if necessary::
538
539 credentials = (
540 service_account.IDTokenCredentials.from_service_account_file(
541 'service-account.json',
542 scopes=['email'],
543 subject='user@example.com'))
544
545
546 The credentials are considered immutable. If you want to modify the scopes
547 or the subject used for delegation, use :meth:`with_scopes` or
548 :meth:`with_subject`::
549
550 scoped_credentials = credentials.with_scopes(['email'])
551 delegated_credentials = credentials.with_subject(subject)
552
553 """
554
555 def __init__(
556 self,
557 signer,
558 service_account_email,
559 token_uri,
560 target_audience,
561 additional_claims=None,
562 quota_project_id=None,
563 universe_domain=credentials.DEFAULT_UNIVERSE_DOMAIN,
564 ):
565 """
566 Args:
567 signer (google.auth.crypt.Signer): The signer used to sign JWTs.
568 service_account_email (str): The service account's email.
569 token_uri (str): The OAuth 2.0 Token URI.
570 target_audience (str): The intended audience for these credentials,
571 used when requesting the ID Token. The ID Token's ``aud`` claim
572 will be set to this string.
573 additional_claims (Mapping[str, str]): Any additional claims for
574 the JWT assertion used in the authorization grant.
575 quota_project_id (Optional[str]): The project ID used for quota and billing.
576 universe_domain (str): The universe domain. The default
577 universe domain is googleapis.com. For default value IAM ID
578 token endponint is used for token refresh. Note that
579 iam.serviceAccountTokenCreator role is required to use the IAM
580 endpoint.
581 .. note:: Typically one of the helper constructors
582 :meth:`from_service_account_file` or
583 :meth:`from_service_account_info` are used instead of calling the
584 constructor directly.
585 """
586 super(IDTokenCredentials, self).__init__()
587 self._signer = signer
588 self._service_account_email = service_account_email
589 self._token_uri = token_uri
590 self._target_audience = target_audience
591 self._quota_project_id = quota_project_id
592 self._use_iam_endpoint = False
593
594 if not universe_domain:
595 self._universe_domain = credentials.DEFAULT_UNIVERSE_DOMAIN
596 else:
597 self._universe_domain = universe_domain
598
599 if universe_domain != credentials.DEFAULT_UNIVERSE_DOMAIN:
600 self._use_iam_endpoint = True
601
602 if additional_claims is not None:
603 self._additional_claims = additional_claims
604 else:
605 self._additional_claims = {}
606
607 @classmethod
608 def _from_signer_and_info(cls, signer, info, **kwargs):
609 """Creates a credentials instance from a signer and service account
610 info.
611
612 Args:
613 signer (google.auth.crypt.Signer): The signer used to sign JWTs.
614 info (Mapping[str, str]): The service account info.
615 kwargs: Additional arguments to pass to the constructor.
616
617 Returns:
618 google.auth.jwt.IDTokenCredentials: The constructed credentials.
619
620 Raises:
621 ValueError: If the info is not in the expected format.
622 """
623 kwargs.setdefault("service_account_email", info["client_email"])
624 kwargs.setdefault("token_uri", info["token_uri"])
625 if "universe_domain" in info:
626 kwargs["universe_domain"] = info["universe_domain"]
627 return cls(signer, **kwargs)
628
629 @classmethod
630 def from_service_account_info(cls, info, **kwargs):
631 """Creates a credentials instance from parsed service account info.
632
633 Args:
634 info (Mapping[str, str]): The service account info in Google
635 format.
636 kwargs: Additional arguments to pass to the constructor.
637
638 Returns:
639 google.auth.service_account.IDTokenCredentials: The constructed
640 credentials.
641
642 Raises:
643 ValueError: If the info is not in the expected format.
644 """
645 signer = _service_account_info.from_dict(
646 info, require=["client_email", "token_uri"]
647 )
648 return cls._from_signer_and_info(signer, info, **kwargs)
649
650 @classmethod
651 def from_service_account_file(cls, filename, **kwargs):
652 """Creates a credentials instance from a service account json file.
653
654 Args:
655 filename (str): The path to the service account json file.
656 kwargs: Additional arguments to pass to the constructor.
657
658 Returns:
659 google.auth.service_account.IDTokenCredentials: The constructed
660 credentials.
661 """
662 info, signer = _service_account_info.from_filename(
663 filename, require=["client_email", "token_uri"]
664 )
665 return cls._from_signer_and_info(signer, info, **kwargs)
666
667 def _make_copy(self):
668 cred = self.__class__(
669 self._signer,
670 service_account_email=self._service_account_email,
671 token_uri=self._token_uri,
672 target_audience=self._target_audience,
673 additional_claims=self._additional_claims.copy(),
674 quota_project_id=self.quota_project_id,
675 universe_domain=self._universe_domain,
676 )
677 # _use_iam_endpoint is not exposed in the constructor
678 cred._use_iam_endpoint = self._use_iam_endpoint
679 return cred
680
681 def with_target_audience(self, target_audience):
682 """Create a copy of these credentials with the specified target
683 audience.
684
685 Args:
686 target_audience (str): The intended audience for these credentials,
687 used when requesting the ID Token.
688
689 Returns:
690 google.auth.service_account.IDTokenCredentials: A new credentials
691 instance.
692 """
693 cred = self._make_copy()
694 cred._target_audience = target_audience
695 return cred
696
697 def _with_use_iam_endpoint(self, use_iam_endpoint):
698 """Create a copy of these credentials with the use_iam_endpoint value.
699
700 Args:
701 use_iam_endpoint (bool): If True, IAM generateIdToken endpoint will
702 be used instead of the token_uri. Note that
703 iam.serviceAccountTokenCreator role is required to use the IAM
704 endpoint. The default value is False. This feature is currently
705 experimental and subject to change without notice.
706
707 Returns:
708 google.auth.service_account.IDTokenCredentials: A new credentials
709 instance.
710 Raises:
711 google.auth.exceptions.InvalidValue: If the universe domain is not
712 default and use_iam_endpoint is False.
713 """
714 cred = self._make_copy()
715 if (
716 cred._universe_domain != credentials.DEFAULT_UNIVERSE_DOMAIN
717 and not use_iam_endpoint
718 ):
719 raise exceptions.InvalidValue(
720 "use_iam_endpoint should be True for non-default universe domain"
721 )
722 cred._use_iam_endpoint = use_iam_endpoint
723 return cred
724
725 @_helpers.copy_docstring(credentials.CredentialsWithQuotaProject)
726 def with_quota_project(self, quota_project_id):
727 cred = self._make_copy()
728 cred._quota_project_id = quota_project_id
729 return cred
730
731 @_helpers.copy_docstring(credentials.CredentialsWithTokenUri)
732 def with_token_uri(self, token_uri):
733 cred = self._make_copy()
734 cred._token_uri = token_uri
735 return cred
736
737 def _make_authorization_grant_assertion(self):
738 """Create the OAuth 2.0 assertion.
739
740 This assertion is used during the OAuth 2.0 grant to acquire an
741 ID token.
742
743 Returns:
744 bytes: The authorization grant assertion.
745 """
746 now = _helpers.utcnow()
747 lifetime = datetime.timedelta(seconds=_DEFAULT_TOKEN_LIFETIME_SECS)
748 expiry = now + lifetime
749
750 payload = {
751 "iat": _helpers.datetime_to_secs(now),
752 "exp": _helpers.datetime_to_secs(expiry),
753 # The issuer must be the service account email.
754 "iss": self.service_account_email,
755 # The audience must be the auth token endpoint's URI
756 "aud": _GOOGLE_OAUTH2_TOKEN_ENDPOINT,
757 # The target audience specifies which service the ID token is
758 # intended for.
759 "target_audience": self._target_audience,
760 }
761
762 payload.update(self._additional_claims)
763
764 token = jwt.encode(self._signer, payload)
765
766 return token
767
768 def _refresh_with_iam_endpoint(self, request):
769 """Use IAM generateIdToken endpoint to obtain an ID token.
770
771 It works as follows:
772
773 1. First we create a self signed jwt with
774 https://www.googleapis.com/auth/iam being the scope.
775
776 2. Next we use the self signed jwt as the access token, and make a POST
777 request to IAM generateIdToken endpoint. The request body is:
778 {
779 "audience": self._target_audience,
780 "includeEmail": "true",
781 "useEmailAzp": "true",
782 }
783
784 If the request is succesfully, it will return {"token":"the ID token"},
785 and we can extract the ID token and compute its expiry.
786 """
787 jwt_credentials = jwt.Credentials.from_signing_credentials(
788 self,
789 None,
790 additional_claims={"scope": "https://www.googleapis.com/auth/iam"},
791 )
792 jwt_credentials.refresh(request)
793 self.token, self.expiry = _client.call_iam_generate_id_token_endpoint(
794 request,
795 self.signer_email,
796 self._target_audience,
797 jwt_credentials.token.decode(),
798 )
799
800 @_helpers.copy_docstring(credentials.Credentials)
801 def refresh(self, request):
802 if self._use_iam_endpoint:
803 self._refresh_with_iam_endpoint(request)
804 else:
805 assertion = self._make_authorization_grant_assertion()
806 access_token, expiry, _ = _client.id_token_jwt_grant(
807 request, self._token_uri, assertion
808 )
809 self.token = access_token
810 self.expiry = expiry
811
812 @property
813 def service_account_email(self):
814 """The service account email."""
815 return self._service_account_email
816
817 @_helpers.copy_docstring(credentials.Signing)
818 def sign_bytes(self, message):
819 return self._signer.sign(message)
820
821 @property # type: ignore
822 @_helpers.copy_docstring(credentials.Signing)
823 def signer(self):
824 return self._signer
825
826 @property # type: ignore
827 @_helpers.copy_docstring(credentials.Signing)
828 def signer_email(self):
829 return self._service_account_email