Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/jwt/api_jwt.py: 63%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1from __future__ import annotations
3import json
4import os
5import warnings
6from calendar import timegm
7from collections.abc import Container, Iterable, Sequence
8from datetime import datetime, timedelta, timezone
9from typing import TYPE_CHECKING, Any, Union, cast
11from .api_jws import PyJWS, _jws_global_obj
12from .exceptions import (
13 DecodeError,
14 ExpiredSignatureError,
15 ImmatureSignatureError,
16 InvalidAudienceError,
17 InvalidIssuedAtError,
18 InvalidIssuerError,
19 InvalidJTIError,
20 InvalidSubjectError,
21 MissingRequiredClaimError,
22)
23from .warnings import RemovedInPyjwt3Warning
25if TYPE_CHECKING or bool(os.getenv("SPHINX_BUILD", "")):
26 import sys
28 if sys.version_info >= (3, 10):
29 from typing import TypeAlias
30 else:
31 # Python 3.9 and lower
32 from typing_extensions import TypeAlias
34 from .algorithms import has_crypto
35 from .api_jwk import PyJWK
36 from .types import FullOptions, Options, SigOptions
38 if has_crypto:
39 from .algorithms import AllowedPrivateKeys, AllowedPublicKeys
41 AllowedPrivateKeyTypes: TypeAlias = Union[AllowedPrivateKeys, PyJWK, str, bytes]
42 AllowedPublicKeyTypes: TypeAlias = Union[AllowedPublicKeys, PyJWK, str, bytes]
43 else:
44 AllowedPrivateKeyTypes: TypeAlias = Union[PyJWK, str, bytes] # type: ignore
45 AllowedPublicKeyTypes: TypeAlias = Union[PyJWK, str, bytes] # type: ignore
48class PyJWT:
49 def __init__(self, options: Options | None = None) -> None:
50 self.options: FullOptions
51 self.options = self._get_default_options()
52 if options is not None:
53 self.options = self._merge_options(options)
55 self._jws = PyJWS(options=self._get_sig_options())
57 @staticmethod
58 def _get_default_options() -> FullOptions:
59 return {
60 "verify_signature": True,
61 "verify_exp": True,
62 "verify_nbf": True,
63 "verify_iat": True,
64 "verify_aud": True,
65 "verify_iss": True,
66 "verify_sub": True,
67 "verify_jti": True,
68 "require": [],
69 "strict_aud": False,
70 "enforce_minimum_key_length": False,
71 }
73 def _get_sig_options(self) -> SigOptions:
74 return {
75 "verify_signature": self.options["verify_signature"],
76 "enforce_minimum_key_length": self.options.get(
77 "enforce_minimum_key_length", False
78 ),
79 }
81 def _merge_options(self, options: Options | None = None) -> FullOptions:
82 if options is None:
83 return self.options
85 # (defensive) set defaults for verify_x to False if verify_signature is False
86 if not options.get("verify_signature", True):
87 options["verify_exp"] = options.get("verify_exp", False)
88 options["verify_nbf"] = options.get("verify_nbf", False)
89 options["verify_iat"] = options.get("verify_iat", False)
90 options["verify_aud"] = options.get("verify_aud", False)
91 options["verify_iss"] = options.get("verify_iss", False)
92 options["verify_sub"] = options.get("verify_sub", False)
93 options["verify_jti"] = options.get("verify_jti", False)
94 return {**self.options, **options}
96 def encode(
97 self,
98 payload: dict[str, Any],
99 key: AllowedPrivateKeyTypes,
100 algorithm: str | None = "HS256",
101 headers: dict[str, Any] | None = None,
102 json_encoder: type[json.JSONEncoder] | None = None,
103 sort_headers: bool = True,
104 ) -> str:
105 """Encode the ``payload`` as JSON Web Token.
107 :param payload: JWT claims, e.g. ``dict(iss=..., aud=..., sub=...)``
108 :type payload: dict[str, typing.Any]
109 :param key: a key suitable for the chosen algorithm:
111 * for **asymmetric algorithms**: PEM-formatted private key, a multiline string
112 * for **symmetric algorithms**: plain string, sufficiently long for security
114 :type key: str or bytes or PyJWK or :py:class:`jwt.algorithms.AllowedPrivateKeys`
115 :param algorithm: algorithm to sign the token with, e.g. ``"ES256"``.
116 If ``headers`` includes ``alg``, it will be preferred to this parameter.
117 If ``key`` is a :class:`PyJWK` object, by default the key algorithm will be used.
118 :type algorithm: str or None
119 :param headers: additional JWT header fields, e.g. ``dict(kid="my-key-id")``.
120 :type headers: dict[str, typing.Any] or None
121 :param json_encoder: custom JSON encoder for ``payload`` and ``headers``
122 :type json_encoder: json.JSONEncoder or None
124 :rtype: str
125 :returns: a JSON Web Token
127 :raises TypeError: if ``payload`` is not a ``dict``
128 """
129 # Check that we get a dict
130 if not isinstance(payload, dict):
131 raise TypeError(
132 "Expecting a dict object, as JWT only supports "
133 "JSON objects as payloads."
134 )
136 # Payload
137 payload = payload.copy()
138 for time_claim in ["exp", "iat", "nbf"]:
139 # Convert datetime to a intDate value in known time-format claims
140 if isinstance(payload.get(time_claim), datetime):
141 payload[time_claim] = timegm(payload[time_claim].utctimetuple())
143 # Issue #1039, iss being set to non-string
144 if "iss" in payload and not isinstance(payload["iss"], str):
145 raise TypeError("Issuer (iss) must be a string.")
147 json_payload = self._encode_payload(
148 payload,
149 headers=headers,
150 json_encoder=json_encoder,
151 )
153 return self._jws.encode(
154 json_payload,
155 key,
156 algorithm,
157 headers,
158 json_encoder,
159 sort_headers=sort_headers,
160 )
162 def _encode_payload(
163 self,
164 payload: dict[str, Any],
165 headers: dict[str, Any] | None = None,
166 json_encoder: type[json.JSONEncoder] | None = None,
167 ) -> bytes:
168 """
169 Encode a given payload to the bytes to be signed.
171 This method is intended to be overridden by subclasses that need to
172 encode the payload in a different way, e.g. compress the payload.
173 """
174 return json.dumps(
175 payload,
176 separators=(",", ":"),
177 cls=json_encoder,
178 ).encode("utf-8")
180 def decode_complete(
181 self,
182 jwt: str | bytes,
183 key: AllowedPublicKeyTypes = "",
184 algorithms: Sequence[str] | None = None,
185 options: Options | None = None,
186 # deprecated arg, remove in pyjwt3
187 verify: bool | None = None,
188 # could be used as passthrough to api_jws, consider removal in pyjwt3
189 detached_payload: bytes | None = None,
190 # passthrough arguments to _validate_claims
191 # consider putting in options
192 audience: str | Iterable[str] | None = None,
193 issuer: str | Container[str] | None = None,
194 subject: str | None = None,
195 leeway: float | timedelta = 0,
196 # kwargs
197 **kwargs: Any,
198 ) -> dict[str, Any]:
199 """Identical to ``jwt.decode`` except for return value which is a dictionary containing the token header (JOSE Header),
200 the token payload (JWT Payload), and token signature (JWT Signature) on the keys "header", "payload",
201 and "signature" respectively.
203 :param jwt: the token to be decoded
204 :type jwt: str or bytes
205 :param key: the key suitable for the allowed algorithm
206 :type key: str or bytes or PyJWK or :py:class:`jwt.algorithms.AllowedPublicKeys`
208 :param algorithms: allowed algorithms, e.g. ``["ES256"]``
210 .. warning::
212 Do **not** compute the ``algorithms`` parameter based on
213 the ``alg`` from the token itself, or on any other data
214 that an attacker may be able to influence, as that might
215 expose you to various vulnerabilities (see `RFC 8725 §2.1
216 <https://www.rfc-editor.org/rfc/rfc8725.html#section-2.1>`_). Instead,
217 either hard-code a fixed value for ``algorithms``, or
218 configure it in the same place you configure the
219 ``key``. Make sure not to mix symmetric and asymmetric
220 algorithms that interpret the ``key`` in different ways
221 (e.g. HS\\* and RS\\*).
222 :type algorithms: typing.Sequence[str] or None
224 :param jwt.types.Options options: extended decoding and validation options
225 Refer to :py:class:`jwt.types.Options` for more information.
227 :param audience: optional, the value for ``verify_aud`` check
228 :type audience: str or typing.Iterable[str] or None
229 :param issuer: optional, the value for ``verify_iss`` check
230 :type issuer: str or typing.Container[str] or None
231 :param leeway: a time margin in seconds for the expiration check
232 :type leeway: float or datetime.timedelta
233 :rtype: dict[str, typing.Any]
234 :returns: Decoded JWT with the JOSE Header on the key ``header``, the JWS
235 Payload on the key ``payload``, and the JWS Signature on the key ``signature``.
236 """
237 if kwargs:
238 warnings.warn(
239 "passing additional kwargs to decode_complete() is deprecated "
240 "and will be removed in pyjwt version 3. "
241 f"Unsupported kwargs: {tuple(kwargs.keys())}",
242 RemovedInPyjwt3Warning,
243 stacklevel=2,
244 )
246 if options is None:
247 verify_signature = True
248 else:
249 verify_signature = options.get("verify_signature", True)
251 # If the user has set the legacy `verify` argument, and it doesn't match
252 # what the relevant `options` entry for the argument is, inform the user
253 # that they're likely making a mistake.
254 if verify is not None and verify != verify_signature:
255 warnings.warn(
256 "The `verify` argument to `decode` does nothing in PyJWT 2.0 and newer. "
257 "The equivalent is setting `verify_signature` to False in the `options` dictionary. "
258 "This invocation has a mismatch between the kwarg and the option entry.",
259 category=DeprecationWarning,
260 stacklevel=2,
261 )
263 merged_options = self._merge_options(options)
265 sig_options: SigOptions = {
266 "verify_signature": verify_signature,
267 }
268 decoded = self._jws.decode_complete(
269 jwt,
270 key=key,
271 algorithms=algorithms,
272 options=sig_options,
273 detached_payload=detached_payload,
274 )
276 payload = self._decode_payload(decoded)
278 self._validate_claims(
279 payload,
280 merged_options,
281 audience=audience,
282 issuer=issuer,
283 leeway=leeway,
284 subject=subject,
285 )
287 decoded["payload"] = payload
288 return decoded
290 def _decode_payload(self, decoded: dict[str, Any]) -> dict[str, Any]:
291 """
292 Decode the payload from a JWS dictionary (payload, signature, header).
294 This method is intended to be overridden by subclasses that need to
295 decode the payload in a different way, e.g. decompress compressed
296 payloads.
297 """
298 try:
299 payload: dict[str, Any] = json.loads(decoded["payload"])
300 except ValueError as e:
301 raise DecodeError(f"Invalid payload string: {e}") from e
302 if not isinstance(payload, dict):
303 raise DecodeError("Invalid payload string: must be a json object")
304 return payload
306 def decode(
307 self,
308 jwt: str | bytes,
309 key: AllowedPublicKeys | PyJWK | str | bytes = "",
310 algorithms: Sequence[str] | None = None,
311 options: Options | None = None,
312 # deprecated arg, remove in pyjwt3
313 verify: bool | None = None,
314 # could be used as passthrough to api_jws, consider removal in pyjwt3
315 detached_payload: bytes | None = None,
316 # passthrough arguments to _validate_claims
317 # consider putting in options
318 audience: str | Iterable[str] | None = None,
319 subject: str | None = None,
320 issuer: str | Container[str] | None = None,
321 leeway: float | timedelta = 0,
322 # kwargs
323 **kwargs: Any,
324 ) -> dict[str, Any]:
325 """Verify the ``jwt`` token signature and return the token claims.
327 :param jwt: the token to be decoded
328 :type jwt: str or bytes
329 :param key: the key suitable for the allowed algorithm
330 :type key: str or bytes or PyJWK or :py:class:`jwt.algorithms.AllowedPublicKeys`
332 :param algorithms: allowed algorithms, e.g. ``["ES256"]``
333 If ``key`` is a :class:`PyJWK` object, allowed algorithms will default to the key algorithm.
335 .. warning::
337 Do **not** compute the ``algorithms`` parameter based on
338 the ``alg`` from the token itself, or on any other data
339 that an attacker may be able to influence, as that might
340 expose you to various vulnerabilities (see `RFC 8725 §2.1
341 <https://www.rfc-editor.org/rfc/rfc8725.html#section-2.1>`_). Instead,
342 either hard-code a fixed value for ``algorithms``, or
343 configure it in the same place you configure the
344 ``key``. Make sure not to mix symmetric and asymmetric
345 algorithms that interpret the ``key`` in different ways
346 (e.g. HS\\* and RS\\*).
347 :type algorithms: typing.Sequence[str] or None
349 :param jwt.types.Options options: extended decoding and validation options
350 Refer to :py:class:`jwt.types.Options` for more information.
352 :param audience: optional, the value for ``verify_aud`` check
353 :type audience: str or typing.Iterable[str] or None
354 :param subject: optional, the value for ``verify_sub`` check
355 :type subject: str or None
356 :param issuer: optional, the value for ``verify_iss`` check
357 :type issuer: str or typing.Container[str] or None
358 :param leeway: a time margin in seconds for the expiration check
359 :type leeway: float or datetime.timedelta
360 :rtype: dict[str, typing.Any]
361 :returns: the JWT claims
362 """
363 if kwargs:
364 warnings.warn(
365 "passing additional kwargs to decode() is deprecated "
366 "and will be removed in pyjwt version 3. "
367 f"Unsupported kwargs: {tuple(kwargs.keys())}",
368 RemovedInPyjwt3Warning,
369 stacklevel=2,
370 )
371 decoded = self.decode_complete(
372 jwt,
373 key,
374 algorithms,
375 options,
376 verify=verify,
377 detached_payload=detached_payload,
378 audience=audience,
379 subject=subject,
380 issuer=issuer,
381 leeway=leeway,
382 )
383 return cast(dict[str, Any], decoded["payload"])
385 def _validate_claims(
386 self,
387 payload: dict[str, Any],
388 options: FullOptions,
389 audience: Iterable[str] | str | None = None,
390 issuer: Container[str] | str | None = None,
391 subject: str | None = None,
392 leeway: float | timedelta = 0,
393 ) -> None:
394 if isinstance(leeway, timedelta):
395 leeway = leeway.total_seconds()
397 if audience is not None and not isinstance(audience, (str, Iterable)):
398 raise TypeError("audience must be a string, iterable or None")
400 self._validate_required_claims(payload, options["require"])
402 now = datetime.now(tz=timezone.utc).timestamp()
404 if "iat" in payload and options["verify_iat"]:
405 self._validate_iat(payload, now, leeway)
407 if "nbf" in payload and options["verify_nbf"]:
408 self._validate_nbf(payload, now, leeway)
410 if "exp" in payload and options["verify_exp"]:
411 self._validate_exp(payload, now, leeway)
413 if options["verify_iss"]:
414 self._validate_iss(payload, issuer)
416 if options["verify_aud"]:
417 self._validate_aud(
418 payload, audience, strict=options.get("strict_aud", False)
419 )
421 if options["verify_sub"]:
422 self._validate_sub(payload, subject)
424 if options["verify_jti"]:
425 self._validate_jti(payload)
427 def _validate_required_claims(
428 self,
429 payload: dict[str, Any],
430 claims: Iterable[str],
431 ) -> None:
432 for claim in claims:
433 if payload.get(claim) is None:
434 raise MissingRequiredClaimError(claim)
436 def _validate_sub(
437 self, payload: dict[str, Any], subject: str | None = None
438 ) -> None:
439 """
440 Checks whether "sub" if in the payload is valid or not.
441 This is an Optional claim
443 :param payload(dict): The payload which needs to be validated
444 :param subject(str): The subject of the token
445 """
447 if "sub" not in payload:
448 return
450 if not isinstance(payload["sub"], str):
451 raise InvalidSubjectError("Subject must be a string")
453 if subject is not None:
454 if payload.get("sub") != subject:
455 raise InvalidSubjectError("Invalid subject")
457 def _validate_jti(self, payload: dict[str, Any]) -> None:
458 """
459 Checks whether "jti" if in the payload is valid or not
460 This is an Optional claim
462 :param payload(dict): The payload which needs to be validated
463 """
465 if "jti" not in payload:
466 return
468 if not isinstance(payload.get("jti"), str):
469 raise InvalidJTIError("JWT ID must be a string")
471 def _validate_iat(
472 self,
473 payload: dict[str, Any],
474 now: float,
475 leeway: float,
476 ) -> None:
477 try:
478 iat = int(payload["iat"])
479 except ValueError:
480 raise InvalidIssuedAtError(
481 "Issued At claim (iat) must be an integer."
482 ) from None
483 if iat > (now + leeway):
484 raise ImmatureSignatureError("The token is not yet valid (iat)")
486 def _validate_nbf(
487 self,
488 payload: dict[str, Any],
489 now: float,
490 leeway: float,
491 ) -> None:
492 try:
493 nbf = int(payload["nbf"])
494 except ValueError:
495 raise DecodeError("Not Before claim (nbf) must be an integer.") from None
497 if nbf > (now + leeway):
498 raise ImmatureSignatureError("The token is not yet valid (nbf)")
500 def _validate_exp(
501 self,
502 payload: dict[str, Any],
503 now: float,
504 leeway: float,
505 ) -> None:
506 try:
507 exp = int(payload["exp"])
508 except ValueError:
509 raise DecodeError(
510 "Expiration Time claim (exp) must be an integer."
511 ) from None
513 if exp <= (now - leeway):
514 raise ExpiredSignatureError("Signature has expired")
516 def _validate_aud(
517 self,
518 payload: dict[str, Any],
519 audience: str | Iterable[str] | None,
520 *,
521 strict: bool = False,
522 ) -> None:
523 if audience is None:
524 if "aud" not in payload or not payload["aud"]:
525 return
526 # Application did not specify an audience, but
527 # the token has the 'aud' claim
528 raise InvalidAudienceError("Invalid audience")
530 if "aud" not in payload or not payload["aud"]:
531 # Application specified an audience, but it could not be
532 # verified since the token does not contain a claim.
533 raise MissingRequiredClaimError("aud")
535 audience_claims = payload["aud"]
537 # In strict mode, we forbid list matching: the supplied audience
538 # must be a string, and it must exactly match the audience claim.
539 if strict:
540 # Only a single audience is allowed in strict mode.
541 if not isinstance(audience, str):
542 raise InvalidAudienceError("Invalid audience (strict)")
544 # Only a single audience claim is allowed in strict mode.
545 if not isinstance(audience_claims, str):
546 raise InvalidAudienceError("Invalid claim format in token (strict)")
548 if audience != audience_claims:
549 raise InvalidAudienceError("Audience doesn't match (strict)")
551 return
553 if isinstance(audience_claims, str):
554 audience_claims = [audience_claims]
555 if not isinstance(audience_claims, list):
556 raise InvalidAudienceError("Invalid claim format in token")
557 if any(not isinstance(c, str) for c in audience_claims):
558 raise InvalidAudienceError("Invalid claim format in token")
560 if isinstance(audience, str):
561 audience = [audience]
563 if all(aud not in audience_claims for aud in audience):
564 raise InvalidAudienceError("Audience doesn't match")
566 def _validate_iss(
567 self, payload: dict[str, Any], issuer: Container[str] | str | None
568 ) -> None:
569 if issuer is None:
570 return
572 if "iss" not in payload:
573 raise MissingRequiredClaimError("iss")
575 iss = payload["iss"]
576 if not isinstance(iss, str):
577 raise InvalidIssuerError("Payload Issuer (iss) must be a string")
579 if isinstance(issuer, str):
580 if iss != issuer:
581 raise InvalidIssuerError("Invalid issuer")
582 else:
583 try:
584 if iss not in issuer:
585 raise InvalidIssuerError("Invalid issuer")
586 except TypeError:
587 raise InvalidIssuerError(
588 'Issuer param must be "str" or "Container[str]"'
589 ) from None
592_jwt_global_obj = PyJWT()
593_jwt_global_obj._jws = _jws_global_obj
594encode = _jwt_global_obj.encode
595decode_complete = _jwt_global_obj.decode_complete
596decode = _jwt_global_obj.decode