Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/jwt/api_jwt.py: 59%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1from __future__ import annotations
3import json
4import os
5import warnings
6from calendar import timegm
7from collections.abc import Container, Iterable, Sequence
8from datetime import datetime, timedelta, timezone
9from typing import TYPE_CHECKING, Any, Union, cast
11from .api_jws import PyJWS, _ALGORITHM_UNSET, _jws_global_obj
12from .exceptions import (
13 DecodeError,
14 ExpiredSignatureError,
15 ImmatureSignatureError,
16 InvalidAudienceError,
17 InvalidIssuedAtError,
18 InvalidIssuerError,
19 InvalidJTIError,
20 InvalidSubjectError,
21 MissingRequiredClaimError,
22)
23from .warnings import RemovedInPyjwt3Warning
# The names below are needed only by annotations. They are imported for
# static type checkers, and also at runtime when the SPHINX_BUILD environment
# variable is set to a non-empty value.
if TYPE_CHECKING or bool(os.getenv("SPHINX_BUILD", "")):
    import sys

    if sys.version_info < (3, 10):
        # Python 3.9 and lower
        from typing_extensions import TypeAlias
    else:
        from typing import TypeAlias

    from .algorithms import AllowedPrivateKeys, AllowedPublicKeys
    from .api_jwk import PyJWK
    from .types import FullOptions, Options, SigOptions

    # Key types accepted by the ``key`` annotations in this module: one alias
    # for private (signing) keys, one for public (verification) keys.
    AllowedPrivateKeyTypes: TypeAlias = Union[AllowedPrivateKeys, PyJWK, str, bytes]
    AllowedPublicKeyTypes: TypeAlias = Union[AllowedPublicKeys, PyJWK, str, bytes]
