Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/google/auth/jwt.py: 32%
237 statements
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:40 +0000
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:40 +0000
1# Copyright 2016 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
15"""JSON Web Tokens
17Provides support for creating (encoding) and verifying (decoding) JWTs,
18especially JWTs generated and consumed by Google infrastructure.
20See `rfc7519`_ for more details on JWTs.
22To encode a JWT use :func:`encode`::
24 from google.auth import crypt
25 from google.auth import jwt
27 signer = crypt.Signer(private_key)
28 payload = {'some': 'payload'}
29 encoded = jwt.encode(signer, payload)
31To decode a JWT and verify claims use :func:`decode`::
33 claims = jwt.decode(encoded, certs=public_certs)
35You can also skip verification::
37 claims = jwt.decode(encoded, verify=False)
39.. _rfc7519: https://tools.ietf.org/html/rfc7519
41"""
43try:
44 from collections.abc import Mapping
45# Python 2.7 compatibility
46except ImportError: # pragma: NO COVER
47 from collections import Mapping # type: ignore
48import copy
49import datetime
50import json
51import urllib
53import cachetools
55from google.auth import _helpers
56from google.auth import _service_account_info
57from google.auth import crypt
58from google.auth import exceptions
59import google.auth.credentials
61try:
62 from google.auth.crypt import es256
63except ImportError: # pragma: NO COVER
64 es256 = None # type: ignore
# Default validity window of a generated token: one hour, in seconds.
_DEFAULT_TOKEN_LIFETIME_SECS = 60 * 60
# Default number of per-audience tokens kept by OnDemandCredentials.
_DEFAULT_MAX_CACHE_SIZE = 10
# Signature algorithms that need the optional ES256 support to verify.
_CRYPTOGRAPHY_BASED_ALGORITHMS = frozenset({"ES256"})
# Maps the JWT header "alg" value to the verifier class able to check it.
_ALGORITHM_TO_VERIFIER_CLASS = {"RS256": crypt.RSAVerifier}

# ES256 is only registered when the optional es256 module imported cleanly.
if es256 is not None:  # pragma: NO COVER
    _ALGORITHM_TO_VERIFIER_CLASS["ES256"] = es256.ES256Verifier  # type: ignore
def encode(signer, payload, header=None, key_id=None):
    """Make a signed JWT.

    Args:
        signer (google.auth.crypt.Signer): The signer used to sign the JWT.
        payload (Mapping[str, str]): The JWT payload.
        header (Mapping[str, str]): Additional JWT header payload. The
            mapping is copied; the caller's object is never modified.
        key_id (str): The key id to add to the JWT header. If the
            signer has a key id it will be used as the default. If this is
            specified it will override the signer's key id.

    Returns:
        bytes: The encoded JWT.
    """
    # Work on a private copy so the "typ"/"alg"/"kid" entries added below
    # do not leak into a mapping the caller may reuse for other tokens.
    header = {} if header is None else dict(header)

    if key_id is None:
        key_id = signer.key_id

    header["typ"] = "JWT"

    # Only pick an algorithm when the caller did not specify one.
    if "alg" not in header:
        if es256 is not None and isinstance(signer, es256.ES256Signer):
            header["alg"] = "ES256"
        else:
            header["alg"] = "RS256"

    if key_id is not None:
        header["kid"] = key_id

    segments = [
        _helpers.unpadded_urlsafe_b64encode(json.dumps(part).encode("utf-8"))
        for part in (header, payload)
    ]

    # The signature covers "<header>.<payload>" and becomes the third segment.
    signature = signer.sign(b".".join(segments))
    segments.append(_helpers.unpadded_urlsafe_b64encode(signature))

    return b".".join(segments)
def _decode_jwt_segment(encoded_section):
    """Base64url-decode one JWT segment and parse it as UTF-8 JSON.

    Raises:
        google.auth.exceptions.MalformedError: if the decoded bytes are not
            valid UTF-8 JSON.
    """
    raw = _helpers.padded_urlsafe_b64decode(encoded_section)
    try:
        parsed = json.loads(raw.decode("utf-8"))
    except ValueError as parse_error:
        message = "Can't parse segment: {0}".format(raw)
        raise exceptions.MalformedError(message) from parse_error
    return parsed
