Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/google/auth/jwt.py: 32%
235 statements
« prev ^ index » next coverage.py v7.2.2, created at 2023-03-26 06:25 +0000
« prev ^ index » next coverage.py v7.2.2, created at 2023-03-26 06:25 +0000
1# Copyright 2016 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
"""JSON Web Tokens

Provides support for creating (encoding) and verifying (decoding) JWTs,
especially JWTs generated and consumed by Google infrastructure.

See `rfc7519`_ for more details on JWTs.

To encode a JWT use :func:`encode`::

    from google.auth import crypt
    from google.auth import jwt

    signer = crypt.Signer(private_key)
    payload = {'some': 'payload'}
    encoded = jwt.encode(signer, payload)

To decode a JWT and verify claims use :func:`decode`::

    claims = jwt.decode(encoded, certs=public_certs)

You can also skip verification::

    claims = jwt.decode(encoded, verify=False)

.. _rfc7519: https://tools.ietf.org/html/rfc7519

"""
43try:
44 from collections.abc import Mapping
45# Python 2.7 compatibility
46except ImportError: # pragma: NO COVER
47 from collections import Mapping # type: ignore
48import copy
49import datetime
50import json
52import cachetools
53import six
54from six.moves import urllib
56from google.auth import _helpers
57from google.auth import _service_account_info
58from google.auth import crypt
59from google.auth import exceptions
60import google.auth.credentials
62try:
63 from google.auth.crypt import es256
64except ImportError: # pragma: NO COVER
65 es256 = None # type: ignore
# Lifetime given to generated tokens when the caller does not choose one.
_DEFAULT_TOKEN_LIFETIME_SECS = 3600  # 1 hour in seconds
# Default capacity of the per-audience token cache in OnDemandCredentials.
_DEFAULT_MAX_CACHE_SIZE = 10
# Maps the JWT header "alg" value to the verifier class used by decode().
_ALGORITHM_TO_VERIFIER_CLASS = dict(RS256=crypt.RSAVerifier)
# Algorithms reported as "requires the cryptography package" when no verifier
# is registered for them.
_CRYPTOGRAPHY_BASED_ALGORITHMS = frozenset(("ES256",))

# ES256 verification is registered only when the optional es256 module
# could be imported.
if es256 is not None:  # pragma: NO COVER
    _ALGORITHM_TO_VERIFIER_CLASS.update(ES256=es256.ES256Verifier)  # type: ignore
def encode(signer, payload, header=None, key_id=None):
    """Make a signed JWT.

    Args:
        signer (google.auth.crypt.Signer): The signer used to sign the JWT.
        payload (Mapping[str, str]): The JWT payload.
        header (Mapping[str, str]): Additional JWT header payload. The
            mapping is copied before use, so the caller's object is never
            modified and read-only mappings are accepted.
        key_id (str): The key id to add to the JWT header. If the
            signer has a key id it will be used as the default. If this is
            specified it will override the signer's key id.

    Returns:
        bytes: The encoded JWT.
    """
    # Work on a private copy: the "typ", "alg" and "kid" entries added below
    # must not leak back into the mapping the caller passed in.
    header = dict(header) if header is not None else {}

    if key_id is None:
        key_id = signer.key_id

    # "typ" is always forced to "JWT", even if the caller supplied one.
    header["typ"] = "JWT"

    # Only pick an algorithm when the caller did not specify one explicitly.
    if "alg" not in header:
        if es256 is not None and isinstance(signer, es256.ES256Signer):
            header["alg"] = "ES256"
        else:
            header["alg"] = "RS256"

    if key_id is not None:
        header["kid"] = key_id

    segments = [
        _helpers.unpadded_urlsafe_b64encode(json.dumps(header).encode("utf-8")),
        _helpers.unpadded_urlsafe_b64encode(json.dumps(payload).encode("utf-8")),
    ]

    # The signature covers "<header>.<payload>" and is appended as the
    # third dot-separated segment.
    signing_input = b".".join(segments)
    signature = signer.sign(signing_input)
    segments.append(_helpers.unpadded_urlsafe_b64encode(signature))

    return b".".join(segments)
