Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/google/auth/jwt.py: 33%
238 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-06 06:03 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-06 06:03 +0000
1# Copyright 2016 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
"""JSON Web Tokens

Provides support for creating (encoding) and verifying (decoding) JWTs,
especially JWTs generated and consumed by Google infrastructure.

See `rfc7519`_ for more details on JWTs.

To encode a JWT use :func:`encode`::

    from google.auth import crypt
    from google.auth import jwt

    signer = crypt.Signer(private_key)
    payload = {'some': 'payload'}
    encoded = jwt.encode(signer, payload)

To decode a JWT and verify claims use :func:`decode`::

    claims = jwt.decode(encoded, certs=public_certs)

You can also skip verification::

    claims = jwt.decode(encoded, verify=False)

.. _rfc7519: https://tools.ietf.org/html/rfc7519

"""
43try:
44 from collections.abc import Mapping
45# Python 2.7 compatibility
46except ImportError: # pragma: NO COVER
47 from collections import Mapping # type: ignore
48import copy
49import datetime
50import json
52import cachetools
53import six
54from six.moves import urllib
56from google.auth import _helpers
57from google.auth import _service_account_info
58from google.auth import crypt
59from google.auth import exceptions
60import google.auth.credentials
62try:
63 from google.auth.crypt import es256
64except ImportError: # pragma: NO COVER
65 es256 = None # type: ignore
# Lifetime applied to generated tokens when the caller does not supply one.
_DEFAULT_TOKEN_LIFETIME_SECS = 60 * 60  # 1 hour in seconds
# Default ``max_cache_size`` of OnDemandCredentials.
_DEFAULT_MAX_CACHE_SIZE = 10
# Maps the JWT header ``alg`` value to the verifier class used by decode().
_ALGORITHM_TO_VERIFIER_CLASS = dict(RS256=crypt.RSAVerifier)
# Algorithms that are only usable when the optional es256 module imported;
# decode() uses this set to pick a more helpful error message.
_CRYPTOGRAPHY_BASED_ALGORITHMS = frozenset(("ES256",))

# Register the ES256 verifier only when the optional import succeeded.
if es256 is not None:  # pragma: NO COVER
    _ALGORITHM_TO_VERIFIER_CLASS.update(ES256=es256.ES256Verifier)  # type: ignore
def encode(signer, payload, header=None, key_id=None):
    """Make a signed JWT.

    Args:
        signer (google.auth.crypt.Signer): The signer used to sign the JWT.
        payload (Mapping[str, str]): The JWT payload.
        header (Mapping[str, str]): Additional JWT header payload. The
            mapping is copied; the caller's object is never modified.
        key_id (str): The key id to add to the JWT header. If the
            signer has a key id it will be used as the default. If this is
            specified it will override the signer's key id.

    Returns:
        bytes: The encoded JWT.
    """
    # Work on a private copy: the entries added below ("typ", "alg", "kid")
    # must not leak into the caller's mapping, and a read-only Mapping must
    # be accepted as the signature documents.
    header = {} if header is None else dict(header)

    if key_id is None:
        key_id = signer.key_id

    # "typ" is always forced to "JWT", even if the caller supplied one.
    header["typ"] = "JWT"

    # A caller-provided "alg" wins; otherwise it is derived from the signer.
    if "alg" not in header:
        if es256 is not None and isinstance(signer, es256.ES256Signer):
            header["alg"] = "ES256"
        else:
            header["alg"] = "RS256"

    if key_id is not None:
        header["kid"] = key_id

    segments = [
        _helpers.unpadded_urlsafe_b64encode(json.dumps(header).encode("utf-8")),
        _helpers.unpadded_urlsafe_b64encode(json.dumps(payload).encode("utf-8")),
    ]

    # The signature covers "<header>.<payload>" and becomes the third segment.
    signing_input = b".".join(segments)
    signature = signer.sign(signing_input)
    segments.append(_helpers.unpadded_urlsafe_b64encode(signature))

    return b".".join(segments)
def _decode_jwt_segment(encoded_section):
    """Decodes a single JWT segment.

    Args:
        encoded_section (Union[str, bytes]): One base64url-encoded segment
            of a JWT.

    Returns:
        Any: The value obtained by parsing the decoded segment as JSON.

    Raises:
        google.auth.exceptions.MalformedError: if the decoded bytes are not
            valid UTF-8 or not valid JSON.
    """
    raw = _helpers.padded_urlsafe_b64decode(encoded_section)
    try:
        # Both UnicodeDecodeError and JSON parse errors are ValueErrors.
        text = raw.decode("utf-8")
        parsed = json.loads(text)
    except ValueError as parse_error:
        six.raise_from(
            exceptions.MalformedError("Can't parse segment: {0}".format(raw)),
            parse_error,
        )
    else:
        return parsed