def _unverified_decode(token):
    """Split a token into its parts without verifying anything.

    Args:
        token (Union[str, bytes]): The encoded JWT.

    Returns:
        Tuple[Mapping, Mapping, str, str]: header, payload, signed_section, and
            signature.

    Raises:
        google.auth.exceptions.MalformedError: if the token does not have
            exactly three segments or a segment is not a JSON object.
    """
    token = _helpers.to_bytes(token)

    pieces = token.split(b".")
    if len(pieces) != 3:
        raise exceptions.MalformedError(
            "Wrong number of segments in token: {0}".format(token)
        )
    encoded_header, encoded_payload, encoded_signature = pieces

    # The signature is computed over the first two segments, still encoded.
    signed_section = b".".join((encoded_header, encoded_payload))
    signature = _helpers.padded_urlsafe_b64decode(encoded_signature)

    header = _decode_jwt_segment(encoded_header)
    payload = _decode_jwt_segment(encoded_payload)

    # Both segments must decode to JSON objects, not arrays or scalars.
    if not isinstance(header, Mapping):
        raise exceptions.MalformedError(
            "Header segment should be a JSON object: {0}".format(encoded_header)
        )
    if not isinstance(payload, Mapping):
        raise exceptions.MalformedError(
            "Payload segment should be a JSON object: {0}".format(encoded_payload)
        )

    return header, payload, signed_section, signature
def decode_header(token):
    """Return the decoded header of a token.

    Nothing is verified. Typical use is reading the key id from the header
    to pick the certificate that should verify the token.

    Args:
        token (Union[str, bytes]): the encoded JWT.

    Returns:
        Mapping: The decoded JWT header.
    """
    decoded_parts = _unverified_decode(token)
    return decoded_parts[0]
def _verify_iat_and_exp(payload, clock_skew_in_seconds=0):
    """Check the ``iat`` (Issued At) and ``exp`` (Expires) claims of a payload
    against the current time.

    Args:
        payload (Mapping[str, str]): The JWT payload.
        clock_skew_in_seconds (int): Tolerance, in seconds, applied to both
            the `iat` and the `exp` comparison.

    Raises:
        google.auth.exceptions.InvalidValue: if the token is used before it
            was issued or after it expired.
        google.auth.exceptions.MalformedError: if `iat` or `exp` is missing.
    """
    now = _helpers.datetime_to_secs(_helpers.utcnow())

    # Both claims are mandatory.
    for required_claim in ("iat", "exp"):
        if required_claim in payload:
            continue
        raise exceptions.MalformedError(
            "Token does not contain required claim {}".format(required_claim)
        )

    # Reject tokens issued in the future, allowing for clock skew.
    issued_at = payload["iat"]
    if now < issued_at - clock_skew_in_seconds:
        raise exceptions.InvalidValue(
            "Token used too early, {} < {}. Check that your computer's clock is set correctly.".format(
                now, issued_at
            )
        )

    # Reject expired tokens, again allowing for clock skew.
    expires_with_skew = payload["exp"] + clock_skew_in_seconds
    if expires_with_skew < now:
        raise exceptions.InvalidValue(
            "Token expired, {} < {}".format(expires_with_skew, now)
        )