def _decode_jwt_segment(encoded_section):
    """Base64url-decode one JWT segment and parse it as UTF-8 JSON.

    Args:
        encoded_section (bytes): The encoded header or payload segment.

    Returns:
        The value produced by parsing the segment as JSON.

    Raises:
        google.auth.exceptions.MalformedError: if the decoded bytes are not
            valid UTF-8 encoded JSON.
    """
    raw_segment = _helpers.padded_urlsafe_b64decode(encoded_section)
    try:
        parsed = json.loads(raw_segment.decode("utf-8"))
    except ValueError as parse_error:
        # Re-raise as the library's own error type, keeping the original
        # exception as the cause.
        six.raise_from(
            exceptions.MalformedError(
                "Can't parse segment: {0}".format(raw_segment)
            ),
            parse_error,
        )
    else:
        return parsed
def _unverified_decode(token):
    """Split a token into its parts and parse them without any verification.

    Args:
        token (Union[str, bytes]): The encoded JWT.

    Returns:
        Tuple[Mapping, Mapping, str, str]: header, payload, signed_section, and
            signature.

    Raises:
        google.auth.exceptions.MalformedError: if the token does not have
            exactly three segments, or the header or payload is not a JSON
            object.
    """
    token = _helpers.to_bytes(token)

    # A compact JWT is exactly three dot-separated segments.
    if token.count(b".") != 2:
        raise exceptions.MalformedError(
            "Wrong number of segments in token: {0}".format(token)
        )

    raw_header, raw_payload, raw_signature = token.split(b".")
    # The signature covers the still-encoded header and payload.
    signed_section = b".".join((raw_header, raw_payload))
    signature = _helpers.padded_urlsafe_b64decode(raw_signature)

    # Parse both segments before checking the type of either one.
    header = _decode_jwt_segment(raw_header)
    payload = _decode_jwt_segment(raw_payload)

    if not isinstance(header, Mapping):
        raise exceptions.MalformedError(
            "Header segment should be a JSON object: {0}".format(raw_header)
        )

    if not isinstance(payload, Mapping):
        raise exceptions.MalformedError(
            "Payload segment should be a JSON object: {0}".format(raw_payload)
        )

    return header, payload, signed_section, signature
def decode_header(token):
    """Return the decoded header of a token.

    Nothing is verified. Typical use is reading the key id from the header
    so that the matching certificate can be fetched before calling
    :func:`decode`.

    Args:
        token (Union[str, bytes]): the encoded JWT.

    Returns:
        Mapping: The decoded JWT header.
    """
    # The header is the first element of the decoded tuple.
    return _unverified_decode(token)[0]
def _verify_iat_and_exp(payload, clock_skew_in_seconds=0):
    """Check the ``iat`` (Issued At) and ``exp`` (Expires) claims of a payload.

    Args:
        payload (Mapping[str, str]): The JWT payload.
        clock_skew_in_seconds (int): Tolerance applied to both the `iat` and
            the `exp` comparison.

    Raises:
        google.auth.exceptions.InvalidValue: if the token is used before it
            was issued or after it expired.
        google.auth.exceptions.MalformedError: if `iat` or `exp` is missing.
    """
    now = _helpers.datetime_to_secs(_helpers.utcnow())

    # Both time claims are mandatory; report the first one that is absent.
    missing_claim = next(
        (claim for claim in ("iat", "exp") if claim not in payload), None
    )
    if missing_claim is not None:
        raise exceptions.MalformedError(
            "Token does not contain required claim {}".format(missing_claim)
        )

    # Reject tokens issued in the future, accepting ones that are slightly
    # early to allow for clock skew.
    issued_at = payload["iat"]
    earliest = issued_at - clock_skew_in_seconds
    if now < earliest:
        raise exceptions.InvalidValue(
            "Token used too early, {} < {}. Check that your computer's clock is set correctly.".format(
                now, issued_at
            )
        )

    # Reject expired tokens, accepting ones that are slightly out of date
    # to allow for clock skew.
    latest = payload["exp"] + clock_skew_in_seconds
    if latest < now:
        raise exceptions.InvalidValue("Token expired, {} < {}".format(latest, now))