def _unverified_decode(token):
    """Splits a token into its parts and decodes them without verification.

    Args:
        token (Union[str, bytes]): The encoded JWT.

    Returns:
        Tuple[Mapping, Mapping, str, str]: header, payload, signed_section, and
            signature.

    Raises:
        google.auth.exceptions.MalformedError: if there are an incorrect amount
            of segments in the token or segments of the wrong type.
    """
    raw_token = _helpers.to_bytes(token)

    # A JWT is exactly three dot-separated segments.
    parts = raw_token.split(b".")
    if len(parts) != 3:
        raise exceptions.MalformedError(
            "Wrong number of segments in token: {0}".format(raw_token)
        )

    header_b64, payload_b64, signature_b64 = parts
    # The signature is computed over the still-encoded header and payload.
    signed_section = b".".join((header_b64, payload_b64))
    signature = _helpers.padded_urlsafe_b64decode(signature_b64)

    # Parse segments
    header = _decode_jwt_segment(header_b64)
    payload = _decode_jwt_segment(payload_b64)

    # Valid JSON is not enough: both segments have to be JSON objects.
    if not isinstance(header, Mapping):
        raise exceptions.MalformedError(
            "Header segment should be a JSON object: {0}".format(header_b64)
        )
    if not isinstance(payload, Mapping):
        raise exceptions.MalformedError(
            "Payload segment should be a JSON object: {0}".format(payload_b64)
        )

    return header, payload, signed_section, signature
def decode_header(token):
    """Return the decoded header of a token.

    Nothing is verified. The typical use is reading the key id from the
    header so that the matching certificate can be fetched before the token
    is actually verified.

    Args:
        token (Union[str, bytes]): the encoded JWT.

    Returns:
        Mapping: The decoded JWT header.
    """
    # _unverified_decode returns (header, payload, signed_section, signature).
    decoded_parts = _unverified_decode(token)
    return decoded_parts[0]
def _verify_iat_and_exp(payload, clock_skew_in_seconds=0):
    """Verifies the ``iat`` (Issued At) and ``exp`` (Expires) claims in a token
    payload.

    Args:
        payload (Mapping[str, str]): The JWT payload.
        clock_skew_in_seconds (int): The clock skew used for `iat` and `exp`
            validation.

    Raises:
        google.auth.exceptions.InvalidValue: if value validation failed.
        google.auth.exceptions.MalformedError: if schema validation failed,
            i.e. a claim is missing or is not a number.
    """
    now = _helpers.datetime_to_secs(_helpers.utcnow())

    # Make sure the iat and exp claims are present.
    for key in ("iat", "exp"):
        if key not in payload:
            raise exceptions.MalformedError(
                "Token does not contain required claim {}".format(key)
            )

    # Both claims are used in arithmetic below. Reject non-numeric values up
    # front so a malformed token surfaces as the documented MalformedError
    # instead of an unrelated TypeError.
    numeric_types = six.integer_types + (float,)
    for key in ("iat", "exp"):
        if not isinstance(payload[key], numeric_types):
            raise exceptions.MalformedError(
                "Token claim {} must be a number, got {!r}".format(key, payload[key])
            )

    # Make sure the token wasn't issued in the future.
    iat = payload["iat"]
    # Err on the side of accepting a token that is slightly early to account
    # for clock skew.
    earliest = iat - clock_skew_in_seconds
    if now < earliest:
        raise exceptions.InvalidValue(
            "Token used too early, {} < {}. Check that your computer's clock is set correctly.".format(
                now, iat
            )
        )

    # Make sure the token has not expired.
    exp = payload["exp"]
    # Err on the side of accepting a token that is slightly out of date
    # to account for clock skew.
    latest = exp + clock_skew_in_seconds
    if latest < now:
        raise exceptions.InvalidValue("Token expired, {} < {}".format(latest, now))