def decode(token, certs=None, verify=True, audience=None, clock_skew_in_seconds=0):
    """Decode and verify a JWT.

    Args:
        token (str): The encoded JWT.
        certs (Union[str, bytes, Mapping[str, Union[str, bytes]]]): The
            certificate used to validate the JWT signature. If bytes or string,
            it must be the public key certificate in PEM format. If a mapping,
            it must be a mapping of key IDs to public key certificates in PEM
            format. The mapping must contain the same key ID that's specified
            in the token's header.
        verify (bool): Whether to perform signature and claim validation.
            Verification is done by default.
        audience (str or list): The audience claim, 'aud', that this JWT should
            contain. Or a list of audience claims. If None then the JWT's 'aud'
            parameter is not verified.
        clock_skew_in_seconds (int): The clock skew used for `iat` and `exp`
            validation.

    Returns:
        Mapping[str, str]: The deserialized JSON payload in the JWT.

    Raises:
        google.auth.exceptions.InvalidValue: if value validation failed. This
            includes an ``alg`` header that is missing, unsupported, or not a
            string.
        google.auth.exceptions.MalformedError: if schema validation failed.
            This includes a ``kid`` header that matches no certificate.
    """
    header, payload, signed_section, signature = _unverified_decode(token)

    if not verify:
        return payload

    # Pluck the key id and algorithm from the header and make sure we have
    # a verifier that can support it.
    key_alg = header.get("alg")
    key_id = header.get("kid")

    try:
        verifier_cls = _ALGORITHM_TO_VERIFIER_CLASS[key_alg]
    except (KeyError, TypeError) as exc:
        # The header is untrusted at this point. A JSON array or object in
        # "alg" is unhashable and makes the lookup raise TypeError; report it
        # like any other unsupported algorithm instead of leaking TypeError.
        needs_cryptography = (
            isinstance(key_alg, str) and key_alg in _CRYPTOGRAPHY_BASED_ALGORITHMS
        )
        if needs_cryptography:
            message = "The key algorithm {} requires the cryptography package to be installed.".format(
                key_alg
            )
        else:
            message = "Unsupported signature algorithm {}".format(key_alg)
        raise exceptions.InvalidValue(message) from exc

    # If certs is specified as a dictionary of key IDs to certificates, then
    # use the certificate identified by the key ID in the token header.
    if isinstance(certs, Mapping):
        if key_id:
            try:
                key_id_known = key_id in certs
            except TypeError:
                # An unhashable "kid" (JSON array/object) cannot match any key.
                key_id_known = False
            if not key_id_known:
                raise exceptions.MalformedError(
                    "Certificate for key id {} not found.".format(key_id)
                )
            certs_to_check = [certs[key_id]]
        # If there's no key id in the header, check against all of the certs.
        else:
            certs_to_check = certs.values()
    else:
        certs_to_check = certs

    # Verify that the signature matches the message.
    if not crypt.verify_signature(
        signed_section, signature, certs_to_check, verifier_cls
    ):
        raise exceptions.MalformedError("Could not verify token signature.")

    # Verify the issued-at and expiration times in the payload.
    _verify_iat_and_exp(payload, clock_skew_in_seconds)

    # Check audience.
    if audience is not None:
        claim_audience = payload.get("aud")
        if isinstance(audience, str):
            audience = [audience]
        if claim_audience not in audience:
            raise exceptions.InvalidValue(
                "Token has wrong audience {}, expected one of {}".format(
                    claim_audience, audience
                )
            )

    return payload
