Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/google/auth/jwt.py: 32%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
# Copyright 2016 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""JSON Web Tokens

Provides support for creating (encoding) and verifying (decoding) JWTs,
especially JWTs generated and consumed by Google infrastructure.

See `rfc7519`_ for more details on JWTs.

To encode a JWT use :func:`encode`::

    from google.auth import crypt
    from google.auth import jwt

    signer = crypt.Signer(private_key)
    payload = {'some': 'payload'}
    encoded = jwt.encode(signer, payload)

To decode a JWT and verify claims use :func:`decode`::

    claims = jwt.decode(encoded, certs=public_certs)

You can also skip verification::

    claims = jwt.decode(encoded, verify=False)

.. _rfc7519: https://tools.ietf.org/html/rfc7519

"""
43try:
44 from collections.abc import Mapping
45# Python 2.7 compatibility
46except ImportError: # pragma: NO COVER
47 from collections import Mapping # type: ignore
48import copy
49import datetime
50import json
51import urllib
53from google.auth import _cache
54from google.auth import _helpers
55from google.auth import _regional_access_boundary_utils
56from google.auth import _service_account_info
57from google.auth import crypt
58from google.auth import exceptions
59import google.auth.credentials
61try:
62 from google.auth.crypt import es
63except ImportError: # pragma: NO COVER
64 es = None # type: ignore
# Lifetime given to generated JWTs when the caller does not pick one:
# 3600 seconds, i.e. one hour.
_DEFAULT_TOKEN_LIFETIME_SECS = 3600
# Default ``max_cache_size`` for OnDemandCredentials' per-audience token cache.
_DEFAULT_MAX_CACHE_SIZE = 10
# Maps the JWT header "alg" value to the verifier class used by decode().
_ALGORITHM_TO_VERIFIER_CLASS = {"RS256": crypt.RSAVerifier}
# Algorithms that are only registered when ``google.auth.crypt.es`` imports.
_CRYPTOGRAPHY_BASED_ALGORITHMS = frozenset(("ES256", "ES384"))

if es is not None:  # pragma: NO COVER
    # The optional ES module imported successfully: register its verifier for
    # both elliptic-curve algorithms.
    _ALGORITHM_TO_VERIFIER_CLASS.update(
        {"ES256": es.EsVerifier, "ES384": es.EsVerifier}  # type: ignore
    )
def encode(signer, payload, header=None, key_id=None):
    """Make a signed JWT.

    Args:
        signer (google.auth.crypt.Signer): The signer used to sign the JWT.
        payload (Mapping[str, str]): The JWT payload.
        header (Mapping[str, str]): Additional JWT header payload. The
            mapping is copied; the caller's object is never modified.
        key_id (str): The key id to add to the JWT header. If the
            signer has a key id it will be used as the default. If this is
            specified it will override the signer's key id.

    Returns:
        bytes: The encoded JWT.
    """
    # Work on a private copy. Writing "typ"/"alg"/"kid" into the caller's
    # mapping would leak them into later calls that reuse the same header
    # (e.g. a stale "alg" or "kid" when the next signer differs), and would
    # fail outright for a read-only Mapping.
    header = {} if header is None else dict(header)

    if key_id is None:
        key_id = signer.key_id

    header["typ"] = "JWT"

    # Only pick an algorithm when the caller did not specify one.
    if "alg" not in header:
        if es is not None and isinstance(signer, es.EsSigner):
            header["alg"] = signer.algorithm
        else:
            header["alg"] = "RS256"

    if key_id is not None:
        header["kid"] = key_id

    segments = [
        _helpers.unpadded_urlsafe_b64encode(json.dumps(header).encode("utf-8")),
        _helpers.unpadded_urlsafe_b64encode(json.dumps(payload).encode("utf-8")),
    ]

    # The signature covers "<header>.<payload>" and is appended as the third
    # dot-separated segment.
    signing_input = b".".join(segments)
    signature = signer.sign(signing_input)
    segments.append(_helpers.unpadded_urlsafe_b64encode(signature))

    return b".".join(segments)
def _decode_jwt_segment(encoded_section):
    """Base64url-decode one JWT segment and parse the result as JSON.

    Args:
        encoded_section: The encoded segment, as handed to
            ``_helpers.padded_urlsafe_b64decode``.

    Returns:
        The object produced by ``json.loads`` for the decoded segment.

    Raises:
        google.auth.exceptions.MalformedError: if UTF-8 decoding or JSON
            parsing of the decoded bytes raises ``ValueError``.
    """
    raw = _helpers.padded_urlsafe_b64decode(encoded_section)
    try:
        text = raw.decode("utf-8")
        parsed = json.loads(text)
    except ValueError as caught_exc:
        raise exceptions.MalformedError(
            "Can't parse segment: {0}".format(raw)
        ) from caught_exc
    return parsed
def _unverified_decode(token):
    """Split a token into its parts and decode them without verifying it.

    Args:
        token (Union[str, bytes]): The encoded JWT.

    Returns:
        Tuple[Mapping, Mapping, str, str]: header, payload, signed_section, and
            signature.

    Raises:
        google.auth.exceptions.MalformedError: if the token does not have
            exactly three dot-separated segments, or if the header or payload
            is not a JSON object.
    """
    token = _helpers.to_bytes(token)

    pieces = token.split(b".")
    if len(pieces) != 3:
        raise exceptions.MalformedError(
            "Wrong number of segments in token: {0}".format(token)
        )

    encoded_header, encoded_payload, encoded_signature = pieces
    # The signed portion is everything before the final dot.
    signed_section = b".".join((encoded_header, encoded_payload))
    signature = _helpers.padded_urlsafe_b64decode(encoded_signature)

    # Decode both JSON segments before type-checking either of them.
    header = _decode_jwt_segment(encoded_header)
    payload = _decode_jwt_segment(encoded_payload)

    if not isinstance(header, Mapping):
        raise exceptions.MalformedError(
            "Header segment should be a JSON object: {0}".format(encoded_header)
        )

    if not isinstance(payload, Mapping):
        raise exceptions.MalformedError(
            "Payload segment should be a JSON object: {0}".format(encoded_payload)
        )

    return header, payload, signed_section, signature
def decode_header(token):
    """Return the decoded header of a token.

    The token is not verified in any way. The typical use is reading the key
    id from the header so the matching certificate can be fetched before the
    token is actually verified.

    Args:
        token (Union[str, bytes]): the encoded JWT.

    Returns:
        Mapping: The decoded JWT header.
    """
    # The header is the first element of the (header, payload,
    # signed_section, signature) tuple.
    return _unverified_decode(token)[0]
def _numeric_date_claim(payload, key):
    """Return ``payload[key]`` after checking it is a finite JSON number."""
    value = payload[key]
    # bool is a subclass of int but is not a meaningful timestamp.
    if isinstance(value, bool) or not isinstance(value, (int, float)):
        raise exceptions.MalformedError(
            "Token claim {} must be a number, got {!r}".format(key, value)
        )
    # json.loads accepts the non-standard NaN/Infinity literals. Every
    # comparison against NaN is False, so a NaN "exp" would never be reported
    # as expired; reject non-finite values explicitly.
    if isinstance(value, float) and (value != value or abs(value) == float("inf")):
        raise exceptions.MalformedError(
            "Token claim {} must be a finite number, got {!r}".format(key, value)
        )
    return value


def _verify_iat_and_exp(payload, clock_skew_in_seconds=0):
    """Verifies the ``iat`` (Issued At) and ``exp`` (Expires) claims in a token
    payload.

    Args:
        payload (Mapping[str, str]): The JWT payload.
        clock_skew_in_seconds (int): The clock skew used for `iat` and `exp`
            validation.

    Raises:
        google.auth.exceptions.InvalidValue: if value validation failed.
        google.auth.exceptions.MalformedError: if schema validation failed
            (a claim is missing, or is not a finite number).
    """
    now = _helpers.datetime_to_secs(_helpers.utcnow())

    # Make sure the iat and exp claims are present.
    for key in ("iat", "exp"):
        if key not in payload:
            raise exceptions.MalformedError(
                "Token does not contain required claim {}".format(key)
            )

    # Validate the claim types up front so a bad value surfaces as the
    # documented MalformedError rather than a TypeError from the arithmetic.
    iat = _numeric_date_claim(payload, "iat")
    exp = _numeric_date_claim(payload, "exp")

    # Make sure the token wasn't issued in the future.
    # Err on the side of accepting a token that is slightly early to account
    # for clock skew.
    earliest = iat - clock_skew_in_seconds
    if now < earliest:
        raise exceptions.InvalidValue(
            "Token used too early, {} < {}. Check that your computer's clock is set correctly.".format(
                now, iat
            )
        )

    # Make sure the token has not expired.
    # Err on the side of accepting a token that is slightly out of date
    # to account for clock skew.
    latest = exp + clock_skew_in_seconds
    if latest < now:
        raise exceptions.InvalidValue("Token expired, {} < {}".format(latest, now))
def decode(token, certs=None, verify=True, audience=None, clock_skew_in_seconds=0):
    """Decode and verify a JWT.

    Args:
        token (str): The encoded JWT.
        certs (Union[str, bytes, Mapping[str, Union[str, bytes]]]): The
            certificate used to validate the JWT signature. If bytes or string,
            it must be the public key certificate in PEM format. If a mapping,
            it must be a mapping of key IDs to public key certificates in PEM
            format. The mapping must contain the same key ID that's specified
            in the token's header.
        verify (bool): Whether to perform signature and claim validation.
            Verification is done by default.
        audience (str or list): The audience claim, 'aud', that this JWT should
            contain. Or a list of audience claims. If None then the JWT's 'aud'
            parameter is not verified.
        clock_skew_in_seconds (int): The clock skew used for `iat` and `exp`
            validation.

    Returns:
        Mapping[str, str]: The deserialized JSON payload in the JWT.

    Raises:
        google.auth.exceptions.InvalidValue: if value validation failed.
        google.auth.exceptions.MalformedError: if schema validation failed.
    """
    header, payload, signed_section, signature = _unverified_decode(token)

    if not verify:
        return payload

    # Pluck the key id and algorithm from the header and make sure we have
    # a verifier that can support it.
    key_alg = header.get("alg")
    key_id = header.get("kid")

    try:
        verifier_cls = _ALGORITHM_TO_VERIFIER_CLASS[key_alg]
    except (KeyError, TypeError) as exc:
        # The header is untrusted JSON read before any signature check, so
        # "alg" may be an unhashable list/object; the dict lookup then raises
        # TypeError rather than KeyError. Report both as an unsupported
        # algorithm instead of leaking the TypeError.
        needs_cryptography = (
            isinstance(key_alg, str) and key_alg in _CRYPTOGRAPHY_BASED_ALGORITHMS
        )
        if needs_cryptography:
            raise exceptions.InvalidValue(
                "The key algorithm {} requires the cryptography package to be installed.".format(
                    key_alg
                )
            ) from exc
        raise exceptions.InvalidValue(
            "Unsupported signature algorithm {}".format(key_alg)
        ) from exc

    # If certs is specified as a dictionary of key IDs to certificates, then
    # use the certificate identified by the key ID in the token header.
    if isinstance(certs, Mapping):
        if key_id:
            try:
                has_cert = key_id in certs
            except TypeError:
                # An unhashable "kid" (JSON list/object) cannot name a cert.
                has_cert = False
            if not has_cert:
                raise exceptions.MalformedError(
                    "Certificate for key id {} not found.".format(key_id)
                )
            certs_to_check = [certs[key_id]]
        # If there's no key id in the header, check against all of the certs.
        else:
            certs_to_check = certs.values()
    else:
        certs_to_check = certs

    # Verify that the signature matches the message.
    if not crypt.verify_signature(
        signed_section, signature, certs_to_check, verifier_cls
    ):
        raise exceptions.MalformedError("Could not verify token signature.")

    # Verify the issued-at and expiration times in the payload.
    _verify_iat_and_exp(payload, clock_skew_in_seconds)

    # Check audience.
    if audience is not None:
        claim_audience = payload.get("aud")
        # Normalize a single expected audience to a one-element list.
        if isinstance(audience, str):
            audience = [audience]
        if claim_audience not in audience:
            raise exceptions.InvalidValue(
                "Token has wrong audience {}, expected one of {}".format(
                    claim_audience, audience
                )
            )

    return payload
class Credentials(
    google.auth.credentials.Signing,
    google.auth.credentials.CredentialsWithQuotaProject,
    google.auth.credentials.CredentialsWithRegionalAccessBoundary,
):
    """Credentials that use a JWT as the bearer token.

    These credentials require an "audience" claim. This claim identifies the
    intended recipient of the bearer token.

    The constructor arguments determine the claims for the JWT that is
    sent with requests. Usually, you'll construct these credentials with
    one of the helper constructors as shown in the next section.

    To create JWT credentials using a Google service account private key
    JSON file::

        audience = 'https://pubsub.googleapis.com/google.pubsub.v1.Publisher'
        credentials = jwt.Credentials.from_service_account_file(
            'service-account.json',
            audience=audience)

    If you already have the service account file loaded and parsed::

        service_account_info = json.load(open('service_account.json'))
        credentials = jwt.Credentials.from_service_account_info(
            service_account_info,
            audience=audience)

    Both helper methods pass on arguments to the constructor, so you can
    specify the JWT claims::

        credentials = jwt.Credentials.from_service_account_file(
            'service-account.json',
            audience=audience,
            additional_claims={'meta': 'data'})

    You can also construct the credentials directly if you have a
    :class:`~google.auth.crypt.Signer` instance::

        credentials = jwt.Credentials(
            signer,
            issuer='your-issuer',
            subject='your-subject',
            audience=audience)

    The claims are considered immutable. If you want to modify the claims,
    you can easily create another instance using :meth:`with_claims`::

        new_audience = (
            'https://pubsub.googleapis.com/google.pubsub.v1.Subscriber')
        new_credentials = credentials.with_claims(audience=new_audience)
    """

    def __init__(
        self,
        signer,
        issuer,
        subject,
        audience,
        additional_claims=None,
        token_lifetime=_DEFAULT_TOKEN_LIFETIME_SECS,
        quota_project_id=None,
    ):
        """
        Args:
            signer (google.auth.crypt.Signer): The signer used to sign JWTs.
            issuer (str): The `iss` claim.
            subject (str): The `sub` claim.
            audience (str): the `aud` claim. The intended audience for the
                credentials.
            additional_claims (Mapping[str, str]): Any additional claims for
                the JWT payload.
            token_lifetime (int): The amount of time in seconds for
                which the token is valid. Defaults to 1 hour.
            quota_project_id (Optional[str]): The project ID used for quota
                and billing.
        """
        super(Credentials, self).__init__()
        self._signer = signer
        self._issuer = issuer
        self._subject = subject
        self._audience = audience
        self._token_lifetime = token_lifetime
        self._quota_project_id = quota_project_id

        if additional_claims is None:
            additional_claims = {}

        self._additional_claims = additional_claims

    @classmethod
    def _from_signer_and_info(cls, signer, info, **kwargs):
        """Creates a Credentials instance from a signer and service account
        info.

        Args:
            signer (google.auth.crypt.Signer): The signer used to sign JWTs.
            info (Mapping[str, str]): The service account info.
            kwargs: Additional arguments to pass to the constructor.

        Returns:
            google.auth.jwt.Credentials: The constructed credentials.

        Raises:
            google.auth.exceptions.MalformedError: If the info is not in the expected format.
        """
        # The service account email is both subject and issuer unless the
        # caller supplied explicit values.
        kwargs.setdefault("subject", info["client_email"])
        kwargs.setdefault("issuer", info["client_email"])
        return cls(signer, **kwargs)

    @classmethod
    def from_service_account_info(cls, info, **kwargs):
        """Creates a Credentials instance from a dictionary.

        Args:
            info (Mapping[str, str]): The service account info in Google
                format.
            kwargs: Additional arguments to pass to the constructor.

        Returns:
            google.auth.jwt.Credentials: The constructed credentials.

        Raises:
            google.auth.exceptions.MalformedError: If the info is not in the expected format.
        """
        signer = _service_account_info.from_dict(info, require=["client_email"])
        return cls._from_signer_and_info(signer, info, **kwargs)

    @classmethod
    def from_service_account_file(cls, filename, **kwargs):
        """Creates a Credentials instance from a service account .json file
        in Google format.

        Args:
            filename (str): The path to the service account .json file.
            kwargs: Additional arguments to pass to the constructor.

        Returns:
            google.auth.jwt.Credentials: The constructed credentials.
        """
        info, signer = _service_account_info.from_filename(
            filename, require=["client_email"]
        )
        return cls._from_signer_and_info(signer, info, **kwargs)

    @classmethod
    def from_signing_credentials(cls, credentials, audience, **kwargs):
        """Creates a new :class:`google.auth.jwt.Credentials` instance from an
        existing :class:`google.auth.credentials.Signing` instance.

        The new instance will use the same signer as the existing instance and
        will use the existing instance's signer email as the issuer and
        subject by default.

        Example::

            svc_creds = service_account.Credentials.from_service_account_file(
                'service_account.json')
            audience = (
                'https://pubsub.googleapis.com/google.pubsub.v1.Publisher')
            jwt_creds = jwt.Credentials.from_signing_credentials(
                svc_creds, audience=audience)

        Args:
            credentials (google.auth.credentials.Signing): The credentials to
                use to construct the new credentials.
            audience (str): the `aud` claim. The intended audience for the
                credentials.
            kwargs: Additional arguments to pass to the constructor.

        Returns:
            google.auth.jwt.Credentials: A new Credentials instance.
        """
        kwargs.setdefault("issuer", credentials.signer_email)
        kwargs.setdefault("subject", credentials.signer_email)
        jwt_creds = cls(credentials.signer, audience=audience, **kwargs)

        # Carry over the regional access boundary manager when the source
        # credentials have one.
        if isinstance(
            credentials,
            google.auth.credentials.CredentialsWithRegionalAccessBoundary,
        ):
            credentials._copy_regional_access_boundary_manager(jwt_creds)

        return jwt_creds

    def with_claims(
        self, issuer=None, subject=None, audience=None, additional_claims=None
    ):
        """Returns a copy of these credentials with modified claims.

        The copy keeps this instance's signer, token lifetime and quota
        project.

        Args:
            issuer (str): The `iss` claim. If unspecified the current issuer
                claim will be used.
            subject (str): The `sub` claim. If unspecified the current subject
                claim will be used.
            audience (str): the `aud` claim. If unspecified the current
                audience claim will be used.
            additional_claims (Mapping[str, str]): Any additional claims for
                the JWT payload. This will be merged with the current
                additional claims.

        Returns:
            google.auth.jwt.Credentials: A new credentials instance.
        """
        # Deep-copy so the merge never touches this instance's claims.
        new_additional_claims = copy.deepcopy(self._additional_claims)
        new_additional_claims.update(additional_claims or {})

        cred = self.__class__(
            self._signer,
            issuer=issuer if issuer is not None else self._issuer,
            subject=subject if subject is not None else self._subject,
            audience=audience if audience is not None else self._audience,
            additional_claims=new_additional_claims,
            # Without this the copy silently reverts to the default lifetime.
            token_lifetime=self._token_lifetime,
            quota_project_id=self._quota_project_id,
        )
        self._copy_regional_access_boundary_manager(cred)
        return cred

    # No docstring here: it is supplied by the copy_docstring decorator.
    @_helpers.copy_docstring(google.auth.credentials.CredentialsWithQuotaProject)
    def with_quota_project(self, quota_project_id):
        cred = self.__class__(
            self._signer,
            issuer=self._issuer,
            subject=self._subject,
            audience=self._audience,
            additional_claims=self._additional_claims,
            # Without this the copy silently reverts to the default lifetime.
            token_lifetime=self._token_lifetime,
            quota_project_id=quota_project_id,
        )
        self._copy_regional_access_boundary_manager(cred)
        return cred

    def _make_jwt(self):
        """Make a signed JWT.

        Returns:
            Tuple[bytes, datetime]: The encoded JWT and the expiration.
        """
        now = _helpers.utcnow()
        lifetime = datetime.timedelta(seconds=self._token_lifetime)
        expiry = now + lifetime

        payload = {
            "iss": self._issuer,
            "sub": self._subject,
            "iat": _helpers.datetime_to_secs(now),
            "exp": _helpers.datetime_to_secs(expiry),
        }
        # The "aud" claim is only emitted when an audience is set.
        if self._audience:
            payload["aud"] = self._audience

        # Applied last, so additional claims override the ones set above.
        payload.update(self._additional_claims)

        jwt = encode(self._signer, payload)

        return jwt, expiry

    def _perform_refresh_token(self, request):
        """Refreshes the access token.

        Args:
            request (Any): Unused.
        """
        # pylint: disable=unused-argument
        # (pylint doesn't correctly recognize overridden methods.)
        self.token, self.expiry = self._make_jwt()

    def _build_regional_access_boundary_lookup_url(self, request=None):
        """Builds the lookup URL using the service account's email address."""
        if not self.signer_email:
            return None

        return _regional_access_boundary_utils.get_service_account_rab_endpoint(
            self.signer_email
        )

    # No docstring here: it is supplied by the copy_docstring decorator.
    @_helpers.copy_docstring(google.auth.credentials.Signing)
    def sign_bytes(self, message):
        return self._signer.sign(message)

    @property  # type: ignore
    @_helpers.copy_docstring(google.auth.credentials.Signing)
    def signer_email(self):
        # The issuer is reported as the signer email.
        return self._issuer

    @property  # type: ignore
    @_helpers.copy_docstring(google.auth.credentials.Signing)
    def signer(self):
        return self._signer

    @property  # type: ignore
    def additional_claims(self):
        """Additional claims the JWT object was created with."""
        return self._additional_claims