def decode(token, certs=None, verify=True, audience=None, clock_skew_in_seconds=0):
    """Decode and verify a JWT.

    Args:
        token (str): The encoded JWT.
        certs (Union[str, bytes, Mapping[str, Union[str, bytes]]]): The
            certificate used to validate the JWT signature. If bytes or string,
            it must be the public key certificate in PEM format. If a mapping,
            it must be a mapping of key IDs to public key certificates in PEM
            format. The mapping must contain the same key ID that's specified
            in the token's header.
        verify (bool): Whether to perform signature and claim validation.
            Verification is done by default.
        audience (str or list): The audience claim, 'aud', that this JWT should
            contain. Or a list of audience claims. If None then the JWT's 'aud'
            parameter is not verified.
        clock_skew_in_seconds (int): The clock skew used for `iat` and `exp`
            validation.

    Returns:
        Mapping[str, str]: The deserialized JSON payload in the JWT.

    Raises:
        google.auth.exceptions.InvalidValue: if value validation failed.
        google.auth.exceptions.MalformedError: if schema validation failed.
    """
    header, payload, signed_section, signature = _unverified_decode(token)

    if not verify:
        return payload

    # The header names the signing algorithm and, optionally, the key id.
    key_alg = header.get("alg")
    key_id = header.get("kid")

    try:
        verifier_cls = _ALGORITHM_TO_VERIFIER_CLASS[key_alg]
    except KeyError as lookup_error:
        # Tell "known algorithm, optional dependency missing" apart from an
        # algorithm this module does not know at all.
        if key_alg in _CRYPTOGRAPHY_BASED_ALGORITHMS:
            failure = exceptions.InvalidValue(
                "The key algorithm {} requires the cryptography package "
                "to be installed.".format(key_alg)
            )
        else:
            failure = exceptions.InvalidValue(
                "Unsupported signature algorithm {}".format(key_alg)
            )
        six.raise_from(failure, lookup_error)

    # Choose the certificates to try.
    if not isinstance(certs, Mapping):
        # Not a mapping: hand the value to the signature check unchanged.
        certs_to_check = certs
    elif not key_id:
        # No key id in the header: check against all of the certs.
        certs_to_check = certs.values()
    elif key_id in certs:
        certs_to_check = [certs[key_id]]
    else:
        raise exceptions.MalformedError(
            "Certificate for key id {} not found.".format(key_id)
        )

    # Verify that the signature matches the message.
    signature_ok = crypt.verify_signature(
        signed_section, signature, certs_to_check, verifier_cls
    )
    if not signature_ok:
        raise exceptions.MalformedError("Could not verify token signature.")

    # Verify the issued-at and expiry times in the payload.
    _verify_iat_and_exp(payload, clock_skew_in_seconds)

    # Check audience.
    if audience is not None:
        claim_audience = payload.get("aud")
        # A single expected audience is treated as a one-element list.
        accepted = [audience] if isinstance(audience, str) else audience
        if claim_audience not in accepted:
            raise exceptions.InvalidValue(
                "Token has wrong audience {}, expected one of {}".format(
                    claim_audience, accepted
                )
            )

    return payload