class Credentials(
    google.auth.credentials.Signing, google.auth.credentials.CredentialsWithQuotaProject
):
    """Credentials that use a JWT as the bearer token.

    These credentials require an "audience" claim. This claim identifies the
    intended recipient of the bearer token.

    The constructor arguments determine the claims for the JWT that is
    sent with requests. Usually, you'll construct these credentials with
    one of the helper constructors as shown in the next section.

    To create JWT credentials using a Google service account private key
    JSON file::

        audience = 'https://pubsub.googleapis.com/google.pubsub.v1.Publisher'
        credentials = jwt.Credentials.from_service_account_file(
            'service-account.json',
            audience=audience)

    If you already have the service account file loaded and parsed::

        with open('service_account.json') as fh:
            service_account_info = json.load(fh)
        credentials = jwt.Credentials.from_service_account_info(
            service_account_info,
            audience=audience)

    Both helper methods pass on arguments to the constructor, so you can
    specify the JWT claims::

        credentials = jwt.Credentials.from_service_account_file(
            'service-account.json',
            audience=audience,
            additional_claims={'meta': 'data'})

    You can also construct the credentials directly if you have a
    :class:`~google.auth.crypt.Signer` instance::

        credentials = jwt.Credentials(
            signer,
            issuer='your-issuer',
            subject='your-subject',
            audience=audience)

    The claims are considered immutable. If you want to modify the claims,
    you can easily create another instance using :meth:`with_claims`::

        new_audience = (
            'https://pubsub.googleapis.com/google.pubsub.v1.Subscriber')
        new_credentials = credentials.with_claims(audience=new_audience)
    """

    def __init__(
        self,
        signer,
        issuer,
        subject,
        audience,
        additional_claims=None,
        token_lifetime=_DEFAULT_TOKEN_LIFETIME_SECS,
        quota_project_id=None,
    ):
        """
        Args:
            signer (google.auth.crypt.Signer): The signer used to sign JWTs.
            issuer (str): The `iss` claim.
            subject (str): The `sub` claim.
            audience (str): the `aud` claim. The intended audience for the
                credentials.
            additional_claims (Mapping[str, str]): Any additional claims for
                the JWT payload.
            token_lifetime (int): The amount of time in seconds for
                which the token is valid. Defaults to 1 hour.
            quota_project_id (Optional[str]): The project ID used for quota
                and billing.
        """
        super(Credentials, self).__init__()
        self._signer = signer
        self._issuer = issuer
        self._subject = subject
        self._audience = audience
        self._token_lifetime = token_lifetime
        self._quota_project_id = quota_project_id

        if additional_claims is None:
            additional_claims = {}

        self._additional_claims = additional_claims

    @classmethod
    def _from_signer_and_info(cls, signer, info, **kwargs):
        """Creates a Credentials instance from a signer and service account
        info.

        The ``client_email`` entry of ``info`` is used as both subject and
        issuer unless the caller supplies them in ``kwargs``.

        Args:
            signer (google.auth.crypt.Signer): The signer used to sign JWTs.
            info (Mapping[str, str]): The service account info.
            kwargs: Additional arguments to pass to the constructor.

        Returns:
            google.auth.jwt.Credentials: The constructed credentials.

        Raises:
            google.auth.exceptions.MalformedError: If the info is not in the expected format.
        """
        kwargs.setdefault("subject", info["client_email"])
        kwargs.setdefault("issuer", info["client_email"])
        return cls(signer, **kwargs)

    @classmethod
    def from_service_account_info(cls, info, **kwargs):
        """Creates a Credentials instance from a dictionary.

        Args:
            info (Mapping[str, str]): The service account info in Google
                format.
            kwargs: Additional arguments to pass to the constructor.

        Returns:
            google.auth.jwt.Credentials: The constructed credentials.

        Raises:
            google.auth.exceptions.MalformedError: If the info is not in the expected format.
        """
        signer = _service_account_info.from_dict(info, require=["client_email"])
        return cls._from_signer_and_info(signer, info, **kwargs)

    @classmethod
    def from_service_account_file(cls, filename, **kwargs):
        """Creates a Credentials instance from a service account .json file
        in Google format.

        Args:
            filename (str): The path to the service account .json file.
            kwargs: Additional arguments to pass to the constructor.

        Returns:
            google.auth.jwt.Credentials: The constructed credentials.
        """
        info, signer = _service_account_info.from_filename(
            filename, require=["client_email"]
        )
        return cls._from_signer_and_info(signer, info, **kwargs)

    @classmethod
    def from_signing_credentials(cls, credentials, audience, **kwargs):
        """Creates a new :class:`google.auth.jwt.Credentials` instance from an
        existing :class:`google.auth.credentials.Signing` instance.

        The new instance will use the same signer as the existing instance and
        will use the existing instance's signer email as the issuer and
        subject by default.

        Example::

            svc_creds = service_account.Credentials.from_service_account_file(
                'service_account.json')
            audience = (
                'https://pubsub.googleapis.com/google.pubsub.v1.Publisher')
            jwt_creds = jwt.Credentials.from_signing_credentials(
                svc_creds, audience=audience)

        Args:
            credentials (google.auth.credentials.Signing): The credentials to
                use to construct the new credentials.
            audience (str): the `aud` claim. The intended audience for the
                credentials.
            kwargs: Additional arguments to pass to the constructor.

        Returns:
            google.auth.jwt.Credentials: A new Credentials instance.
        """
        kwargs.setdefault("issuer", credentials.signer_email)
        kwargs.setdefault("subject", credentials.signer_email)
        return cls(credentials.signer, audience=audience, **kwargs)

    def with_claims(
        self, issuer=None, subject=None, audience=None, additional_claims=None
    ):
        """Returns a copy of these credentials with modified claims.

        The signer, token lifetime and quota project of this instance are
        carried over to the copy unchanged.

        Args:
            issuer (str): The `iss` claim. If unspecified the current issuer
                claim will be used.
            subject (str): The `sub` claim. If unspecified the current subject
                claim will be used.
            audience (str): the `aud` claim. If unspecified the current
                audience claim will be used.
            additional_claims (Mapping[str, str]): Any additional claims for
                the JWT payload. This will be merged with the current
                additional claims.

        Returns:
            google.auth.jwt.Credentials: A new credentials instance.
        """
        # Deep-copy so the merge below never touches this instance's claims.
        new_additional_claims = copy.deepcopy(self._additional_claims)
        new_additional_claims.update(additional_claims or {})

        return self.__class__(
            self._signer,
            issuer=issuer if issuer is not None else self._issuer,
            subject=subject if subject is not None else self._subject,
            audience=audience if audience is not None else self._audience,
            additional_claims=new_additional_claims,
            # Without this the copy would fall back to the default lifetime.
            token_lifetime=self._token_lifetime,
            quota_project_id=self._quota_project_id,
        )

    @_helpers.copy_docstring(google.auth.credentials.CredentialsWithQuotaProject)
    def with_quota_project(self, quota_project_id):
        return self.__class__(
            self._signer,
            issuer=self._issuer,
            subject=self._subject,
            audience=self._audience,
            additional_claims=self._additional_claims,
            # Without this the copy would fall back to the default lifetime.
            token_lifetime=self._token_lifetime,
            quota_project_id=quota_project_id,
        )

    def _make_jwt(self):
        """Make a signed JWT.

        Returns:
            Tuple[bytes, datetime]: The encoded JWT and the expiration.
        """
        now = _helpers.utcnow()
        lifetime = datetime.timedelta(seconds=self._token_lifetime)
        expiry = now + lifetime

        payload = {
            "iss": self._issuer,
            "sub": self._subject,
            "iat": _helpers.datetime_to_secs(now),
            "exp": _helpers.datetime_to_secs(expiry),
        }
        # "aud" is only emitted when an audience is set.
        if self._audience:
            payload["aud"] = self._audience

        # Additional claims are applied last and may override the ones above.
        payload.update(self._additional_claims)

        jwt = encode(self._signer, payload)

        return jwt, expiry

    def refresh(self, request):
        """Refreshes the access token.

        A new JWT is signed locally; no HTTP request is made.

        Args:
            request (Any): Unused.
        """
        # pylint: disable=unused-argument
        # (pylint doesn't correctly recognize overridden methods.)
        self.token, self.expiry = self._make_jwt()

    @_helpers.copy_docstring(google.auth.credentials.Signing)
    def sign_bytes(self, message):
        return self._signer.sign(message)

    @property  # type: ignore
    @_helpers.copy_docstring(google.auth.credentials.Signing)
    def signer_email(self):
        return self._issuer

    @property  # type: ignore
    @_helpers.copy_docstring(google.auth.credentials.Signing)
    def signer(self):
        return self._signer

    @property  # type: ignore
    def additional_claims(self):
        """Additional claims the JWT object was created with."""
        return self._additional_claims