def decode(token, certs=None, verify=True, audience=None, clock_skew_in_seconds=0):
    """Decode and verify a JWT.

    Args:
        token (str): The encoded JWT.
        certs (Union[str, bytes, Mapping[str, Union[str, bytes]]]): The
            certificate used to validate the JWT signature. If bytes or string,
            it must be the public key certificate in PEM format. If a mapping,
            it must be a mapping of key IDs to public key certificates in PEM
            format. The mapping must contain the same key ID that's specified
            in the token's header.
        verify (bool): Whether to perform signature and claim validation.
            Verification is done by default.
        audience (str or list): The audience claim, 'aud', that this JWT should
            contain. Or a list of audience claims. If None then the JWT's 'aud'
            parameter is not verified.
        clock_skew_in_seconds (int): The clock skew used for `iat` and `exp`
            validation.

    Returns:
        Mapping[str, str]: The deserialized JSON payload in the JWT.

    Raises:
        google.auth.exceptions.InvalidValue: if value validation failed.
        google.auth.exceptions.MalformedError: if schema validation failed.
    """
    header, payload, signed_section, signature = _unverified_decode(token)

    if not verify:
        return payload

    # The header names the signing algorithm and, optionally, the key id.
    key_alg = header.get("alg")
    key_id = header.get("kid")

    # Look up a verifier for the algorithm; a missing entry means either an
    # optional dependency is absent or the algorithm is simply unsupported.
    try:
        verifier_cls = _ALGORITHM_TO_VERIFIER_CLASS[key_alg]
    except KeyError as exc:
        if key_alg in _CRYPTOGRAPHY_BASED_ALGORITHMS:
            message = (
                "The key algorithm {} requires the cryptography package "
                "to be installed.".format(key_alg)
            )
        else:
            message = "Unsupported signature algorithm {}".format(key_alg)
        six.raise_from(exceptions.InvalidValue(message), exc)

    # Choose the certificates to try. A mapping is indexed by the header's
    # key id when there is one, otherwise every certificate in it is tried;
    # anything that is not a mapping is handed over unchanged.
    if not isinstance(certs, Mapping):
        certs_to_check = certs
    elif not key_id:
        certs_to_check = certs.values()
    elif key_id in certs:
        certs_to_check = [certs[key_id]]
    else:
        raise exceptions.MalformedError(
            "Certificate for key id {} not found.".format(key_id)
        )

    # Verify that the signature matches the message.
    signature_ok = crypt.verify_signature(
        signed_section, signature, certs_to_check, verifier_cls
    )
    if not signature_ok:
        raise exceptions.MalformedError("Could not verify token signature.")

    # Verify the issued-at and expiration times in the payload.
    _verify_iat_and_exp(payload, clock_skew_in_seconds)

    # Without an expected audience there is nothing left to check.
    if audience is None:
        return payload

    # Check audience; a single string is treated as a one-element list.
    claim_audience = payload.get("aud")
    accepted = [audience] if isinstance(audience, str) else audience
    if claim_audience not in accepted:
        raise exceptions.InvalidValue(
            "Token has wrong audience {}, expected one of {}".format(
                claim_audience, accepted
            )
        )

    return payload
