Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/google/auth/jwt.py: 50%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# Copyright 2016 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
15"""JSON Web Tokens
17Provides support for creating (encoding) and verifying (decoding) JWTs,
18especially JWTs generated and consumed by Google infrastructure.
20See `rfc7519`_ for more details on JWTs.
22To encode a JWT use :func:`encode`::
24 from google.auth import crypt
25 from google.auth import jwt
27 signer = crypt.Signer(private_key)
28 payload = {'some': 'payload'}
29 encoded = jwt.encode(signer, payload)
31To decode a JWT and verify claims use :func:`decode`::
33 claims = jwt.decode(encoded, certs=public_certs)
35You can also skip verification::
37 claims = jwt.decode(encoded, verify=False)
39.. _rfc7519: https://tools.ietf.org/html/rfc7519
41"""
43try:
44 from collections.abc import Mapping
45# Python 2.7 compatibility
46except ImportError: # pragma: NO COVER
47 from collections import Mapping # type: ignore
48import copy
49import datetime
50import json
51import urllib
53import cachetools
55from google.auth import _helpers
56from google.auth import _service_account_info
57from google.auth import crypt
58from google.auth import exceptions
59import google.auth.credentials
61try:
62 from google.auth.crypt import es
63except ImportError: # pragma: NO COVER
64 es = None # type: ignore
66_DEFAULT_TOKEN_LIFETIME_SECS = 3600 # 1 hour in seconds
67_DEFAULT_MAX_CACHE_SIZE = 10
68_ALGORITHM_TO_VERIFIER_CLASS = {"RS256": crypt.RSAVerifier}
69_CRYPTOGRAPHY_BASED_ALGORITHMS = frozenset(["ES256", "ES384"])
71if es is not None: # pragma: NO COVER
72 _ALGORITHM_TO_VERIFIER_CLASS["ES256"] = es.EsVerifier # type: ignore
73 _ALGORITHM_TO_VERIFIER_CLASS["ES384"] = es.EsVerifier # type: ignore
76def encode(signer, payload, header=None, key_id=None):
77 """Make a signed JWT.
79 Args:
80 signer (google.auth.crypt.Signer): The signer used to sign the JWT.
81 payload (Mapping[str, str]): The JWT payload.
82 header (Mapping[str, str]): Additional JWT header payload.
83 key_id (str): The key id to add to the JWT header. If the
84 signer has a key id it will be used as the default. If this is
85 specified it will override the signer's key id.
87 Returns:
88 bytes: The encoded JWT.
89 """
90 if header is None:
91 header = {}
93 if key_id is None:
94 key_id = signer.key_id
96 header.update({"typ": "JWT"})
98 if "alg" not in header:
99 if es is not None and isinstance(signer, es.EsSigner):
100 header.update({"alg": signer.algorithm})
101 else:
102 header.update({"alg": "RS256"})
104 if key_id is not None:
105 header["kid"] = key_id
107 segments = [
108 _helpers.unpadded_urlsafe_b64encode(json.dumps(header).encode("utf-8")),
109 _helpers.unpadded_urlsafe_b64encode(json.dumps(payload).encode("utf-8")),
110 ]
112 signing_input = b".".join(segments)
113 signature = signer.sign(signing_input)
114 segments.append(_helpers.unpadded_urlsafe_b64encode(signature))
116 return b".".join(segments)
119def _decode_jwt_segment(encoded_section):
120 """Decodes a single JWT segment."""
121 section_bytes = _helpers.padded_urlsafe_b64decode(encoded_section)
122 try:
123 return json.loads(section_bytes.decode("utf-8"))
124 except ValueError as caught_exc:
125 new_exc = exceptions.MalformedError(
126 "Can't parse segment: {0}".format(section_bytes)
127 )
128 raise new_exc from caught_exc
131def _unverified_decode(token):
132 """Decodes a token and does no verification.
134 Args:
135 token (Union[str, bytes]): The encoded JWT.
137 Returns:
138 Tuple[Mapping, Mapping, str, str]: header, payload, signed_section, and
139 signature.
141 Raises:
142 google.auth.exceptions.MalformedError: if there are an incorrect amount of segments in the token or segments of the wrong type.
143 """
144 token = _helpers.to_bytes(token)
146 if token.count(b".") != 2:
147 raise exceptions.MalformedError(
148 "Wrong number of segments in token: {0}".format(token)
149 )
151 encoded_header, encoded_payload, signature = token.split(b".")
152 signed_section = encoded_header + b"." + encoded_payload
153 signature = _helpers.padded_urlsafe_b64decode(signature)
155 # Parse segments
156 header = _decode_jwt_segment(encoded_header)
157 payload = _decode_jwt_segment(encoded_payload)
159 if not isinstance(header, Mapping):
160 raise exceptions.MalformedError(
161 "Header segment should be a JSON object: {0}".format(encoded_header)
162 )
164 if not isinstance(payload, Mapping):
165 raise exceptions.MalformedError(
166 "Payload segment should be a JSON object: {0}".format(encoded_payload)
167 )
169 return header, payload, signed_section, signature
172def decode_header(token):
173 """Return the decoded header of a token.
175 No verification is done. This is useful to extract the key id from
176 the header in order to acquire the appropriate certificate to verify
177 the token.
179 Args:
180 token (Union[str, bytes]): the encoded JWT.
182 Returns:
183 Mapping: The decoded JWT header.
184 """
185 header, _, _, _ = _unverified_decode(token)
186 return header
189def _verify_iat_and_exp(payload, clock_skew_in_seconds=0):
190 """Verifies the ``iat`` (Issued At) and ``exp`` (Expires) claims in a token
191 payload.
193 Args:
194 payload (Mapping[str, str]): The JWT payload.
195 clock_skew_in_seconds (int): The clock skew used for `iat` and `exp`
196 validation.
198 Raises:
199 google.auth.exceptions.InvalidValue: if value validation failed.
200 google.auth.exceptions.MalformedError: if schema validation failed.
201 """
202 now = _helpers.datetime_to_secs(_helpers.utcnow())
204 # Make sure the iat and exp claims are present.
205 for key in ("iat", "exp"):
206 if key not in payload:
207 raise exceptions.MalformedError(
208 "Token does not contain required claim {}".format(key)
209 )
211 # Make sure the token wasn't issued in the future.
212 iat = payload["iat"]
213 # Err on the side of accepting a token that is slightly early to account
214 # for clock skew.
215 earliest = iat - clock_skew_in_seconds
216 if now < earliest:
217 raise exceptions.InvalidValue(
218 "Token used too early, {} < {}. Check that your computer's clock is set correctly.".format(
219 now, iat
220 )
221 )
223 # Make sure the token wasn't issued in the past.
224 exp = payload["exp"]
225 # Err on the side of accepting a token that is slightly out of date
226 # to account for clow skew.
227 latest = exp + clock_skew_in_seconds
228 if latest < now:
229 raise exceptions.InvalidValue("Token expired, {} < {}".format(latest, now))
232def decode(token, certs=None, verify=True, audience=None, clock_skew_in_seconds=0):
233 """Decode and verify a JWT.
235 Args:
236 token (str): The encoded JWT.
237 certs (Union[str, bytes, Mapping[str, Union[str, bytes]]]): The
238 certificate used to validate the JWT signature. If bytes or string,
239 it must the the public key certificate in PEM format. If a mapping,
240 it must be a mapping of key IDs to public key certificates in PEM
241 format. The mapping must contain the same key ID that's specified
242 in the token's header.
243 verify (bool): Whether to perform signature and claim validation.
244 Verification is done by default.
245 audience (str or list): The audience claim, 'aud', that this JWT should
246 contain. Or a list of audience claims. If None then the JWT's 'aud'
247 parameter is not verified.
248 clock_skew_in_seconds (int): The clock skew used for `iat` and `exp`
249 validation.
251 Returns:
252 Mapping[str, str]: The deserialized JSON payload in the JWT.
254 Raises:
255 google.auth.exceptions.InvalidValue: if value validation failed.
256 google.auth.exceptions.MalformedError: if schema validation failed.
257 """
258 header, payload, signed_section, signature = _unverified_decode(token)
260 if not verify:
261 return payload
263 # Pluck the key id and algorithm from the header and make sure we have
264 # a verifier that can support it.
265 key_alg = header.get("alg")
266 key_id = header.get("kid")
268 try:
269 verifier_cls = _ALGORITHM_TO_VERIFIER_CLASS[key_alg]
270 except KeyError as exc:
271 if key_alg in _CRYPTOGRAPHY_BASED_ALGORITHMS:
272 raise exceptions.InvalidValue(
273 "The key algorithm {} requires the cryptography package to be installed.".format(
274 key_alg
275 )
276 ) from exc
277 else:
278 raise exceptions.InvalidValue(
279 "Unsupported signature algorithm {}".format(key_alg)
280 ) from exc
281 # If certs is specified as a dictionary of key IDs to certificates, then
282 # use the certificate identified by the key ID in the token header.
283 if isinstance(certs, Mapping):
284 if key_id:
285 if key_id not in certs:
286 raise exceptions.MalformedError(
287 "Certificate for key id {} not found.".format(key_id)
288 )
289 certs_to_check = [certs[key_id]]
290 # If there's no key id in the header, check against all of the certs.
291 else:
292 certs_to_check = certs.values()
293 else:
294 certs_to_check = certs
296 # Verify that the signature matches the message.
297 if not crypt.verify_signature(
298 signed_section, signature, certs_to_check, verifier_cls
299 ):
300 raise exceptions.MalformedError("Could not verify token signature.")
302 # Verify the issued at and created times in the payload.
303 _verify_iat_and_exp(payload, clock_skew_in_seconds)
305 # Check audience.
306 if audience is not None:
307 claim_audience = payload.get("aud")
308 if isinstance(audience, str):
309 audience = [audience]
310 if claim_audience not in audience:
311 raise exceptions.InvalidValue(
312 "Token has wrong audience {}, expected one of {}".format(
313 claim_audience, audience
314 )
315 )
317 return payload
320class Credentials(
321 google.auth.credentials.Signing, google.auth.credentials.CredentialsWithQuotaProject
322):
323 """Credentials that use a JWT as the bearer token.
325 These credentials require an "audience" claim. This claim identifies the
326 intended recipient of the bearer token.
328 The constructor arguments determine the claims for the JWT that is
329 sent with requests. Usually, you'll construct these credentials with
330 one of the helper constructors as shown in the next section.
332 To create JWT credentials using a Google service account private key
333 JSON file::
335 audience = 'https://pubsub.googleapis.com/google.pubsub.v1.Publisher'
336 credentials = jwt.Credentials.from_service_account_file(
337 'service-account.json',
338 audience=audience)
340 If you already have the service account file loaded and parsed::
342 service_account_info = json.load(open('service_account.json'))
343 credentials = jwt.Credentials.from_service_account_info(
344 service_account_info,
345 audience=audience)
347 Both helper methods pass on arguments to the constructor, so you can
348 specify the JWT claims::
350 credentials = jwt.Credentials.from_service_account_file(
351 'service-account.json',
352 audience=audience,
353 additional_claims={'meta': 'data'})
355 You can also construct the credentials directly if you have a
356 :class:`~google.auth.crypt.Signer` instance::
358 credentials = jwt.Credentials(
359 signer,
360 issuer='your-issuer',
361 subject='your-subject',
362 audience=audience)
364 The claims are considered immutable. If you want to modify the claims,
365 you can easily create another instance using :meth:`with_claims`::
367 new_audience = (
368 'https://pubsub.googleapis.com/google.pubsub.v1.Subscriber')
369 new_credentials = credentials.with_claims(audience=new_audience)
370 """
372 def __init__(
373 self,
374 signer,
375 issuer,
376 subject,
377 audience,
378 additional_claims=None,
379 token_lifetime=_DEFAULT_TOKEN_LIFETIME_SECS,
380 quota_project_id=None,
381 ):
382 """
383 Args:
384 signer (google.auth.crypt.Signer): The signer used to sign JWTs.
385 issuer (str): The `iss` claim.
386 subject (str): The `sub` claim.
387 audience (str): the `aud` claim. The intended audience for the
388 credentials.
389 additional_claims (Mapping[str, str]): Any additional claims for
390 the JWT payload.
391 token_lifetime (int): The amount of time in seconds for
392 which the token is valid. Defaults to 1 hour.
393 quota_project_id (Optional[str]): The project ID used for quota
394 and billing.
395 """
396 super(Credentials, self).__init__()
397 self._signer = signer
398 self._issuer = issuer
399 self._subject = subject
400 self._audience = audience
401 self._token_lifetime = token_lifetime
402 self._quota_project_id = quota_project_id
404 if additional_claims is None:
405 additional_claims = {}
407 self._additional_claims = additional_claims
409 @classmethod
410 def _from_signer_and_info(cls, signer, info, **kwargs):
411 """Creates a Credentials instance from a signer and service account
412 info.
414 Args:
415 signer (google.auth.crypt.Signer): The signer used to sign JWTs.
416 info (Mapping[str, str]): The service account info.
417 kwargs: Additional arguments to pass to the constructor.
419 Returns:
420 google.auth.jwt.Credentials: The constructed credentials.
422 Raises:
423 google.auth.exceptions.MalformedError: If the info is not in the expected format.
424 """
425 kwargs.setdefault("subject", info["client_email"])
426 kwargs.setdefault("issuer", info["client_email"])
427 return cls(signer, **kwargs)
429 @classmethod
430 def from_service_account_info(cls, info, **kwargs):
431 """Creates an Credentials instance from a dictionary.
433 Args:
434 info (Mapping[str, str]): The service account info in Google
435 format.
436 kwargs: Additional arguments to pass to the constructor.
438 Returns:
439 google.auth.jwt.Credentials: The constructed credentials.
441 Raises:
442 google.auth.exceptions.MalformedError: If the info is not in the expected format.
443 """
444 signer = _service_account_info.from_dict(info, require=["client_email"])
445 return cls._from_signer_and_info(signer, info, **kwargs)
447 @classmethod
448 def from_service_account_file(cls, filename, **kwargs):
449 """Creates a Credentials instance from a service account .json file
450 in Google format.
452 Args:
453 filename (str): The path to the service account .json file.
454 kwargs: Additional arguments to pass to the constructor.
456 Returns:
457 google.auth.jwt.Credentials: The constructed credentials.
458 """
459 info, signer = _service_account_info.from_filename(
460 filename, require=["client_email"]
461 )
462 return cls._from_signer_and_info(signer, info, **kwargs)
464 @classmethod
465 def from_signing_credentials(cls, credentials, audience, **kwargs):
466 """Creates a new :class:`google.auth.jwt.Credentials` instance from an
467 existing :class:`google.auth.credentials.Signing` instance.
469 The new instance will use the same signer as the existing instance and
470 will use the existing instance's signer email as the issuer and
471 subject by default.
473 Example::
475 svc_creds = service_account.Credentials.from_service_account_file(
476 'service_account.json')
477 audience = (
478 'https://pubsub.googleapis.com/google.pubsub.v1.Publisher')
479 jwt_creds = jwt.Credentials.from_signing_credentials(
480 svc_creds, audience=audience)
482 Args:
483 credentials (google.auth.credentials.Signing): The credentials to
484 use to construct the new credentials.
485 audience (str): the `aud` claim. The intended audience for the
486 credentials.
487 kwargs: Additional arguments to pass to the constructor.
489 Returns:
490 google.auth.jwt.Credentials: A new Credentials instance.
491 """
492 kwargs.setdefault("issuer", credentials.signer_email)
493 kwargs.setdefault("subject", credentials.signer_email)
494 return cls(credentials.signer, audience=audience, **kwargs)
496 def with_claims(
497 self, issuer=None, subject=None, audience=None, additional_claims=None
498 ):
499 """Returns a copy of these credentials with modified claims.
501 Args:
502 issuer (str): The `iss` claim. If unspecified the current issuer
503 claim will be used.
504 subject (str): The `sub` claim. If unspecified the current subject
505 claim will be used.
506 audience (str): the `aud` claim. If unspecified the current
507 audience claim will be used.
508 additional_claims (Mapping[str, str]): Any additional claims for
509 the JWT payload. This will be merged with the current
510 additional claims.
512 Returns:
513 google.auth.jwt.Credentials: A new credentials instance.
514 """
515 new_additional_claims = copy.deepcopy(self._additional_claims)
516 new_additional_claims.update(additional_claims or {})
518 return self.__class__(
519 self._signer,
520 issuer=issuer if issuer is not None else self._issuer,
521 subject=subject if subject is not None else self._subject,
522 audience=audience if audience is not None else self._audience,
523 additional_claims=new_additional_claims,
524 quota_project_id=self._quota_project_id,
525 )
527 @_helpers.copy_docstring(google.auth.credentials.CredentialsWithQuotaProject)
528 def with_quota_project(self, quota_project_id):
529 return self.__class__(
530 self._signer,
531 issuer=self._issuer,
532 subject=self._subject,
533 audience=self._audience,
534 additional_claims=self._additional_claims,
535 quota_project_id=quota_project_id,
536 )
538 def _make_jwt(self):
539 """Make a signed JWT.
541 Returns:
542 Tuple[bytes, datetime]: The encoded JWT and the expiration.
543 """
544 now = _helpers.utcnow()
545 lifetime = datetime.timedelta(seconds=self._token_lifetime)
546 expiry = now + lifetime
548 payload = {
549 "iss": self._issuer,
550 "sub": self._subject,
551 "iat": _helpers.datetime_to_secs(now),
552 "exp": _helpers.datetime_to_secs(expiry),
553 }
554 if self._audience:
555 payload["aud"] = self._audience
557 payload.update(self._additional_claims)
559 jwt = encode(self._signer, payload)
561 return jwt, expiry
563 def refresh(self, request):
564 """Refreshes the access token.
566 Args:
567 request (Any): Unused.
568 """
569 # pylint: disable=unused-argument
570 # (pylint doesn't correctly recognize overridden methods.)
571 self.token, self.expiry = self._make_jwt()
573 @_helpers.copy_docstring(google.auth.credentials.Signing)
574 def sign_bytes(self, message):
575 return self._signer.sign(message)
577 @property # type: ignore
578 @_helpers.copy_docstring(google.auth.credentials.Signing)
579 def signer_email(self):
580 return self._issuer
582 @property # type: ignore
583 @_helpers.copy_docstring(google.auth.credentials.Signing)
584 def signer(self):
585 return self._signer
587 @property # type: ignore
588 def additional_claims(self):
589 """ Additional claims the JWT object was created with."""
590 return self._additional_claims
593class OnDemandCredentials(
594 google.auth.credentials.Signing, google.auth.credentials.CredentialsWithQuotaProject
595):
596 """On-demand JWT credentials.
598 Like :class:`Credentials`, this class uses a JWT as the bearer token for
599 authentication. However, this class does not require the audience at
600 construction time. Instead, it will generate a new token on-demand for
601 each request using the request URI as the audience. It caches tokens
602 so that multiple requests to the same URI do not incur the overhead
603 of generating a new token every time.
605 This behavior is especially useful for `gRPC`_ clients. A gRPC service may
606 have multiple audience and gRPC clients may not know all of the audiences
607 required for accessing a particular service. With these credentials,
608 no knowledge of the audiences is required ahead of time.
610 .. _grpc: http://www.grpc.io/
611 """
613 def __init__(
614 self,
615 signer,
616 issuer,
617 subject,
618 additional_claims=None,
619 token_lifetime=_DEFAULT_TOKEN_LIFETIME_SECS,
620 max_cache_size=_DEFAULT_MAX_CACHE_SIZE,
621 quota_project_id=None,
622 ):
623 """
624 Args:
625 signer (google.auth.crypt.Signer): The signer used to sign JWTs.
626 issuer (str): The `iss` claim.
627 subject (str): The `sub` claim.
628 additional_claims (Mapping[str, str]): Any additional claims for
629 the JWT payload.
630 token_lifetime (int): The amount of time in seconds for
631 which the token is valid. Defaults to 1 hour.
632 max_cache_size (int): The maximum number of JWT tokens to keep in
633 cache. Tokens are cached using :class:`cachetools.LRUCache`.
634 quota_project_id (Optional[str]): The project ID used for quota
635 and billing.
637 """
638 super(OnDemandCredentials, self).__init__()
639 self._signer = signer
640 self._issuer = issuer
641 self._subject = subject
642 self._token_lifetime = token_lifetime
643 self._quota_project_id = quota_project_id
645 if additional_claims is None:
646 additional_claims = {}
648 self._additional_claims = additional_claims
649 self._cache = cachetools.LRUCache(maxsize=max_cache_size)
651 @classmethod
652 def _from_signer_and_info(cls, signer, info, **kwargs):
653 """Creates an OnDemandCredentials instance from a signer and service
654 account info.
656 Args:
657 signer (google.auth.crypt.Signer): The signer used to sign JWTs.
658 info (Mapping[str, str]): The service account info.
659 kwargs: Additional arguments to pass to the constructor.
661 Returns:
662 google.auth.jwt.OnDemandCredentials: The constructed credentials.
664 Raises:
665 google.auth.exceptions.MalformedError: If the info is not in the expected format.
666 """
667 kwargs.setdefault("subject", info["client_email"])
668 kwargs.setdefault("issuer", info["client_email"])
669 return cls(signer, **kwargs)
671 @classmethod
672 def from_service_account_info(cls, info, **kwargs):
673 """Creates an OnDemandCredentials instance from a dictionary.
675 Args:
676 info (Mapping[str, str]): The service account info in Google
677 format.
678 kwargs: Additional arguments to pass to the constructor.
680 Returns:
681 google.auth.jwt.OnDemandCredentials: The constructed credentials.
683 Raises:
684 google.auth.exceptions.MalformedError: If the info is not in the expected format.
685 """
686 signer = _service_account_info.from_dict(info, require=["client_email"])
687 return cls._from_signer_and_info(signer, info, **kwargs)
689 @classmethod
690 def from_service_account_file(cls, filename, **kwargs):
691 """Creates an OnDemandCredentials instance from a service account .json
692 file in Google format.
694 Args:
695 filename (str): The path to the service account .json file.
696 kwargs: Additional arguments to pass to the constructor.
698 Returns:
699 google.auth.jwt.OnDemandCredentials: The constructed credentials.
700 """
701 info, signer = _service_account_info.from_filename(
702 filename, require=["client_email"]
703 )
704 return cls._from_signer_and_info(signer, info, **kwargs)
706 @classmethod
707 def from_signing_credentials(cls, credentials, **kwargs):
708 """Creates a new :class:`google.auth.jwt.OnDemandCredentials` instance
709 from an existing :class:`google.auth.credentials.Signing` instance.
711 The new instance will use the same signer as the existing instance and
712 will use the existing instance's signer email as the issuer and
713 subject by default.
715 Example::
717 svc_creds = service_account.Credentials.from_service_account_file(
718 'service_account.json')
719 jwt_creds = jwt.OnDemandCredentials.from_signing_credentials(
720 svc_creds)
722 Args:
723 credentials (google.auth.credentials.Signing): The credentials to
724 use to construct the new credentials.
725 kwargs: Additional arguments to pass to the constructor.
727 Returns:
728 google.auth.jwt.Credentials: A new Credentials instance.
729 """
730 kwargs.setdefault("issuer", credentials.signer_email)
731 kwargs.setdefault("subject", credentials.signer_email)
732 return cls(credentials.signer, **kwargs)
734 def with_claims(self, issuer=None, subject=None, additional_claims=None):
735 """Returns a copy of these credentials with modified claims.
737 Args:
738 issuer (str): The `iss` claim. If unspecified the current issuer
739 claim will be used.
740 subject (str): The `sub` claim. If unspecified the current subject
741 claim will be used.
742 additional_claims (Mapping[str, str]): Any additional claims for
743 the JWT payload. This will be merged with the current
744 additional claims.
746 Returns:
747 google.auth.jwt.OnDemandCredentials: A new credentials instance.
748 """
749 new_additional_claims = copy.deepcopy(self._additional_claims)
750 new_additional_claims.update(additional_claims or {})
752 return self.__class__(
753 self._signer,
754 issuer=issuer if issuer is not None else self._issuer,
755 subject=subject if subject is not None else self._subject,
756 additional_claims=new_additional_claims,
757 max_cache_size=self._cache.maxsize,
758 quota_project_id=self._quota_project_id,
759 )
761 @_helpers.copy_docstring(google.auth.credentials.CredentialsWithQuotaProject)
762 def with_quota_project(self, quota_project_id):
764 return self.__class__(
765 self._signer,
766 issuer=self._issuer,
767 subject=self._subject,
768 additional_claims=self._additional_claims,
769 max_cache_size=self._cache.maxsize,
770 quota_project_id=quota_project_id,
771 )
773 @property
774 def valid(self):
775 """Checks the validity of the credentials.
777 These credentials are always valid because it generates tokens on
778 demand.
779 """
780 return True
782 def _make_jwt_for_audience(self, audience):
783 """Make a new JWT for the given audience.
785 Args:
786 audience (str): The intended audience.
788 Returns:
789 Tuple[bytes, datetime]: The encoded JWT and the expiration.
790 """
791 now = _helpers.utcnow()
792 lifetime = datetime.timedelta(seconds=self._token_lifetime)
793 expiry = now + lifetime
795 payload = {
796 "iss": self._issuer,
797 "sub": self._subject,
798 "iat": _helpers.datetime_to_secs(now),
799 "exp": _helpers.datetime_to_secs(expiry),
800 "aud": audience,
801 }
803 payload.update(self._additional_claims)
805 jwt = encode(self._signer, payload)
807 return jwt, expiry
809 def _get_jwt_for_audience(self, audience):
810 """Get a JWT For a given audience.
812 If there is already an existing, non-expired token in the cache for
813 the audience, that token is used. Otherwise, a new token will be
814 created.
816 Args:
817 audience (str): The intended audience.
819 Returns:
820 bytes: The encoded JWT.
821 """
822 token, expiry = self._cache.get(audience, (None, None))
824 if token is None or expiry < _helpers.utcnow():
825 token, expiry = self._make_jwt_for_audience(audience)
826 self._cache[audience] = token, expiry
828 return token
830 def refresh(self, request):
831 """Raises an exception, these credentials can not be directly
832 refreshed.
834 Args:
835 request (Any): Unused.
837 Raises:
838 google.auth.RefreshError
839 """
840 # pylint: disable=unused-argument
841 # (pylint doesn't correctly recognize overridden methods.)
842 raise exceptions.RefreshError(
843 "OnDemandCredentials can not be directly refreshed."
844 )
846 def before_request(self, request, method, url, headers):
847 """Performs credential-specific before request logic.
849 Args:
850 request (Any): Unused. JWT credentials do not need to make an
851 HTTP request to refresh.
852 method (str): The request's HTTP method.
853 url (str): The request's URI. This is used as the audience claim
854 when generating the JWT.
855 headers (Mapping): The request's headers.
856 """
857 # pylint: disable=unused-argument
858 # (pylint doesn't correctly recognize overridden methods.)
859 parts = urllib.parse.urlsplit(url)
860 # Strip query string and fragment
861 audience = urllib.parse.urlunsplit(
862 (parts.scheme, parts.netloc, parts.path, "", "")
863 )
864 token = self._get_jwt_for_audience(audience)
865 self.apply(headers, token=token)
867 @_helpers.copy_docstring(google.auth.credentials.Signing)
868 def sign_bytes(self, message):
869 return self._signer.sign(message)
871 @property # type: ignore
872 @_helpers.copy_docstring(google.auth.credentials.Signing)
873 def signer_email(self):
874 return self._issuer
876 @property # type: ignore
877 @_helpers.copy_docstring(google.auth.credentials.Signing)
878 def signer(self):
879 return self._signer