class OnDemandCredentials(
    google.auth.credentials.Signing, google.auth.credentials.CredentialsWithQuotaProject
):
    """On-demand JWT credentials.

    Like :class:`Credentials`, this class uses a JWT as the bearer token for
    authentication. However, this class does not require the audience at
    construction time. Instead, it will generate a new token on-demand for
    each request using the request URI as the audience. It caches tokens
    so that multiple requests to the same URI do not incur the overhead
    of generating a new token every time.

    This behavior is especially useful for `gRPC`_ clients. A gRPC service may
    have multiple audience and gRPC clients may not know all of the audiences
    required for accessing a particular service. With these credentials,
    no knowledge of the audiences is required ahead of time.

    .. _grpc: http://www.grpc.io/
    """

    def __init__(
        self,
        signer,
        issuer,
        subject,
        additional_claims=None,
        token_lifetime=_DEFAULT_TOKEN_LIFETIME_SECS,
        max_cache_size=_DEFAULT_MAX_CACHE_SIZE,
        quota_project_id=None,
    ):
        """
        Args:
            signer (google.auth.crypt.Signer): The signer used to sign JWTs.
            issuer (str): The `iss` claim.
            subject (str): The `sub` claim.
            additional_claims (Mapping[str, str]): Any additional claims for
                the JWT payload.
            token_lifetime (int): The amount of time in seconds for
                which the token is valid. Defaults to 1 hour.
            max_cache_size (int): The maximum number of JWT tokens to keep in
                cache. Tokens are cached using :class:`cachetools.LRUCache`.
            quota_project_id (Optional[str]): The project ID used for quota
                and billing.

        """
        super(OnDemandCredentials, self).__init__()
        self._signer = signer
        self._issuer = issuer
        self._subject = subject
        self._token_lifetime = token_lifetime
        self._quota_project_id = quota_project_id

        if additional_claims is None:
            additional_claims = {}

        self._additional_claims = additional_claims
        # Maps audience -> (token, expiry).
        self._cache = cachetools.LRUCache(maxsize=max_cache_size)

    @classmethod
    def _from_signer_and_info(cls, signer, info, **kwargs):
        """Creates an OnDemandCredentials instance from a signer and service
        account info.

        The ``client_email`` entry of ``info`` is used as both subject and
        issuer unless the caller supplies them in ``kwargs``.

        Args:
            signer (google.auth.crypt.Signer): The signer used to sign JWTs.
            info (Mapping[str, str]): The service account info.
            kwargs: Additional arguments to pass to the constructor.

        Returns:
            google.auth.jwt.OnDemandCredentials: The constructed credentials.

        Raises:
            google.auth.exceptions.MalformedError: If the info is not in the expected format.
        """
        kwargs.setdefault("subject", info["client_email"])
        kwargs.setdefault("issuer", info["client_email"])
        return cls(signer, **kwargs)

    @classmethod
    def from_service_account_info(cls, info, **kwargs):
        """Creates an OnDemandCredentials instance from a dictionary.

        Args:
            info (Mapping[str, str]): The service account info in Google
                format.
            kwargs: Additional arguments to pass to the constructor.

        Returns:
            google.auth.jwt.OnDemandCredentials: The constructed credentials.

        Raises:
            google.auth.exceptions.MalformedError: If the info is not in the expected format.
        """
        signer = _service_account_info.from_dict(info, require=["client_email"])
        return cls._from_signer_and_info(signer, info, **kwargs)

    @classmethod
    def from_service_account_file(cls, filename, **kwargs):
        """Creates an OnDemandCredentials instance from a service account .json
        file in Google format.

        Args:
            filename (str): The path to the service account .json file.
            kwargs: Additional arguments to pass to the constructor.

        Returns:
            google.auth.jwt.OnDemandCredentials: The constructed credentials.
        """
        info, signer = _service_account_info.from_filename(
            filename, require=["client_email"]
        )
        return cls._from_signer_and_info(signer, info, **kwargs)

    @classmethod
    def from_signing_credentials(cls, credentials, **kwargs):
        """Creates a new :class:`google.auth.jwt.OnDemandCredentials` instance
        from an existing :class:`google.auth.credentials.Signing` instance.

        The new instance will use the same signer as the existing instance and
        will use the existing instance's signer email as the issuer and
        subject by default.

        Example::

            svc_creds = service_account.Credentials.from_service_account_file(
                'service_account.json')
            jwt_creds = jwt.OnDemandCredentials.from_signing_credentials(
                svc_creds)

        Args:
            credentials (google.auth.credentials.Signing): The credentials to
                use to construct the new credentials.
            kwargs: Additional arguments to pass to the constructor.

        Returns:
            google.auth.jwt.OnDemandCredentials: A new OnDemandCredentials
                instance.
        """
        kwargs.setdefault("issuer", credentials.signer_email)
        kwargs.setdefault("subject", credentials.signer_email)
        return cls(credentials.signer, **kwargs)

    def with_claims(self, issuer=None, subject=None, additional_claims=None):
        """Returns a copy of these credentials with modified claims.

        The signer, token lifetime, cache size and quota project of this
        instance are carried over. The copy starts with an empty token cache.

        Args:
            issuer (str): The `iss` claim. If unspecified the current issuer
                claim will be used.
            subject (str): The `sub` claim. If unspecified the current subject
                claim will be used.
            additional_claims (Mapping[str, str]): Any additional claims for
                the JWT payload. This will be merged with the current
                additional claims.

        Returns:
            google.auth.jwt.OnDemandCredentials: A new credentials instance.
        """
        # Deep-copy so the merge below never touches this instance's claims.
        new_additional_claims = copy.deepcopy(self._additional_claims)
        new_additional_claims.update(additional_claims or {})

        return self.__class__(
            self._signer,
            issuer=issuer if issuer is not None else self._issuer,
            subject=subject if subject is not None else self._subject,
            additional_claims=new_additional_claims,
            # Without this the copy would fall back to the default lifetime.
            token_lifetime=self._token_lifetime,
            max_cache_size=self._cache.maxsize,
            quota_project_id=self._quota_project_id,
        )

    @_helpers.copy_docstring(google.auth.credentials.CredentialsWithQuotaProject)
    def with_quota_project(self, quota_project_id):

        return self.__class__(
            self._signer,
            issuer=self._issuer,
            subject=self._subject,
            additional_claims=self._additional_claims,
            # Without this the copy would fall back to the default lifetime.
            token_lifetime=self._token_lifetime,
            max_cache_size=self._cache.maxsize,
            quota_project_id=quota_project_id,
        )

    @property
    def valid(self):
        """Checks the validity of the credentials.

        These credentials are always valid because it generates tokens on
        demand.
        """
        return True

    def _make_jwt_for_audience(self, audience):
        """Make a new JWT for the given audience.

        Args:
            audience (str): The intended audience.

        Returns:
            Tuple[bytes, datetime]: The encoded JWT and the expiration.
        """
        now = _helpers.utcnow()
        lifetime = datetime.timedelta(seconds=self._token_lifetime)
        expiry = now + lifetime

        payload = {
            "iss": self._issuer,
            "sub": self._subject,
            "iat": _helpers.datetime_to_secs(now),
            "exp": _helpers.datetime_to_secs(expiry),
            "aud": audience,
        }

        # Additional claims are applied last and may override the ones above.
        payload.update(self._additional_claims)

        jwt = encode(self._signer, payload)

        return jwt, expiry

    def _get_jwt_for_audience(self, audience):
        """Get a JWT For a given audience.

        If there is already an existing, non-expired token in the cache for
        the audience, that token is used. Otherwise, a new token will be
        created.

        Args:
            audience (str): The intended audience.

        Returns:
            bytes: The encoded JWT.
        """
        token, expiry = self._cache.get(audience, (None, None))

        # Cache miss or stale entry: mint a new token and remember it.
        if token is None or expiry < _helpers.utcnow():
            token, expiry = self._make_jwt_for_audience(audience)
            self._cache[audience] = token, expiry

        return token

    def refresh(self, request):
        """Raises an exception, these credentials can not be directly
        refreshed.

        Args:
            request (Any): Unused.

        Raises:
            google.auth.RefreshError
        """
        # pylint: disable=unused-argument
        # (pylint doesn't correctly recognize overridden methods.)
        raise exceptions.RefreshError(
            "OnDemandCredentials can not be directly refreshed."
        )

    def before_request(self, request, method, url, headers):
        """Performs credential-specific before request logic.

        Args:
            request (Any): Unused. JWT credentials do not need to make an
                HTTP request to refresh.
            method (str): The request's HTTP method.
            url (str): The request's URI. This is used as the audience claim
                when generating the JWT.
            headers (Mapping): The request's headers.
        """
        # pylint: disable=unused-argument
        # (pylint doesn't correctly recognize overridden methods.)
        # A bare ``import urllib`` does not guarantee that the ``parse``
        # submodule is loaded, so import it explicitly before use.
        import urllib.parse

        parts = urllib.parse.urlsplit(url)
        # Strip query string and fragment
        audience = urllib.parse.urlunsplit(
            (parts.scheme, parts.netloc, parts.path, "", "")
        )
        token = self._get_jwt_for_audience(audience)
        self.apply(headers, token=token)

    @_helpers.copy_docstring(google.auth.credentials.Signing)
    def sign_bytes(self, message):
        return self._signer.sign(message)

    @property  # type: ignore
    @_helpers.copy_docstring(google.auth.credentials.Signing)
    def signer_email(self):
        return self._issuer

    @property  # type: ignore
    @_helpers.copy_docstring(google.auth.credentials.Signing)
    def signer(self):
        return self._signer