class Credentials(
    google.auth.credentials.Signing, google.auth.credentials.CredentialsWithQuotaProject
):
    """Credentials that use a JWT as the bearer token.

    These credentials require an "audience" claim. This claim identifies the
    intended recipient of the bearer token.

    The constructor arguments determine the claims for the JWT that is
    sent with requests. Usually, you'll construct these credentials with
    one of the helper constructors as shown in the next section.

    To create JWT credentials using a Google service account private key
    JSON file::

        audience = 'https://pubsub.googleapis.com/google.pubsub.v1.Publisher'
        credentials = jwt.Credentials.from_service_account_file(
            'service-account.json',
            audience=audience)

    If you already have the service account file loaded and parsed::

        service_account_info = json.load(open('service_account.json'))
        credentials = jwt.Credentials.from_service_account_info(
            service_account_info,
            audience=audience)

    Both helper methods pass on arguments to the constructor, so you can
    specify the JWT claims::

        credentials = jwt.Credentials.from_service_account_file(
            'service-account.json',
            audience=audience,
            additional_claims={'meta': 'data'})

    You can also construct the credentials directly if you have a
    :class:`~google.auth.crypt.Signer` instance::

        credentials = jwt.Credentials(
            signer,
            issuer='your-issuer',
            subject='your-subject',
            audience=audience)

    The claims are considered immutable. If you want to modify the claims,
    you can easily create another instance using :meth:`with_claims`::

        new_audience = (
            'https://pubsub.googleapis.com/google.pubsub.v1.Subscriber')
        new_credentials = credentials.with_claims(audience=new_audience)
    """

    def __init__(
        self,
        signer,
        issuer,
        subject,
        audience,
        additional_claims=None,
        token_lifetime=_DEFAULT_TOKEN_LIFETIME_SECS,
        quota_project_id=None,
    ):
        """
        Args:
            signer (google.auth.crypt.Signer): The signer used to sign JWTs.
            issuer (str): The `iss` claim.
            subject (str): The `sub` claim.
            audience (str): the `aud` claim. The intended audience for the
                credentials.
            additional_claims (Mapping[str, str]): Any additional claims for
                the JWT payload.
            token_lifetime (int): The amount of time in seconds for
                which the token is valid. Defaults to 1 hour.
            quota_project_id (Optional[str]): The project ID used for quota
                and billing.
        """
        super(Credentials, self).__init__()
        self._signer = signer
        self._issuer = issuer
        self._subject = subject
        self._audience = audience
        self._token_lifetime = token_lifetime
        self._quota_project_id = quota_project_id

        if additional_claims is None:
            additional_claims = {}

        self._additional_claims = additional_claims

    @classmethod
    def _from_signer_and_info(cls, signer, info, **kwargs):
        """Creates a Credentials instance from a signer and service account
        info.

        Args:
            signer (google.auth.crypt.Signer): The signer used to sign JWTs.
            info (Mapping[str, str]): The service account info.
            kwargs: Additional arguments to pass to the constructor.

        Returns:
            google.auth.jwt.Credentials: The constructed credentials.

        Raises:
            google.auth.exceptions.MalformedError: If the info is not in the expected format.
        """
        # The service account email is the default for both subject and
        # issuer; explicit keyword arguments take precedence.
        kwargs.setdefault("subject", info["client_email"])
        kwargs.setdefault("issuer", info["client_email"])
        return cls(signer, **kwargs)

    @classmethod
    def from_service_account_info(cls, info, **kwargs):
        """Creates a Credentials instance from a dictionary.

        Args:
            info (Mapping[str, str]): The service account info in Google
                format.
            kwargs: Additional arguments to pass to the constructor.

        Returns:
            google.auth.jwt.Credentials: The constructed credentials.

        Raises:
            google.auth.exceptions.MalformedError: If the info is not in the expected format.
        """
        signer = _service_account_info.from_dict(info, require=["client_email"])
        return cls._from_signer_and_info(signer, info, **kwargs)

    @classmethod
    def from_service_account_file(cls, filename, **kwargs):
        """Creates a Credentials instance from a service account .json file
        in Google format.

        Args:
            filename (str): The path to the service account .json file.
            kwargs: Additional arguments to pass to the constructor.

        Returns:
            google.auth.jwt.Credentials: The constructed credentials.
        """
        info, signer = _service_account_info.from_filename(
            filename, require=["client_email"]
        )
        return cls._from_signer_and_info(signer, info, **kwargs)

    @classmethod
    def from_signing_credentials(cls, credentials, audience, **kwargs):
        """Creates a new :class:`google.auth.jwt.Credentials` instance from an
        existing :class:`google.auth.credentials.Signing` instance.

        The new instance will use the same signer as the existing instance and
        will use the existing instance's signer email as the issuer and
        subject by default.

        Example::

            svc_creds = service_account.Credentials.from_service_account_file(
                'service_account.json')
            audience = (
                'https://pubsub.googleapis.com/google.pubsub.v1.Publisher')
            jwt_creds = jwt.Credentials.from_signing_credentials(
                svc_creds, audience=audience)

        Args:
            credentials (google.auth.credentials.Signing): The credentials to
                use to construct the new credentials.
            audience (str): the `aud` claim. The intended audience for the
                credentials.
            kwargs: Additional arguments to pass to the constructor.

        Returns:
            google.auth.jwt.Credentials: A new Credentials instance.
        """
        kwargs.setdefault("issuer", credentials.signer_email)
        kwargs.setdefault("subject", credentials.signer_email)
        return cls(credentials.signer, audience=audience, **kwargs)

    def with_claims(
        self, issuer=None, subject=None, audience=None, additional_claims=None
    ):
        """Returns a copy of these credentials with modified claims.

        The signer, token lifetime and quota project of these credentials
        are carried over to the copy.

        Args:
            issuer (str): The `iss` claim. If unspecified the current issuer
                claim will be used.
            subject (str): The `sub` claim. If unspecified the current subject
                claim will be used.
            audience (str): the `aud` claim. If unspecified the current
                audience claim will be used.
            additional_claims (Mapping[str, str]): Any additional claims for
                the JWT payload. This will be merged with the current
                additional claims.

        Returns:
            google.auth.jwt.Credentials: A new credentials instance.
        """
        # Deep-copy so the merge below cannot modify this instance's claims.
        new_additional_claims = copy.deepcopy(self._additional_claims)
        new_additional_claims.update(additional_claims or {})

        return self.__class__(
            self._signer,
            issuer=issuer if issuer is not None else self._issuer,
            subject=subject if subject is not None else self._subject,
            audience=audience if audience is not None else self._audience,
            additional_claims=new_additional_claims,
            # Without this the copy would fall back to the default lifetime.
            token_lifetime=self._token_lifetime,
            quota_project_id=self._quota_project_id,
        )

    # No docstring here: it is supplied through the copy_docstring decorator.
    @_helpers.copy_docstring(google.auth.credentials.CredentialsWithQuotaProject)
    def with_quota_project(self, quota_project_id):
        return self.__class__(
            self._signer,
            issuer=self._issuer,
            subject=self._subject,
            audience=self._audience,
            additional_claims=self._additional_claims,
            # Without this the copy would fall back to the default lifetime.
            token_lifetime=self._token_lifetime,
            quota_project_id=quota_project_id,
        )

    def _make_jwt(self):
        """Make a signed JWT.

        Returns:
            Tuple[bytes, datetime]: The encoded JWT and the expiration.
        """
        now = _helpers.utcnow()
        lifetime = datetime.timedelta(seconds=self._token_lifetime)
        expiry = now + lifetime

        payload = {
            "iss": self._issuer,
            "sub": self._subject,
            "iat": _helpers.datetime_to_secs(now),
            "exp": _helpers.datetime_to_secs(expiry),
        }
        # The audience claim is left out entirely when no audience is set.
        if self._audience:
            payload["aud"] = self._audience

        # Additional claims are applied last, so they override the standard
        # claims on a key collision.
        payload.update(self._additional_claims)

        jwt = encode(self._signer, payload)

        return jwt, expiry

    def refresh(self, request):
        """Refreshes the access token.

        A new JWT is signed locally and stored, with its expiration, on
        ``token`` and ``expiry``.

        Args:
            request (Any): Unused.
        """
        # pylint: disable=unused-argument
        # (pylint doesn't correctly recognize overridden methods.)
        self.token, self.expiry = self._make_jwt()

    # No docstring here: it is supplied through the copy_docstring decorator.
    @_helpers.copy_docstring(google.auth.credentials.Signing)
    def sign_bytes(self, message):
        return self._signer.sign(message)

    # The issuer is reported as the signer email.
    @property  # type: ignore
    @_helpers.copy_docstring(google.auth.credentials.Signing)
    def signer_email(self):
        return self._issuer

    @property  # type: ignore
    @_helpers.copy_docstring(google.auth.credentials.Signing)
    def signer(self):
        return self._signer

    @property  # type: ignore
    def additional_claims(self):
        """ Additional claims the JWT object was created with."""
        return self._additional_claims