class OnDemandCredentials(
    google.auth.credentials.Signing, google.auth.credentials.CredentialsWithQuotaProject
):
    """On-demand JWT credentials.

    Like :class:`Credentials`, this class uses a JWT as the bearer token for
    authentication. However, this class does not require the audience at
    construction time. Instead, it will generate a new token on-demand for
    each request using the request URI as the audience. It caches tokens
    so that multiple requests to the same URI do not incur the overhead
    of generating a new token every time.

    This behavior is especially useful for `gRPC`_ clients. A gRPC service may
    have multiple audience and gRPC clients may not know all of the audiences
    required for accessing a particular service. With these credentials,
    no knowledge of the audiences is required ahead of time.

    .. _grpc: http://www.grpc.io/
    """

    def __init__(
        self,
        signer,
        issuer,
        subject,
        additional_claims=None,
        token_lifetime=_DEFAULT_TOKEN_LIFETIME_SECS,
        max_cache_size=_DEFAULT_MAX_CACHE_SIZE,
        quota_project_id=None,
    ):
        """
        Args:
            signer (google.auth.crypt.Signer): The signer used to sign JWTs.
            issuer (str): The `iss` claim.
            subject (str): The `sub` claim.
            additional_claims (Mapping[str, str]): Any additional claims for
                the JWT payload.
            token_lifetime (int): The amount of time in seconds for
                which the token is valid. Defaults to 1 hour.
            max_cache_size (int): The maximum number of JWT tokens to keep in
                cache. Tokens are cached using :class:`google.auth._cache.LRUCache`.
            quota_project_id (Optional[str]): The project ID used for quota
                and billing.

        """
        super(OnDemandCredentials, self).__init__()
        self._signer = signer
        self._issuer = issuer
        self._subject = subject
        self._token_lifetime = token_lifetime
        self._quota_project_id = quota_project_id

        if additional_claims is None:
            additional_claims = {}

        self._additional_claims = additional_claims
        # Maps audience -> (token, expiry).
        self._cache = _cache.LRUCache(maxsize=max_cache_size)

    @classmethod
    def _from_signer_and_info(cls, signer, info, **kwargs):
        """Creates an OnDemandCredentials instance from a signer and service
        account info.

        Args:
            signer (google.auth.crypt.Signer): The signer used to sign JWTs.
            info (Mapping[str, str]): The service account info.
            kwargs: Additional arguments to pass to the constructor.

        Returns:
            google.auth.jwt.OnDemandCredentials: The constructed credentials.

        Raises:
            google.auth.exceptions.MalformedError: If the info is not in the expected format.
        """
        # The service account email is both subject and issuer unless the
        # caller supplied explicit values.
        kwargs.setdefault("subject", info["client_email"])
        kwargs.setdefault("issuer", info["client_email"])
        return cls(signer, **kwargs)

    @classmethod
    def from_service_account_info(cls, info, **kwargs):
        """Creates an OnDemandCredentials instance from a dictionary.

        Args:
            info (Mapping[str, str]): The service account info in Google
                format.
            kwargs: Additional arguments to pass to the constructor.

        Returns:
            google.auth.jwt.OnDemandCredentials: The constructed credentials.

        Raises:
            google.auth.exceptions.MalformedError: If the info is not in the expected format.
        """
        signer = _service_account_info.from_dict(info, require=["client_email"])
        return cls._from_signer_and_info(signer, info, **kwargs)

    @classmethod
    def from_service_account_file(cls, filename, **kwargs):
        """Creates an OnDemandCredentials instance from a service account .json
        file in Google format.

        Args:
            filename (str): The path to the service account .json file.
            kwargs: Additional arguments to pass to the constructor.

        Returns:
            google.auth.jwt.OnDemandCredentials: The constructed credentials.
        """
        info, signer = _service_account_info.from_filename(
            filename, require=["client_email"]
        )
        return cls._from_signer_and_info(signer, info, **kwargs)

    @classmethod
    def from_signing_credentials(cls, credentials, **kwargs):
        """Creates a new :class:`google.auth.jwt.OnDemandCredentials` instance
        from an existing :class:`google.auth.credentials.Signing` instance.

        The new instance will use the same signer as the existing instance and
        will use the existing instance's signer email as the issuer and
        subject by default.

        Example::

            svc_creds = service_account.Credentials.from_service_account_file(
                'service_account.json')
            jwt_creds = jwt.OnDemandCredentials.from_signing_credentials(
                svc_creds)

        Args:
            credentials (google.auth.credentials.Signing): The credentials to
                use to construct the new credentials.
            kwargs: Additional arguments to pass to the constructor.

        Returns:
            google.auth.jwt.OnDemandCredentials: A new OnDemandCredentials
                instance.
        """
        kwargs.setdefault("issuer", credentials.signer_email)
        kwargs.setdefault("subject", credentials.signer_email)
        return cls(credentials.signer, **kwargs)

    def with_claims(self, issuer=None, subject=None, additional_claims=None):
        """Returns a copy of these credentials with modified claims.

        The copy keeps this instance's signer, token lifetime, cache size and
        quota project; it starts with its own empty token cache.

        Args:
            issuer (str): The `iss` claim. If unspecified the current issuer
                claim will be used.
            subject (str): The `sub` claim. If unspecified the current subject
                claim will be used.
            additional_claims (Mapping[str, str]): Any additional claims for
                the JWT payload. This will be merged with the current
                additional claims.

        Returns:
            google.auth.jwt.OnDemandCredentials: A new credentials instance.
        """
        # Deep-copy so the merge never touches this instance's claims.
        new_additional_claims = copy.deepcopy(self._additional_claims)
        new_additional_claims.update(additional_claims or {})

        return self.__class__(
            self._signer,
            issuer=issuer if issuer is not None else self._issuer,
            subject=subject if subject is not None else self._subject,
            additional_claims=new_additional_claims,
            # Without this the copy silently reverts to the default lifetime.
            token_lifetime=self._token_lifetime,
            max_cache_size=self._cache.maxsize,
            quota_project_id=self._quota_project_id,
        )

    # No docstring here: it is supplied by the copy_docstring decorator.
    @_helpers.copy_docstring(google.auth.credentials.CredentialsWithQuotaProject)
    def with_quota_project(self, quota_project_id):
        return self.__class__(
            self._signer,
            issuer=self._issuer,
            subject=self._subject,
            additional_claims=self._additional_claims,
            # Without this the copy silently reverts to the default lifetime.
            token_lifetime=self._token_lifetime,
            max_cache_size=self._cache.maxsize,
            quota_project_id=quota_project_id,
        )

    @property
    def valid(self):
        """Checks the validity of the credentials.

        These credentials are always valid because it generates tokens on
        demand.
        """
        return True

    def _make_jwt_for_audience(self, audience):
        """Make a new JWT for the given audience.

        Args:
            audience (str): The intended audience.

        Returns:
            Tuple[bytes, datetime]: The encoded JWT and the expiration.
        """
        now = _helpers.utcnow()
        lifetime = datetime.timedelta(seconds=self._token_lifetime)
        expiry = now + lifetime

        payload = {
            "iss": self._issuer,
            "sub": self._subject,
            "iat": _helpers.datetime_to_secs(now),
            "exp": _helpers.datetime_to_secs(expiry),
            "aud": audience,
        }

        # Applied last, so additional claims override the ones set above.
        payload.update(self._additional_claims)

        jwt = encode(self._signer, payload)

        return jwt, expiry

    def _get_jwt_for_audience(self, audience):
        """Get a JWT For a given audience.

        If there is already an existing, non-expired token in the cache for
        the audience, that token is used. Otherwise, a new token will be
        created.

        Args:
            audience (str): The intended audience.

        Returns:
            bytes: The encoded JWT.
        """
        token, expiry = self._cache.get(audience, (None, None))

        # Mint and cache a fresh token on a cache miss or an expired entry.
        if token is None or expiry < _helpers.utcnow():
            token, expiry = self._make_jwt_for_audience(audience)
            self._cache[audience] = token, expiry

        return token

    def refresh(self, request):
        """Raises an exception, these credentials can not be directly
        refreshed.

        Args:
            request (Any): Unused.

        Raises:
            google.auth.RefreshError
        """
        # pylint: disable=unused-argument
        # (pylint doesn't correctly recognize overridden methods.)
        raise exceptions.RefreshError(
            "OnDemandCredentials can not be directly refreshed."
        )

    def before_request(self, request, method, url, headers):
        """Performs credential-specific before request logic.

        Args:
            request (Any): Unused. JWT credentials do not need to make an
                HTTP request to refresh.
            method (str): The request's HTTP method.
            url (str): The request's URI. This is used as the audience claim
                when generating the JWT.
            headers (Mapping): The request's headers.
        """
        # pylint: disable=unused-argument
        # (pylint doesn't correctly recognize overridden methods.)
        # A bare ``import urllib`` does not guarantee that the ``parse``
        # submodule is loaded, so import it explicitly before use.
        import urllib.parse

        parts = urllib.parse.urlsplit(url)
        # Strip query string and fragment
        audience = urllib.parse.urlunsplit(
            (parts.scheme, parts.netloc, parts.path, "", "")
        )
        token = self._get_jwt_for_audience(audience)
        self.apply(headers, token=token)

    # No docstring here: it is supplied by the copy_docstring decorator.
    @_helpers.copy_docstring(google.auth.credentials.Signing)
    def sign_bytes(self, message):
        return self._signer.sign(message)

    @property  # type: ignore
    @_helpers.copy_docstring(google.auth.credentials.Signing)
    def signer_email(self):
        # The issuer is reported as the signer email.
        return self._issuer

    @property  # type: ignore
    @_helpers.copy_docstring(google.auth.credentials.Signing)
    def signer(self):
        return self._signer