class Credentials(
    google.auth.credentials.Signing, google.auth.credentials.CredentialsWithQuotaProject
):
    """Credentials that use a JWT as the bearer token.

    These credentials require an "audience" claim. This claim identifies the
    intended recipient of the bearer token.

    The constructor arguments determine the claims for the JWT that is
    sent with requests. Usually, you'll construct these credentials with
    one of the helper constructors as shown in the next section.

    To create JWT credentials using a Google service account private key
    JSON file::

        audience = 'https://pubsub.googleapis.com/google.pubsub.v1.Publisher'
        credentials = jwt.Credentials.from_service_account_file(
            'service-account.json',
            audience=audience)

    If you already have the service account file loaded and parsed::

        service_account_info = json.load(open('service_account.json'))
        credentials = jwt.Credentials.from_service_account_info(
            service_account_info,
            audience=audience)

    Both helper methods pass on arguments to the constructor, so you can
    specify the JWT claims::

        credentials = jwt.Credentials.from_service_account_file(
            'service-account.json',
            audience=audience,
            additional_claims={'meta': 'data'})

    You can also construct the credentials directly if you have a
    :class:`~google.auth.crypt.Signer` instance::

        credentials = jwt.Credentials(
            signer,
            issuer='your-issuer',
            subject='your-subject',
            audience=audience)

    The claims are considered immutable. If you want to modify the claims,
    you can easily create another instance using :meth:`with_claims`::

        new_audience = (
            'https://pubsub.googleapis.com/google.pubsub.v1.Subscriber')
        new_credentials = credentials.with_claims(audience=new_audience)
    """

    def __init__(
        self,
        signer,
        issuer,
        subject,
        audience,
        additional_claims=None,
        token_lifetime=_DEFAULT_TOKEN_LIFETIME_SECS,
        quota_project_id=None,
    ):
        """
        Args:
            signer (google.auth.crypt.Signer): The signer used to sign JWTs.
            issuer (str): The `iss` claim.
            subject (str): The `sub` claim.
            audience (str): the `aud` claim. The intended audience for the
                credentials.
            additional_claims (Mapping[str, str]): Any additional claims for
                the JWT payload.
            token_lifetime (int): The amount of time in seconds for
                which the token is valid. Defaults to 1 hour.
            quota_project_id (Optional[str]): The project ID used for quota
                and billing.
        """
        super(Credentials, self).__init__()
        self._signer = signer
        self._issuer = issuer
        self._subject = subject
        self._audience = audience
        self._token_lifetime = token_lifetime
        self._quota_project_id = quota_project_id

        if additional_claims is None:
            additional_claims = {}

        self._additional_claims = additional_claims

    @classmethod
    def _from_signer_and_info(cls, signer, info, **kwargs):
        """Creates a Credentials instance from a signer and service account
        info.

        The issuer and subject default to ``info["client_email"]`` unless
        they are supplied in ``kwargs``.

        Args:
            signer (google.auth.crypt.Signer): The signer used to sign JWTs.
            info (Mapping[str, str]): The service account info.
            kwargs: Additional arguments to pass to the constructor.

        Returns:
            google.auth.jwt.Credentials: The constructed credentials.

        Raises:
            KeyError: If ``info`` has no ``client_email`` entry. The entry is
                read even when both issuer and subject are given explicitly.
        """
        kwargs.setdefault("subject", info["client_email"])
        kwargs.setdefault("issuer", info["client_email"])
        return cls(signer, **kwargs)

    @classmethod
    def from_service_account_info(cls, info, **kwargs):
        """Creates a Credentials instance from a dictionary.

        Args:
            info (Mapping[str, str]): The service account info in Google
                format.
            kwargs: Additional arguments to pass to the constructor.

        Returns:
            google.auth.jwt.Credentials: The constructed credentials.

        Raises:
            google.auth.exceptions.MalformedError: If the info is not in the expected format.
        """
        signer = _service_account_info.from_dict(info, require=["client_email"])
        return cls._from_signer_and_info(signer, info, **kwargs)

    @classmethod
    def from_service_account_file(cls, filename, **kwargs):
        """Creates a Credentials instance from a service account .json file
        in Google format.

        Args:
            filename (str): The path to the service account .json file.
            kwargs: Additional arguments to pass to the constructor.

        Returns:
            google.auth.jwt.Credentials: The constructed credentials.
        """
        info, signer = _service_account_info.from_filename(
            filename, require=["client_email"]
        )
        return cls._from_signer_and_info(signer, info, **kwargs)

    @classmethod
    def from_signing_credentials(cls, credentials, audience, **kwargs):
        """Creates a new :class:`google.auth.jwt.Credentials` instance from an
        existing :class:`google.auth.credentials.Signing` instance.

        The new instance will use the same signer as the existing instance and
        will use the existing instance's signer email as the issuer and
        subject by default.

        Example::

            svc_creds = service_account.Credentials.from_service_account_file(
                'service_account.json')
            audience = (
                'https://pubsub.googleapis.com/google.pubsub.v1.Publisher')
            jwt_creds = jwt.Credentials.from_signing_credentials(
                svc_creds, audience=audience)

        Args:
            credentials (google.auth.credentials.Signing): The credentials to
                use to construct the new credentials.
            audience (str): the `aud` claim. The intended audience for the
                credentials.
            kwargs: Additional arguments to pass to the constructor.

        Returns:
            google.auth.jwt.Credentials: A new Credentials instance.
        """
        kwargs.setdefault("issuer", credentials.signer_email)
        kwargs.setdefault("subject", credentials.signer_email)
        return cls(credentials.signer, audience=audience, **kwargs)

    def with_claims(
        self, issuer=None, subject=None, audience=None, additional_claims=None
    ):
        """Returns a copy of these credentials with modified claims.

        The signer, token lifetime and quota project are carried over
        unchanged.

        Args:
            issuer (str): The `iss` claim. If unspecified the current issuer
                claim will be used.
            subject (str): The `sub` claim. If unspecified the current subject
                claim will be used.
            audience (str): the `aud` claim. If unspecified the current
                audience claim will be used.
            additional_claims (Mapping[str, str]): Any additional claims for
                the JWT payload. This will be merged with the current
                additional claims.

        Returns:
            google.auth.jwt.Credentials: A new credentials instance.
        """
        # Deep-copy so that merging never touches this instance's claims.
        new_additional_claims = copy.deepcopy(self._additional_claims)
        new_additional_claims.update(additional_claims or {})

        return self.__class__(
            self._signer,
            issuer=issuer if issuer is not None else self._issuer,
            subject=subject if subject is not None else self._subject,
            audience=audience if audience is not None else self._audience,
            additional_claims=new_additional_claims,
            # Keep a custom lifetime instead of silently falling back to
            # the constructor default.
            token_lifetime=self._token_lifetime,
            quota_project_id=self._quota_project_id,
        )

    # No docstring here on purpose: it is supplied by copy_docstring.
    @_helpers.copy_docstring(google.auth.credentials.CredentialsWithQuotaProject)
    def with_quota_project(self, quota_project_id):
        return self.__class__(
            self._signer,
            issuer=self._issuer,
            subject=self._subject,
            audience=self._audience,
            additional_claims=self._additional_claims,
            # Keep a custom lifetime instead of silently falling back to
            # the constructor default.
            token_lifetime=self._token_lifetime,
            quota_project_id=quota_project_id,
        )

    def _make_jwt(self):
        """Make a signed JWT.

        Returns:
            Tuple[bytes, datetime]: The encoded JWT and the expiration.
        """
        now = _helpers.utcnow()
        lifetime = datetime.timedelta(seconds=self._token_lifetime)
        expiry = now + lifetime

        payload = {
            "iss": self._issuer,
            "sub": self._subject,
            "iat": _helpers.datetime_to_secs(now),
            "exp": _helpers.datetime_to_secs(expiry),
        }
        # The audience claim is omitted when no (truthy) audience is set.
        if self._audience:
            payload["aud"] = self._audience

        # Additional claims are applied last, so they override the standard
        # claims above on a key clash.
        payload.update(self._additional_claims)

        jwt = encode(self._signer, payload)

        return jwt, expiry

    def refresh(self, request):
        """Refreshes the access token.

        A new JWT is signed locally and stored in ``token`` together with its
        ``expiry``.

        Args:
            request (Any): Unused.
        """
        # pylint: disable=unused-argument
        # (pylint doesn't correctly recognize overridden methods.)
        self.token, self.expiry = self._make_jwt()

    @_helpers.copy_docstring(google.auth.credentials.Signing)
    def sign_bytes(self, message):
        return self._signer.sign(message)

    @property  # type: ignore
    @_helpers.copy_docstring(google.auth.credentials.Signing)
    def signer_email(self):
        return self._issuer

    @property  # type: ignore
    @_helpers.copy_docstring(google.auth.credentials.Signing)
    def signer(self):
        return self._signer