class OnDemandCredentials(
    google.auth.credentials.Signing, google.auth.credentials.CredentialsWithQuotaProject
):
    """On-demand JWT credentials.

    Like :class:`Credentials`, this class uses a JWT as the bearer token for
    authentication. However, this class does not require the audience at
    construction time. Instead, it will generate a new token on-demand for
    each request using the request URI as the audience. It caches tokens
    so that multiple requests to the same URI do not incur the overhead
    of generating a new token every time.

    This behavior is especially useful for `gRPC`_ clients. A gRPC service may
    have multiple audience and gRPC clients may not know all of the audiences
    required for accessing a particular service. With these credentials,
    no knowledge of the audiences is required ahead of time.

    .. _grpc: http://www.grpc.io/
    """

    def __init__(
        self,
        signer,
        issuer,
        subject,
        additional_claims=None,
        token_lifetime=_DEFAULT_TOKEN_LIFETIME_SECS,
        max_cache_size=_DEFAULT_MAX_CACHE_SIZE,
        quota_project_id=None,
    ):
        """
        Args:
            signer (google.auth.crypt.Signer): The signer used to sign JWTs.
            issuer (str): The `iss` claim.
            subject (str): The `sub` claim.
            additional_claims (Mapping[str, str]): Any additional claims for
                the JWT payload.
            token_lifetime (int): The amount of time in seconds for
                which the token is valid. Defaults to 1 hour.
            max_cache_size (int): The maximum number of JWT tokens to keep in
                cache. Tokens are cached using :class:`cachetools.LRUCache`.
            quota_project_id (Optional[str]): The project ID used for quota
                and billing.

        """
        super(OnDemandCredentials, self).__init__()
        self._signer = signer
        self._issuer = issuer
        self._subject = subject
        self._token_lifetime = token_lifetime
        self._quota_project_id = quota_project_id

        if additional_claims is None:
            additional_claims = {}

        self._additional_claims = additional_claims
        # Maps audience -> (token, expiry).
        self._cache = cachetools.LRUCache(maxsize=max_cache_size)

    @classmethod
    def _from_signer_and_info(cls, signer, info, **kwargs):
        """Creates an OnDemandCredentials instance from a signer and service
        account info.

        Args:
            signer (google.auth.crypt.Signer): The signer used to sign JWTs.
            info (Mapping[str, str]): The service account info.
            kwargs: Additional arguments to pass to the constructor.

        Returns:
            google.auth.jwt.OnDemandCredentials: The constructed credentials.

        Raises:
            google.auth.exceptions.MalformedError: If the info is not in the expected format.
        """
        # The service account email is the default for both subject and
        # issuer; explicit keyword arguments take precedence.
        kwargs.setdefault("subject", info["client_email"])
        kwargs.setdefault("issuer", info["client_email"])
        return cls(signer, **kwargs)

    @classmethod
    def from_service_account_info(cls, info, **kwargs):
        """Creates an OnDemandCredentials instance from a dictionary.

        Args:
            info (Mapping[str, str]): The service account info in Google
                format.
            kwargs: Additional arguments to pass to the constructor.

        Returns:
            google.auth.jwt.OnDemandCredentials: The constructed credentials.

        Raises:
            google.auth.exceptions.MalformedError: If the info is not in the expected format.
        """
        signer = _service_account_info.from_dict(info, require=["client_email"])
        return cls._from_signer_and_info(signer, info, **kwargs)

    @classmethod
    def from_service_account_file(cls, filename, **kwargs):
        """Creates an OnDemandCredentials instance from a service account .json
        file in Google format.

        Args:
            filename (str): The path to the service account .json file.
            kwargs: Additional arguments to pass to the constructor.

        Returns:
            google.auth.jwt.OnDemandCredentials: The constructed credentials.
        """
        info, signer = _service_account_info.from_filename(
            filename, require=["client_email"]
        )
        return cls._from_signer_and_info(signer, info, **kwargs)

    @classmethod
    def from_signing_credentials(cls, credentials, **kwargs):
        """Creates a new :class:`google.auth.jwt.OnDemandCredentials` instance
        from an existing :class:`google.auth.credentials.Signing` instance.

        The new instance will use the same signer as the existing instance and
        will use the existing instance's signer email as the issuer and
        subject by default.

        Example::

            svc_creds = service_account.Credentials.from_service_account_file(
                'service_account.json')
            jwt_creds = jwt.OnDemandCredentials.from_signing_credentials(
                svc_creds)

        Args:
            credentials (google.auth.credentials.Signing): The credentials to
                use to construct the new credentials.
            kwargs: Additional arguments to pass to the constructor.

        Returns:
            google.auth.jwt.OnDemandCredentials: A new OnDemandCredentials
                instance.
        """
        kwargs.setdefault("issuer", credentials.signer_email)
        kwargs.setdefault("subject", credentials.signer_email)
        return cls(credentials.signer, **kwargs)

    def with_claims(self, issuer=None, subject=None, additional_claims=None):
        """Returns a copy of these credentials with modified claims.

        The signer, token lifetime, cache size and quota project of these
        credentials are carried over to the copy. The copy starts with an
        empty token cache.

        Args:
            issuer (str): The `iss` claim. If unspecified the current issuer
                claim will be used.
            subject (str): The `sub` claim. If unspecified the current subject
                claim will be used.
            additional_claims (Mapping[str, str]): Any additional claims for
                the JWT payload. This will be merged with the current
                additional claims.

        Returns:
            google.auth.jwt.OnDemandCredentials: A new credentials instance.
        """
        # Deep-copy so the merge below cannot modify this instance's claims.
        new_additional_claims = copy.deepcopy(self._additional_claims)
        new_additional_claims.update(additional_claims or {})

        return self.__class__(
            self._signer,
            issuer=issuer if issuer is not None else self._issuer,
            subject=subject if subject is not None else self._subject,
            additional_claims=new_additional_claims,
            # Without this the copy would fall back to the default lifetime.
            token_lifetime=self._token_lifetime,
            max_cache_size=self._cache.maxsize,
            quota_project_id=self._quota_project_id,
        )

    # No docstring here: it is supplied through the copy_docstring decorator.
    @_helpers.copy_docstring(google.auth.credentials.CredentialsWithQuotaProject)
    def with_quota_project(self, quota_project_id):

        return self.__class__(
            self._signer,
            issuer=self._issuer,
            subject=self._subject,
            additional_claims=self._additional_claims,
            # Without this the copy would fall back to the default lifetime.
            token_lifetime=self._token_lifetime,
            max_cache_size=self._cache.maxsize,
            quota_project_id=quota_project_id,
        )

    @property
    def valid(self):
        """Checks the validity of the credentials.

        These credentials are always valid because it generates tokens on
        demand.
        """
        return True

    def _make_jwt_for_audience(self, audience):
        """Make a new JWT for the given audience.

        Args:
            audience (str): The intended audience.

        Returns:
            Tuple[bytes, datetime]: The encoded JWT and the expiration.
        """
        now = _helpers.utcnow()
        lifetime = datetime.timedelta(seconds=self._token_lifetime)
        expiry = now + lifetime

        payload = {
            "iss": self._issuer,
            "sub": self._subject,
            "iat": _helpers.datetime_to_secs(now),
            "exp": _helpers.datetime_to_secs(expiry),
            "aud": audience,
        }

        # Additional claims are applied last, so they override the standard
        # claims on a key collision.
        payload.update(self._additional_claims)

        jwt = encode(self._signer, payload)

        return jwt, expiry

    def _get_jwt_for_audience(self, audience):
        """Get a JWT For a given audience.

        If there is already an existing, non-expired token in the cache for
        the audience, that token is used. Otherwise, a new token will be
        created.

        Args:
            audience (str): The intended audience.

        Returns:
            bytes: The encoded JWT.
        """
        token, expiry = self._cache.get(audience, (None, None))

        # Cache miss or expired entry: sign a fresh token and remember it.
        if token is None or expiry < _helpers.utcnow():
            token, expiry = self._make_jwt_for_audience(audience)
            self._cache[audience] = token, expiry

        return token

    def refresh(self, request):
        """Raises an exception, these credentials can not be directly
        refreshed.

        Args:
            request (Any): Unused.

        Raises:
            google.auth.exceptions.RefreshError: Always.
        """
        # pylint: disable=unused-argument
        # (pylint doesn't correctly recognize overridden methods.)
        raise exceptions.RefreshError(
            "OnDemandCredentials can not be directly refreshed."
        )

    def before_request(self, request, method, url, headers):
        """Performs credential-specific before request logic.

        Args:
            request (Any): Unused. JWT credentials do not need to make an
                HTTP request to refresh.
            method (str): The request's HTTP method.
            url (str): The request's URI. This is used as the audience claim
                when generating the JWT.
            headers (Mapping): The request's headers.
        """
        # pylint: disable=unused-argument
        # (pylint doesn't correctly recognize overridden methods.)
        parts = urllib.parse.urlsplit(url)
        # Strip query string and fragment
        audience = urllib.parse.urlunsplit(
            (parts.scheme, parts.netloc, parts.path, "", "")
        )
        token = self._get_jwt_for_audience(audience)
        self.apply(headers, token=token)

    # No docstring here: it is supplied through the copy_docstring decorator.
    @_helpers.copy_docstring(google.auth.credentials.Signing)
    def sign_bytes(self, message):
        return self._signer.sign(message)

    # The issuer is reported as the signer email.
    @property  # type: ignore
    @_helpers.copy_docstring(google.auth.credentials.Signing)
    def signer_email(self):
        return self._issuer

    @property  # type: ignore
    @_helpers.copy_docstring(google.auth.credentials.Signing)
    def signer(self):
        return self._signer