class PyJWT:
    """JSON Web Token encoder/decoder layered on a :class:`PyJWS` instance.

    ``encode`` serialises a claims dict to JSON and hands the bytes to the JWS
    layer; ``decode`` / ``decode_complete`` let the JWS layer parse the token
    and then validate the claims (``exp``, ``nbf``, ``iat``, ``aud``, ``iss``,
    ``sub``, ``jti`` and any ``require``-d ones) according to ``options``.
    """

    def __init__(self, options: Options | None = None) -> None:
        """Create an instance with default options, optionally overridden.

        :param options: entries that replace the defaults returned by
            :meth:`_get_default_options`; ``None`` keeps the defaults.
        :type options: jwt.types.Options or None
        """
        self.options: FullOptions
        self.options = self._get_default_options()
        if options is not None:
            self.options = self._merge_options(options)

        self._jws = PyJWS(options=self._get_sig_options())

    @staticmethod
    def _get_default_options() -> FullOptions:
        """Return a fresh dict holding the default validation options."""
        return {
            "verify_signature": True,
            "verify_exp": True,
            "verify_nbf": True,
            "verify_iat": True,
            "verify_aud": True,
            "verify_iss": True,
            "verify_sub": True,
            "verify_jti": True,
            "require": [],
            "strict_aud": False,
            "enforce_minimum_key_length": False,
        }

    def _get_sig_options(self) -> SigOptions:
        """Return the subset of ``self.options`` that is passed to ``PyJWS``."""
        return {
            "verify_signature": self.options["verify_signature"],
            "enforce_minimum_key_length": self.options.get(
                "enforce_minimum_key_length", False
            ),
        }

    def _merge_options(self, options: Options | None = None) -> FullOptions:
        """Return ``self.options`` overlaid with the given ``options``.

        When ``options`` disables ``verify_signature``, every ``verify_*``
        claim check that ``options`` does not mention explicitly defaults to
        ``False`` as well.

        The ``options`` mapping passed in is never modified; a new dict is
        returned (or ``self.options`` itself when ``options`` is ``None``).

        :param options: per-call overrides, or ``None`` for no overrides
        :type options: jwt.types.Options or None
        :rtype: jwt.types.FullOptions
        """
        if options is None:
            return self.options

        # Copy first: the defaults below must not leak into the caller's dict.
        overrides: dict[str, Any] = dict(options)

        # (defensive) set defaults for verify_x to False if verify_signature is False
        if not overrides.get("verify_signature", True):
            for claim_check in (
                "verify_exp",
                "verify_nbf",
                "verify_iat",
                "verify_aud",
                "verify_iss",
                "verify_sub",
                "verify_jti",
            ):
                overrides.setdefault(claim_check, False)

        # ``FullOptions`` is only imported for type checking, hence the
        # string form of the cast target.
        return cast("FullOptions", {**self.options, **overrides})

    def encode(
        self,
        payload: dict[str, Any],
        key: AllowedPrivateKeyTypes,
        algorithm: str | None = _ALGORITHM_UNSET,  # type: ignore[assignment]
        headers: dict[str, Any] | None = None,
        json_encoder: type[json.JSONEncoder] | None = None,
        sort_headers: bool = True,
    ) -> str:
        """Encode the ``payload`` as JSON Web Token.

        :param payload: JWT claims, e.g. ``dict(iss=..., aud=..., sub=...)``
        :type payload: dict[str, typing.Any]
        :param key: a key suitable for the chosen algorithm:

            * for **asymmetric algorithms**: PEM-formatted private key, a multiline string
            * for **symmetric algorithms**: plain string, sufficiently long for security

        :type key: str or bytes or PyJWK or :py:class:`jwt.algorithms.AllowedPrivateKeys`
        :param algorithm: algorithm to sign the token with, e.g. ``"ES256"``.
            If ``headers`` includes ``alg``, it will be preferred to this parameter.
            If ``key`` is a :class:`PyJWK` object, by default the key algorithm will be used.
        :type algorithm: str or None
        :param headers: additional JWT header fields, e.g. ``dict(kid="my-key-id")``.
        :type headers: dict[str, typing.Any] or None
        :param json_encoder: custom JSON encoder for ``payload`` and ``headers``
        :type json_encoder: json.JSONEncoder or None
        :param sort_headers: forwarded unchanged to the JWS ``encode`` call
        :type sort_headers: bool

        :rtype: str
        :returns: a JSON Web Token

        :raises TypeError: if ``payload`` is not a ``dict``, or if it contains
            an ``iss`` claim that is not a string
        """
        # Check that we get a dict
        if not isinstance(payload, dict):
            raise TypeError(
                "Expecting a dict object, as JWT only supports "
                "JSON objects as payloads."
            )

        # Payload: work on a shallow copy so the caller's dict keeps its
        # datetime objects.
        payload = payload.copy()
        for time_claim in ["exp", "iat", "nbf"]:
            # Convert datetime to a intDate value in known time-format claims
            if isinstance(payload.get(time_claim), datetime):
                payload[time_claim] = timegm(payload[time_claim].utctimetuple())

        # Issue #1039, iss being set to non-string
        if "iss" in payload and not isinstance(payload["iss"], str):
            raise TypeError("Issuer (iss) must be a string.")

        json_payload = self._encode_payload(
            payload,
            headers=headers,
            json_encoder=json_encoder,
        )

        return self._jws.encode(
            json_payload,
            key,
            algorithm,
            headers,
            json_encoder,
            sort_headers=sort_headers,
        )

    def _encode_payload(
        self,
        payload: dict[str, Any],
        headers: dict[str, Any] | None = None,
        json_encoder: type[json.JSONEncoder] | None = None,
    ) -> bytes:
        """
        Encode a given payload to the bytes to be signed.

        This method is intended to be overridden by subclasses that need to
        encode the payload in a different way, e.g. compress the payload.

        ``headers`` is not used by this default implementation; it is part of
        the signature so that overriding implementations can read it.
        """
        return json.dumps(
            payload,
            separators=(",", ":"),
            cls=json_encoder,
        ).encode("utf-8")

    def decode_complete(
        self,
        jwt: str | bytes,
        key: AllowedPublicKeyTypes = "",
        algorithms: Sequence[str] | None = None,
        options: Options | None = None,
        # deprecated arg, remove in pyjwt3
        verify: bool | None = None,
        # could be used as passthrough to api_jws, consider removal in pyjwt3
        detached_payload: bytes | None = None,
        # passthrough arguments to _validate_claims
        # consider putting in options
        audience: str | Iterable[str] | None = None,
        issuer: str | Container[str] | None = None,
        subject: str | None = None,
        leeway: float | timedelta = 0,
        # kwargs
        **kwargs: Any,
    ) -> dict[str, Any]:
        """Identical to ``jwt.decode`` except for return value which is a dictionary containing the token header (JOSE Header),
        the token payload (JWT Payload), and token signature (JWT Signature) on the keys "header", "payload",
        and "signature" respectively.

        :param jwt: the token to be decoded
        :type jwt: str or bytes
        :param key: the key suitable for the allowed algorithm
        :type key: str or bytes or PyJWK or :py:class:`jwt.algorithms.AllowedPublicKeys`

        :param algorithms: allowed algorithms, e.g. ``["ES256"]``

            .. warning::

               Do **not** compute the ``algorithms`` parameter based on
               the ``alg`` from the token itself, or on any other data
               that an attacker may be able to influence, as that might
               expose you to various vulnerabilities (see `RFC 8725 §2.1
               <https://www.rfc-editor.org/rfc/rfc8725.html#section-2.1>`_). Instead,
               either hard-code a fixed value for ``algorithms``, or
               configure it in the same place you configure the
               ``key``. Make sure not to mix symmetric and asymmetric
               algorithms that interpret the ``key`` in different ways
               (e.g. HS\\* and RS\\*).
        :type algorithms: typing.Sequence[str] or None

        :param jwt.types.Options options: extended decoding and validation options
            Refer to :py:class:`jwt.types.Options` for more information.

        :param verify: deprecated and ignored; a ``DeprecationWarning`` is
            emitted when it disagrees with ``options["verify_signature"]``
        :type verify: bool or None
        :param detached_payload: passed through unchanged to the JWS
            ``decode_complete`` call
        :type detached_payload: bytes or None
        :param audience: optional, the value for ``verify_aud`` check
        :type audience: str or typing.Iterable[str] or None
        :param issuer: optional, the value for ``verify_iss`` check
        :type issuer: str or typing.Container[str] or None
        :param subject: optional, the value for ``verify_sub`` check
        :type subject: str or None
        :param leeway: a time margin in seconds for the expiration check
        :type leeway: float or datetime.timedelta
        :rtype: dict[str, typing.Any]
        :returns: Decoded JWT with the JOSE Header on the key ``header``, the JWS
            Payload on the key ``payload``, and the JWS Signature on the key ``signature``.
        """
        if kwargs:
            warnings.warn(
                "passing additional kwargs to decode_complete() is deprecated "
                "and will be removed in pyjwt version 3. "
                f"Unsupported kwargs: {tuple(kwargs.keys())}",
                RemovedInPyjwt3Warning,
                stacklevel=2,
            )

        # Only the per-call ``options`` decide whether the signature is
        # verified here; the claim checks further down use the merged options.
        if options is None:
            verify_signature = True
        else:
            verify_signature = options.get("verify_signature", True)

        # If the user has set the legacy `verify` argument, and it doesn't match
        # what the relevant `options` entry for the argument is, inform the user
        # that they're likely making a mistake.
        if verify is not None and verify != verify_signature:
            warnings.warn(
                "The `verify` argument to `decode` does nothing in PyJWT 2.0 and newer. "
                "The equivalent is setting `verify_signature` to False in the `options` dictionary. "
                "This invocation has a mismatch between the kwarg and the option entry.",
                category=DeprecationWarning,
                stacklevel=2,
            )

        merged_options = self._merge_options(options)

        sig_options: SigOptions = {
            "verify_signature": verify_signature,
            "enforce_minimum_key_length": merged_options.get(
                "enforce_minimum_key_length", False
            ),
        }
        decoded = self._jws.decode_complete(
            jwt,
            key=key,
            algorithms=algorithms,
            options=sig_options,
            detached_payload=detached_payload,
        )

        payload = self._decode_payload(decoded)

        self._validate_claims(
            payload,
            merged_options,
            audience=audience,
            issuer=issuer,
            leeway=leeway,
            subject=subject,
        )

        # Replace the raw payload from the JWS layer with the parsed claims.
        decoded["payload"] = payload
        return decoded

    def _decode_payload(self, decoded: dict[str, Any]) -> dict[str, Any]:
        """
        Decode the payload from a JWS dictionary (payload, signature, header).

        This method is intended to be overridden by subclasses that need to
        decode the payload in a different way, e.g. decompress compressed
        payloads.

        :raises DecodeError: if the payload is not valid JSON or is not a
            JSON object
        """
        try:
            payload: dict[str, Any] = json.loads(decoded["payload"])
        except ValueError as e:
            raise DecodeError(f"Invalid payload string: {e}") from e
        if not isinstance(payload, dict):
            raise DecodeError("Invalid payload string: must be a json object")
        return payload

    def decode(
        self,
        jwt: str | bytes,
        key: AllowedPublicKeyTypes = "",
        algorithms: Sequence[str] | None = None,
        options: Options | None = None,
        # deprecated arg, remove in pyjwt3
        verify: bool | None = None,
        # could be used as passthrough to api_jws, consider removal in pyjwt3
        detached_payload: bytes | None = None,
        # passthrough arguments to _validate_claims
        # consider putting in options
        audience: str | Iterable[str] | None = None,
        subject: str | None = None,
        issuer: str | Container[str] | None = None,
        leeway: float | timedelta = 0,
        # kwargs
        **kwargs: Any,
    ) -> dict[str, Any]:
        """Verify the ``jwt`` token signature and return the token claims.

        :param jwt: the token to be decoded
        :type jwt: str or bytes
        :param key: the key suitable for the allowed algorithm
        :type key: str or bytes or PyJWK or :py:class:`jwt.algorithms.AllowedPublicKeys`

        :param algorithms: allowed algorithms, e.g. ``["ES256"]``
            If ``key`` is a :class:`PyJWK` object, allowed algorithms will default to the key algorithm.

            .. warning::

               Do **not** compute the ``algorithms`` parameter based on
               the ``alg`` from the token itself, or on any other data
               that an attacker may be able to influence, as that might
               expose you to various vulnerabilities (see `RFC 8725 §2.1
               <https://www.rfc-editor.org/rfc/rfc8725.html#section-2.1>`_). Instead,
               either hard-code a fixed value for ``algorithms``, or
               configure it in the same place you configure the
               ``key``. Make sure not to mix symmetric and asymmetric
               algorithms that interpret the ``key`` in different ways
               (e.g. HS\\* and RS\\*).
        :type algorithms: typing.Sequence[str] or None

        :param jwt.types.Options options: extended decoding and validation options
            Refer to :py:class:`jwt.types.Options` for more information.

        :param verify: deprecated and ignored; forwarded to
            :meth:`decode_complete`, which only uses it to emit a warning
        :type verify: bool or None
        :param detached_payload: forwarded unchanged to :meth:`decode_complete`
        :type detached_payload: bytes or None
        :param audience: optional, the value for ``verify_aud`` check
        :type audience: str or typing.Iterable[str] or None
        :param subject: optional, the value for ``verify_sub`` check
        :type subject: str or None
        :param issuer: optional, the value for ``verify_iss`` check
        :type issuer: str or typing.Container[str] or None
        :param leeway: a time margin in seconds for the expiration check
        :type leeway: float or datetime.timedelta
        :rtype: dict[str, typing.Any]
        :returns: the JWT claims
        """
        if kwargs:
            warnings.warn(
                "passing additional kwargs to decode() is deprecated "
                "and will be removed in pyjwt version 3. "
                f"Unsupported kwargs: {tuple(kwargs.keys())}",
                RemovedInPyjwt3Warning,
                stacklevel=2,
            )
        # The extra kwargs are deliberately not forwarded, so the warning is
        # emitted once, for decode() only.
        decoded = self.decode_complete(
            jwt,
            key,
            algorithms,
            options,
            verify=verify,
            detached_payload=detached_payload,
            audience=audience,
            subject=subject,
            issuer=issuer,
            leeway=leeway,
        )
        return cast(dict[str, Any], decoded["payload"])

    def _validate_claims(
        self,
        payload: dict[str, Any],
        options: FullOptions,
        audience: Iterable[str] | str | None = None,
        issuer: Container[str] | str | None = None,
        subject: str | None = None,
        leeway: float | timedelta = 0,
    ) -> None:
        """Run every claim check enabled in ``options`` against ``payload``.

        Required claims are checked first, then the time-based claims
        (``iat``, ``nbf``, ``exp``; only when present), then ``iss``, ``aud``,
        ``sub`` and ``jti``. The first failing check raises.

        :param payload: the decoded JWT claims
        :param options: fully merged options controlling which checks run
        :param audience: expected audience(s) for the ``aud`` check
        :param issuer: expected issuer(s) for the ``iss`` check
        :param subject: expected subject for the ``sub`` check
        :param leeway: tolerance for the time-based checks, in seconds or as
            a ``timedelta``
        :raises TypeError: if ``audience`` is neither a string, an iterable
            nor ``None``
        """
        if isinstance(leeway, timedelta):
            leeway = leeway.total_seconds()

        if audience is not None and not isinstance(audience, (str, Iterable)):
            raise TypeError("audience must be a string, iterable or None")

        self._validate_required_claims(payload, options["require"])

        # A single timestamp is shared by all time-based checks.
        now = datetime.now(tz=timezone.utc).timestamp()

        if "iat" in payload and options["verify_iat"]:
            self._validate_iat(payload, now, leeway)

        if "nbf" in payload and options["verify_nbf"]:
            self._validate_nbf(payload, now, leeway)

        if "exp" in payload and options["verify_exp"]:
            self._validate_exp(payload, now, leeway)

        if options["verify_iss"]:
            self._validate_iss(payload, issuer)

        if options["verify_aud"]:
            self._validate_aud(
                payload, audience, strict=options.get("strict_aud", False)
            )

        if options["verify_sub"]:
            self._validate_sub(payload, subject)

        if options["verify_jti"]:
            self._validate_jti(payload)

    def _validate_required_claims(
        self,
        payload: dict[str, Any],
        claims: Iterable[str],
    ) -> None:
        """Ensure every claim named in ``claims`` is present and not ``None``.

        :raises MissingRequiredClaimError: for the first claim that is absent
            from ``payload`` or whose value is ``None``
        """
        for claim in claims:
            if payload.get(claim) is None:
                raise MissingRequiredClaimError(claim)

    def _validate_sub(
        self, payload: dict[str, Any], subject: str | None = None
    ) -> None:
        """
        Check that the "sub" claim, if present in the payload, is valid.
        This is an optional claim.

        :param payload(dict): The payload which needs to be validated
        :param subject(str): The expected subject of the token, or ``None``
            to skip the comparison
        :raises InvalidSubjectError: if "sub" is not a string, or differs
            from ``subject`` when one is given
        """

        if "sub" not in payload:
            return

        if not isinstance(payload["sub"], str):
            raise InvalidSubjectError("Subject must be a string")

        if subject is not None:
            if payload.get("sub") != subject:
                raise InvalidSubjectError("Invalid subject")

    def _validate_jti(self, payload: dict[str, Any]) -> None:
        """
        Check that the "jti" claim, if present in the payload, is valid.
        This is an optional claim.

        :param payload(dict): The payload which needs to be validated
        :raises InvalidJTIError: if "jti" is present but not a string
        """

        if "jti" not in payload:
            return

        if not isinstance(payload.get("jti"), str):
            raise InvalidJTIError("JWT ID must be a string")

    def _validate_iat(
        self,
        payload: dict[str, Any],
        now: float,
        leeway: float,
    ) -> None:
        """Check that the "iat" claim is an integer that is not in the future.

        :param payload: the decoded claims; must contain "iat"
        :param now: current time as a POSIX timestamp
        :param leeway: tolerance in seconds
        :raises InvalidIssuedAtError: if "iat" cannot be converted to an integer
        :raises ImmatureSignatureError: if "iat" is later than ``now + leeway``
        """
        try:
            iat = int(payload["iat"])
        # int() raises TypeError for null/list/object values and
        # OverflowError for infinity (json.loads accepts ``Infinity``); map
        # them to the library error as well instead of leaking them.
        except (TypeError, ValueError, OverflowError):
            raise InvalidIssuedAtError(
                "Issued At claim (iat) must be an integer."
            ) from None
        if iat > (now + leeway):
            raise ImmatureSignatureError("The token is not yet valid (iat)")

    def _validate_nbf(
        self,
        payload: dict[str, Any],
        now: float,
        leeway: float,
    ) -> None:
        """Check that the "nbf" claim is an integer that is not in the future.

        :param payload: the decoded claims; must contain "nbf"
        :param now: current time as a POSIX timestamp
        :param leeway: tolerance in seconds
        :raises DecodeError: if "nbf" cannot be converted to an integer
        :raises ImmatureSignatureError: if "nbf" is later than ``now + leeway``
        """
        try:
            nbf = int(payload["nbf"])
        # See _validate_iat: non-numeric types and infinity must also be
        # reported as a decode error.
        except (TypeError, ValueError, OverflowError):
            raise DecodeError("Not Before claim (nbf) must be an integer.") from None

        if nbf > (now + leeway):
            raise ImmatureSignatureError("The token is not yet valid (nbf)")

    def _validate_exp(
        self,
        payload: dict[str, Any],
        now: float,
        leeway: float,
    ) -> None:
        """Check that the "exp" claim is an integer that has not passed yet.

        :param payload: the decoded claims; must contain "exp"
        :param now: current time as a POSIX timestamp
        :param leeway: tolerance in seconds
        :raises DecodeError: if "exp" cannot be converted to an integer
        :raises ExpiredSignatureError: if "exp" is at or before ``now - leeway``
        """
        try:
            exp = int(payload["exp"])
        # See _validate_iat: non-numeric types and infinity must also be
        # reported as a decode error.
        except (TypeError, ValueError, OverflowError):
            raise DecodeError(
                "Expiration Time claim (exp) must be an integer."
            ) from None

        if exp <= (now - leeway):
            raise ExpiredSignatureError("Signature has expired")

    def _validate_aud(
        self,
        payload: dict[str, Any],
        audience: str | Iterable[str] | None,
        *,
        strict: bool = False,
    ) -> None:
        """Check the "aud" claim against the audience(s) the caller expects.

        Without ``strict``, the check passes when at least one expected
        audience appears among the token's audiences. With ``strict``, both
        sides must be single strings and must be equal.

        :param payload: the decoded claims
        :param audience: expected audience(s), or ``None`` if the caller
            expects the token to carry no audience
        :param strict: forbid list matching (see above)
        :raises InvalidAudienceError: if the token has an audience but none
            was expected, the claim is malformed, or nothing matches
        :raises MissingRequiredClaimError: if an audience was expected but
            the token has no (or an empty) "aud" claim
        """
        if audience is None:
            if "aud" not in payload or not payload["aud"]:
                return
            # Application did not specify an audience, but
            # the token has the 'aud' claim
            raise InvalidAudienceError("Invalid audience")

        if "aud" not in payload or not payload["aud"]:
            # Application specified an audience, but it could not be
            # verified since the token does not contain a claim.
            raise MissingRequiredClaimError("aud")

        audience_claims = payload["aud"]

        # In strict mode, we forbid list matching: the supplied audience
        # must be a string, and it must exactly match the audience claim.
        if strict:
            # Only a single audience is allowed in strict mode.
            if not isinstance(audience, str):
                raise InvalidAudienceError("Invalid audience (strict)")

            # Only a single audience claim is allowed in strict mode.
            if not isinstance(audience_claims, str):
                raise InvalidAudienceError("Invalid claim format in token (strict)")

            if audience != audience_claims:
                raise InvalidAudienceError("Audience doesn't match (strict)")

            return

        # Normalise the claim to a list of strings before matching.
        if isinstance(audience_claims, str):
            audience_claims = [audience_claims]
        if not isinstance(audience_claims, list):
            raise InvalidAudienceError("Invalid claim format in token")
        if any(not isinstance(c, str) for c in audience_claims):
            raise InvalidAudienceError("Invalid claim format in token")

        if isinstance(audience, str):
            audience = [audience]

        if all(aud not in audience_claims for aud in audience):
            raise InvalidAudienceError("Audience doesn't match")

    def _validate_iss(
        self, payload: dict[str, Any], issuer: Container[str] | str | None
    ) -> None:
        """Check the "iss" claim against the issuer(s) the caller expects.

        :param payload: the decoded claims
        :param issuer: a single expected issuer, a container of acceptable
            issuers, or ``None`` to skip the check entirely
        :raises MissingRequiredClaimError: if an issuer is expected but the
            token has no "iss" claim
        :raises InvalidIssuerError: if "iss" is not a string, does not match,
            or ``issuer`` does not support membership tests
        """
        if issuer is None:
            return

        if "iss" not in payload:
            raise MissingRequiredClaimError("iss")

        iss = payload["iss"]
        if not isinstance(iss, str):
            raise InvalidIssuerError("Payload Issuer (iss) must be a string")

        if isinstance(issuer, str):
            if iss != issuer:
                raise InvalidIssuerError("Invalid issuer")
        else:
            try:
                if iss not in issuer:
                    raise InvalidIssuerError("Invalid issuer")
            except TypeError:
                # ``in`` raised, so ``issuer`` is not a usable container.
                raise InvalidIssuerError(
                    'Issuer param must be "str" or "Container[str]"'
                ) from None
# Module-level PyJWT instance backing the function-style API below.
_jwt_global_obj = PyJWT()
# Swap the PyJWS created by PyJWT.__init__ for the module-level instance
# imported from .api_jws.
_jwt_global_obj._jws = _jws_global_obj

# Bound methods of that instance, exposed as module-level callables.
decode = _jwt_global_obj.decode
decode_complete = _jwt_global_obj.decode_complete
encode = _jwt_global_obj.encode