class OnDemandCredentials(
    google.auth.credentials.Signing, google.auth.credentials.CredentialsWithQuotaProject
):
    """On-demand JWT credentials.

    Like :class:`Credentials`, this class uses a JWT as the bearer token for
    authentication. However, this class does not require the audience at
    construction time. Instead, it will generate a new token on-demand for
    each request using the request URI as the audience. It caches tokens
    so that multiple requests to the same URI do not incur the overhead
    of generating a new token every time.

    This behavior is especially useful for `gRPC`_ clients. A gRPC service may
    have multiple audience and gRPC clients may not know all of the audiences
    required for accessing a particular service. With these credentials,
    no knowledge of the audiences is required ahead of time.

    .. _grpc: http://www.grpc.io/
    """

    def __init__(
        self,
        signer,
        issuer,
        subject,
        additional_claims=None,
        token_lifetime=_DEFAULT_TOKEN_LIFETIME_SECS,
        max_cache_size=_DEFAULT_MAX_CACHE_SIZE,
        quota_project_id=None,
    ):
        """
        Args:
            signer (google.auth.crypt.Signer): The signer used to sign JWTs.
            issuer (str): The `iss` claim.
            subject (str): The `sub` claim.
            additional_claims (Mapping[str, str]): Any additional claims for
                the JWT payload.
            token_lifetime (int): The amount of time in seconds for
                which the token is valid. Defaults to 1 hour.
            max_cache_size (int): The maximum number of JWT tokens to keep in
                cache. Tokens are cached using :class:`cachetools.LRUCache`.
            quota_project_id (Optional[str]): The project ID used for quota
                and billing.

        """
        super(OnDemandCredentials, self).__init__()
        self._signer = signer
        self._issuer = issuer
        self._subject = subject
        self._token_lifetime = token_lifetime
        self._quota_project_id = quota_project_id

        if additional_claims is None:
            additional_claims = {}

        self._additional_claims = additional_claims
        # Maps audience -> (token, expiry).
        self._cache = cachetools.LRUCache(maxsize=max_cache_size)

    @classmethod
    def _from_signer_and_info(cls, signer, info, **kwargs):
        """Creates an OnDemandCredentials instance from a signer and service
        account info.

        The issuer and subject default to ``info["client_email"]`` unless
        they are supplied in ``kwargs``.

        Args:
            signer (google.auth.crypt.Signer): The signer used to sign JWTs.
            info (Mapping[str, str]): The service account info.
            kwargs: Additional arguments to pass to the constructor.

        Returns:
            google.auth.jwt.OnDemandCredentials: The constructed credentials.

        Raises:
            KeyError: If ``info`` has no ``client_email`` entry. The entry is
                read even when both issuer and subject are given explicitly.
        """
        kwargs.setdefault("subject", info["client_email"])
        kwargs.setdefault("issuer", info["client_email"])
        return cls(signer, **kwargs)

    @classmethod
    def from_service_account_info(cls, info, **kwargs):
        """Creates an OnDemandCredentials instance from a dictionary.

        Args:
            info (Mapping[str, str]): The service account info in Google
                format.
            kwargs: Additional arguments to pass to the constructor.

        Returns:
            google.auth.jwt.OnDemandCredentials: The constructed credentials.

        Raises:
            google.auth.exceptions.MalformedError: If the info is not in the expected format.
        """
        signer = _service_account_info.from_dict(info, require=["client_email"])
        return cls._from_signer_and_info(signer, info, **kwargs)

    @classmethod
    def from_service_account_file(cls, filename, **kwargs):
        """Creates an OnDemandCredentials instance from a service account .json
        file in Google format.

        Args:
            filename (str): The path to the service account .json file.
            kwargs: Additional arguments to pass to the constructor.

        Returns:
            google.auth.jwt.OnDemandCredentials: The constructed credentials.
        """
        info, signer = _service_account_info.from_filename(
            filename, require=["client_email"]
        )
        return cls._from_signer_and_info(signer, info, **kwargs)

    @classmethod
    def from_signing_credentials(cls, credentials, **kwargs):
        """Creates a new :class:`google.auth.jwt.OnDemandCredentials` instance
        from an existing :class:`google.auth.credentials.Signing` instance.

        The new instance will use the same signer as the existing instance and
        will use the existing instance's signer email as the issuer and
        subject by default.

        Example::

            svc_creds = service_account.Credentials.from_service_account_file(
                'service_account.json')
            jwt_creds = jwt.OnDemandCredentials.from_signing_credentials(
                svc_creds)

        Args:
            credentials (google.auth.credentials.Signing): The credentials to
                use to construct the new credentials.
            kwargs: Additional arguments to pass to the constructor.

        Returns:
            google.auth.jwt.OnDemandCredentials: A new credentials instance.
        """
        kwargs.setdefault("issuer", credentials.signer_email)
        kwargs.setdefault("subject", credentials.signer_email)
        return cls(credentials.signer, **kwargs)

    def with_claims(self, issuer=None, subject=None, additional_claims=None):
        """Returns a copy of these credentials with modified claims.

        The signer, token lifetime, cache size and quota project are carried
        over unchanged. The copy starts with an empty token cache.

        Args:
            issuer (str): The `iss` claim. If unspecified the current issuer
                claim will be used.
            subject (str): The `sub` claim. If unspecified the current subject
                claim will be used.
            additional_claims (Mapping[str, str]): Any additional claims for
                the JWT payload. This will be merged with the current
                additional claims.

        Returns:
            google.auth.jwt.OnDemandCredentials: A new credentials instance.
        """
        # Deep-copy so that merging never touches this instance's claims.
        new_additional_claims = copy.deepcopy(self._additional_claims)
        new_additional_claims.update(additional_claims or {})

        return self.__class__(
            self._signer,
            issuer=issuer if issuer is not None else self._issuer,
            subject=subject if subject is not None else self._subject,
            additional_claims=new_additional_claims,
            # Keep a custom lifetime instead of silently falling back to
            # the constructor default.
            token_lifetime=self._token_lifetime,
            max_cache_size=self._cache.maxsize,
            quota_project_id=self._quota_project_id,
        )

    # No docstring here on purpose: it is supplied by copy_docstring.
    @_helpers.copy_docstring(google.auth.credentials.CredentialsWithQuotaProject)
    def with_quota_project(self, quota_project_id):

        return self.__class__(
            self._signer,
            issuer=self._issuer,
            subject=self._subject,
            additional_claims=self._additional_claims,
            # Keep a custom lifetime instead of silently falling back to
            # the constructor default.
            token_lifetime=self._token_lifetime,
            max_cache_size=self._cache.maxsize,
            quota_project_id=quota_project_id,
        )

    @property
    def valid(self):
        """Checks the validity of the credentials.

        These credentials are always valid because it generates tokens on
        demand.
        """
        return True

    def _make_jwt_for_audience(self, audience):
        """Make a new JWT for the given audience.

        Args:
            audience (str): The intended audience.

        Returns:
            Tuple[bytes, datetime]: The encoded JWT and the expiration.
        """
        now = _helpers.utcnow()
        lifetime = datetime.timedelta(seconds=self._token_lifetime)
        expiry = now + lifetime

        payload = {
            "iss": self._issuer,
            "sub": self._subject,
            "iat": _helpers.datetime_to_secs(now),
            "exp": _helpers.datetime_to_secs(expiry),
            "aud": audience,
        }

        # Additional claims are applied last, so they override the standard
        # claims above on a key clash.
        payload.update(self._additional_claims)

        jwt = encode(self._signer, payload)

        return jwt, expiry

    def _get_jwt_for_audience(self, audience):
        """Get a JWT For a given audience.

        If there is already an existing, non-expired token in the cache for
        the audience, that token is used. Otherwise, a new token will be
        created.

        Args:
            audience (str): The intended audience.

        Returns:
            bytes: The encoded JWT.
        """
        token, expiry = self._cache.get(audience, (None, None))

        # Mint a fresh token on a cache miss or when the cached one expired.
        if token is None or expiry < _helpers.utcnow():
            token, expiry = self._make_jwt_for_audience(audience)
            self._cache[audience] = token, expiry

        return token

    def refresh(self, request):
        """Raises an exception, these credentials can not be directly
        refreshed.

        Args:
            request (Any): Unused.

        Raises:
            google.auth.exceptions.RefreshError: Always.
        """
        # pylint: disable=unused-argument
        # (pylint doesn't correctly recognize overridden methods.)
        raise exceptions.RefreshError(
            "OnDemandCredentials can not be directly refreshed."
        )

    def before_request(self, request, method, url, headers):
        """Performs credential-specific before request logic.

        Args:
            request (Any): Unused. JWT credentials do not need to make an
                HTTP request to refresh.
            method (str): The request's HTTP method.
            url (str): The request's URI. This is used as the audience claim
                when generating the JWT.
            headers (Mapping): The request's headers.
        """
        # pylint: disable=unused-argument
        # (pylint doesn't correctly recognize overridden methods.)
        parts = urllib.parse.urlsplit(url)
        # Strip query string and fragment
        audience = urllib.parse.urlunsplit(
            (parts.scheme, parts.netloc, parts.path, "", "")
        )
        token = self._get_jwt_for_audience(audience)
        self.apply(headers, token=token)

    @_helpers.copy_docstring(google.auth.credentials.Signing)
    def sign_bytes(self, message):
        return self._signer.sign(message)

    @property  # type: ignore
    @_helpers.copy_docstring(google.auth.credentials.Signing)
    def signer_email(self):
        return self._issuer

    @property  # type: ignore
    @_helpers.copy_docstring(google.auth.credentials.Signing)
    def signer(self):
        return self._signer