Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/jwt/api_jwt.py: 22%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1from __future__ import annotations
3import json
4import os
5import warnings
6from calendar import timegm
7from collections.abc import Container, Iterable, Sequence
8from datetime import datetime, timedelta, timezone
9from typing import TYPE_CHECKING, Any, Union, cast
11from .api_jws import PyJWS, _ALGORITHM_UNSET, _jws_global_obj
12from .exceptions import (
13 DecodeError,
14 ExpiredSignatureError,
15 ImmatureSignatureError,
16 InvalidAudienceError,
17 InvalidIssuedAtError,
18 InvalidIssuerError,
19 InvalidJTIError,
20 InvalidSubjectError,
21 MissingRequiredClaimError,
22)
23from .warnings import RemovedInPyjwt3Warning
# The names below are only imported for static type checkers, or when the
# SPHINX_BUILD environment variable is set to a non-empty value (presumably by
# the documentation build -- confirm against the docs configuration).
if TYPE_CHECKING or bool(os.getenv("SPHINX_BUILD", "")):
    import sys

    if sys.version_info < (3, 10):
        # Python 3.9 and lower
        from typing_extensions import TypeAlias
    else:
        from typing import TypeAlias

    from .algorithms import AllowedPrivateKeys, AllowedPublicKeys
    from .api_jwk import PyJWK
    from .types import FullOptions, Options, SigOptions

    # Key types accepted for signing (encode) and for verification (decode).
    AllowedPrivateKeyTypes: TypeAlias = Union[AllowedPrivateKeys, PyJWK, str, bytes]
    AllowedPublicKeyTypes: TypeAlias = Union[AllowedPublicKeys, PyJWK, str, bytes]
class PyJWT:
    """Encode and decode JSON Web Tokens and validate their claims.

    Signing and signature verification are delegated to a :class:`PyJWS`
    instance held in ``self._jws``. This class serializes the payload to and
    from JSON and validates the ``exp``, ``nbf``, ``iat``, ``aud``, ``iss``,
    ``sub`` and ``jti`` claims according to ``self.options``.
    """

    def __init__(self, options: Options | None = None) -> None:
        """Create an instance whose defaults are overridden by ``options``.

        :param options: option overrides merged on top of the defaults
        :type options: jwt.types.Options or None
        """
        self.options: FullOptions
        self.options = self._get_default_options()
        if options is not None:
            self.options = self._merge_options(options)

        self._jws = PyJWS(options=self._get_sig_options())

    @staticmethod
    def _get_default_options() -> FullOptions:
        """Return a fresh dict holding the default value of every option."""
        return {
            "verify_signature": True,
            "verify_exp": True,
            "verify_nbf": True,
            "verify_iat": True,
            "verify_aud": True,
            "verify_iss": True,
            "verify_sub": True,
            "verify_jti": True,
            "require": [],
            "strict_aud": False,
            "enforce_minimum_key_length": False,
        }

    def _get_sig_options(self) -> SigOptions:
        """Return the subset of ``self.options`` that is handed to ``PyJWS``."""
        return {
            "verify_signature": self.options["verify_signature"],
            "enforce_minimum_key_length": self.options.get(
                "enforce_minimum_key_length", False
            ),
        }

    def _merge_options(self, options: Options | None = None) -> FullOptions:
        """Return ``self.options`` overridden by ``options``.

        When ``options`` disables ``verify_signature``, every ``verify_*``
        claim check that ``options`` does not set explicitly defaults to
        ``False`` as well.

        The caller's ``options`` mapping is never modified.

        :param options: per-call overrides; ``None`` returns ``self.options``
        :returns: the merged option set
        """
        if options is None:
            return self.options

        # Work on a shallow copy: filling in the defaults below must not leak
        # into a dict that the caller may reuse for a later call.
        overrides: Options = {**options}

        # (defensive) set defaults for verify_x to False if verify_signature is False
        if not overrides.get("verify_signature", True):
            overrides.setdefault("verify_exp", False)
            overrides.setdefault("verify_nbf", False)
            overrides.setdefault("verify_iat", False)
            overrides.setdefault("verify_aud", False)
            overrides.setdefault("verify_iss", False)
            overrides.setdefault("verify_sub", False)
            overrides.setdefault("verify_jti", False)
        return {**self.options, **overrides}

    def encode(
        self,
        payload: dict[str, Any],
        key: AllowedPrivateKeyTypes,
        algorithm: str | None = _ALGORITHM_UNSET,  # type: ignore[assignment]
        headers: dict[str, Any] | None = None,
        json_encoder: type[json.JSONEncoder] | None = None,
        sort_headers: bool = True,
    ) -> str:
        """Encode the ``payload`` as JSON Web Token.

        :param payload: JWT claims, e.g. ``dict(iss=..., aud=..., sub=...)``
        :type payload: dict[str, typing.Any]
        :param key: a key suitable for the chosen algorithm:

            * for **asymmetric algorithms**: PEM-formatted private key, a multiline string
            * for **symmetric algorithms**: plain string, sufficiently long for security

        :type key: str or bytes or PyJWK or :py:class:`jwt.algorithms.AllowedPrivateKeys`
        :param algorithm: algorithm to sign the token with, e.g. ``"ES256"``.
            If ``headers`` includes ``alg``, it will be preferred to this parameter.
            If ``key`` is a :class:`PyJWK` object, by default the key algorithm will be used.
        :type algorithm: str or None
        :param headers: additional JWT header fields, e.g. ``dict(kid="my-key-id")``.
        :type headers: dict[str, typing.Any] or None
        :param json_encoder: custom JSON encoder for ``payload`` and ``headers``
        :type json_encoder: json.JSONEncoder or None
        :param sort_headers: passed through unchanged to the JWS encoder
        :type sort_headers: bool

        :rtype: str
        :returns: a JSON Web Token

        :raises TypeError: if ``payload`` is not a ``dict``, or if it contains
            an ``iss`` claim that is not a string
        """
        # Check that we get a dict
        if not isinstance(payload, dict):
            raise TypeError(
                "Expecting a dict object, as JWT only supports "
                "JSON objects as payloads."
            )

        # Payload: shallow copy so the datetime conversion below does not
        # modify the caller's dict.
        payload = payload.copy()
        for time_claim in ["exp", "iat", "nbf"]:
            # Convert datetime to a intDate value in known time-format claims
            if isinstance(payload.get(time_claim), datetime):
                payload[time_claim] = timegm(payload[time_claim].utctimetuple())

        # Issue #1039, iss being set to non-string
        if "iss" in payload and not isinstance(payload["iss"], str):
            raise TypeError("Issuer (iss) must be a string.")

        json_payload = self._encode_payload(
            payload,
            headers=headers,
            json_encoder=json_encoder,
        )

        return self._jws.encode(
            json_payload,
            key,
            algorithm,
            headers,
            json_encoder,
            sort_headers=sort_headers,
        )

    def _encode_payload(
        self,
        payload: dict[str, Any],
        headers: dict[str, Any] | None = None,
        json_encoder: type[json.JSONEncoder] | None = None,
    ) -> bytes:
        """
        Encode a given payload to the bytes to be signed.

        This method is intended to be overridden by subclasses that need to
        encode the payload in a different way, e.g. compress the payload.

        ``headers`` is not used by this implementation; it is only part of
        the signature that overriding subclasses receive.
        """
        return json.dumps(
            payload,
            separators=(",", ":"),
            cls=json_encoder,
        ).encode("utf-8")

    def decode_complete(
        self,
        jwt: str | bytes,
        key: AllowedPublicKeyTypes = "",
        algorithms: Sequence[str] | None = None,
        options: Options | None = None,
        # deprecated arg, remove in pyjwt3
        verify: bool | None = None,
        # could be used as passthrough to api_jws, consider removal in pyjwt3
        detached_payload: bytes | None = None,
        # passthrough arguments to _validate_claims
        # consider putting in options
        audience: str | Iterable[str] | None = None,
        issuer: str | Container[str] | None = None,
        subject: str | None = None,
        leeway: float | timedelta = 0,
        # kwargs
        **kwargs: Any,
    ) -> dict[str, Any]:
        """Identical to ``jwt.decode`` except for return value which is a dictionary containing the token header (JOSE Header),
        the token payload (JWT Payload), and token signature (JWT Signature) on the keys "header", "payload",
        and "signature" respectively.

        :param jwt: the token to be decoded
        :type jwt: str or bytes
        :param key: the key suitable for the allowed algorithm
        :type key: str or bytes or PyJWK or :py:class:`jwt.algorithms.AllowedPublicKeys`

        :param algorithms: allowed algorithms, e.g. ``["ES256"]``

            .. warning::

               Do **not** compute the ``algorithms`` parameter based on
               the ``alg`` from the token itself, or on any other data
               that an attacker may be able to influence, as that might
               expose you to various vulnerabilities (see `RFC 8725 §2.1
               <https://www.rfc-editor.org/rfc/rfc8725.html#section-2.1>`_). Instead,
               either hard-code a fixed value for ``algorithms``, or
               configure it in the same place you configure the
               ``key``. Make sure not to mix symmetric and asymmetric
               algorithms that interpret the ``key`` in different ways
               (e.g. HS\\* and RS\\*).
        :type algorithms: typing.Sequence[str] or None

        :param jwt.types.Options options: extended decoding and validation options
            Refer to :py:class:`jwt.types.Options` for more information.

        :param verify: deprecated; it is only compared against the
            ``verify_signature`` option and triggers a warning on mismatch
        :type verify: bool or None
        :param detached_payload: passed through unchanged to the JWS decoder
        :type detached_payload: bytes or None
        :param audience: optional, the value for ``verify_aud`` check
        :type audience: str or typing.Iterable[str] or None
        :param issuer: optional, the value for ``verify_iss`` check
        :type issuer: str or typing.Container[str] or None
        :param subject: optional, the value for ``verify_sub`` check
        :type subject: str or None
        :param leeway: a time margin in seconds for the expiration check
        :type leeway: float or datetime.timedelta
        :rtype: dict[str, typing.Any]
        :returns: Decoded JWT with the JOSE Header on the key ``header``, the JWS
            Payload on the key ``payload``, and the JWS Signature on the key ``signature``.
        """
        if kwargs:
            warnings.warn(
                "passing additional kwargs to decode_complete() is deprecated "
                "and will be removed in pyjwt version 3. "
                f"Unsupported kwargs: {tuple(kwargs.keys())}",
                RemovedInPyjwt3Warning,
                stacklevel=2,
            )

        if options is None:
            verify_signature = True
        else:
            verify_signature = options.get("verify_signature", True)

        # If the user has set the legacy `verify` argument, and it doesn't match
        # what the relevant `options` entry for the argument is, inform the user
        # that they're likely making a mistake.
        if verify is not None and verify != verify_signature:
            warnings.warn(
                "The `verify` argument to `decode` does nothing in PyJWT 2.0 and newer. "
                "The equivalent is setting `verify_signature` to False in the `options` dictionary. "
                "This invocation has a mismatch between the kwarg and the option entry.",
                category=DeprecationWarning,
                stacklevel=2,
            )

        merged_options = self._merge_options(options)

        sig_options: SigOptions = {
            "verify_signature": verify_signature,
        }
        decoded = self._jws.decode_complete(
            jwt,
            key=key,
            algorithms=algorithms,
            options=sig_options,
            detached_payload=detached_payload,
        )

        payload = self._decode_payload(decoded)

        self._validate_claims(
            payload,
            merged_options,
            audience=audience,
            issuer=issuer,
            leeway=leeway,
            subject=subject,
        )

        # Replace the raw payload from the JWS layer with the parsed claims.
        decoded["payload"] = payload
        return decoded

    def _decode_payload(self, decoded: dict[str, Any]) -> dict[str, Any]:
        """
        Decode the payload from a JWS dictionary (payload, signature, header).

        This method is intended to be overridden by subclasses that need to
        decode the payload in a different way, e.g. decompress compressed
        payloads.

        :raises DecodeError: if the payload is not valid JSON or is not a
            JSON object
        """
        try:
            payload: dict[str, Any] = json.loads(decoded["payload"])
        except ValueError as e:
            raise DecodeError(f"Invalid payload string: {e}") from e
        if not isinstance(payload, dict):
            raise DecodeError("Invalid payload string: must be a json object")
        return payload

    def decode(
        self,
        jwt: str | bytes,
        key: AllowedPublicKeys | PyJWK | str | bytes = "",
        algorithms: Sequence[str] | None = None,
        options: Options | None = None,
        # deprecated arg, remove in pyjwt3
        verify: bool | None = None,
        # could be used as passthrough to api_jws, consider removal in pyjwt3
        detached_payload: bytes | None = None,
        # passthrough arguments to _validate_claims
        # consider putting in options
        audience: str | Iterable[str] | None = None,
        subject: str | None = None,
        issuer: str | Container[str] | None = None,
        leeway: float | timedelta = 0,
        # kwargs
        **kwargs: Any,
    ) -> dict[str, Any]:
        """Verify the ``jwt`` token signature and return the token claims.

        :param jwt: the token to be decoded
        :type jwt: str or bytes
        :param key: the key suitable for the allowed algorithm
        :type key: str or bytes or PyJWK or :py:class:`jwt.algorithms.AllowedPublicKeys`

        :param algorithms: allowed algorithms, e.g. ``["ES256"]``
            If ``key`` is a :class:`PyJWK` object, allowed algorithms will default to the key algorithm.

            .. warning::

               Do **not** compute the ``algorithms`` parameter based on
               the ``alg`` from the token itself, or on any other data
               that an attacker may be able to influence, as that might
               expose you to various vulnerabilities (see `RFC 8725 §2.1
               <https://www.rfc-editor.org/rfc/rfc8725.html#section-2.1>`_). Instead,
               either hard-code a fixed value for ``algorithms``, or
               configure it in the same place you configure the
               ``key``. Make sure not to mix symmetric and asymmetric
               algorithms that interpret the ``key`` in different ways
               (e.g. HS\\* and RS\\*).
        :type algorithms: typing.Sequence[str] or None

        :param jwt.types.Options options: extended decoding and validation options
            Refer to :py:class:`jwt.types.Options` for more information.

        :param verify: deprecated; forwarded to :meth:`decode_complete`
        :type verify: bool or None
        :param detached_payload: forwarded to :meth:`decode_complete`
        :type detached_payload: bytes or None
        :param audience: optional, the value for ``verify_aud`` check
        :type audience: str or typing.Iterable[str] or None
        :param subject: optional, the value for ``verify_sub`` check
        :type subject: str or None
        :param issuer: optional, the value for ``verify_iss`` check
        :type issuer: str or typing.Container[str] or None
        :param leeway: a time margin in seconds for the expiration check
        :type leeway: float or datetime.timedelta
        :rtype: dict[str, typing.Any]
        :returns: the JWT claims
        """
        if kwargs:
            warnings.warn(
                "passing additional kwargs to decode() is deprecated "
                "and will be removed in pyjwt version 3. "
                f"Unsupported kwargs: {tuple(kwargs.keys())}",
                RemovedInPyjwt3Warning,
                stacklevel=2,
            )
        # kwargs are intentionally not forwarded: they were already reported
        # above and decode_complete() would warn about them a second time.
        decoded = self.decode_complete(
            jwt,
            key,
            algorithms,
            options,
            verify=verify,
            detached_payload=detached_payload,
            audience=audience,
            subject=subject,
            issuer=issuer,
            leeway=leeway,
        )
        return cast(dict[str, Any], decoded["payload"])

    def _validate_claims(
        self,
        payload: dict[str, Any],
        options: FullOptions,
        audience: Iterable[str] | str | None = None,
        issuer: Container[str] | str | None = None,
        subject: str | None = None,
        leeway: float | timedelta = 0,
    ) -> None:
        """Run every claim check that ``options`` enables against ``payload``.

        :param payload: the decoded JWT claims
        :param options: fully merged options controlling which checks run
        :param audience: expected audience(s) for the ``aud`` check
        :param issuer: expected issuer(s) for the ``iss`` check
        :param subject: expected subject for the ``sub`` check
        :param leeway: time margin, in seconds or as a ``timedelta``
        :raises TypeError: if ``audience`` is neither ``None``, a string nor
            an iterable
        """
        if isinstance(leeway, timedelta):
            leeway = leeway.total_seconds()

        if audience is not None and not isinstance(audience, (str, Iterable)):
            raise TypeError("audience must be a string, iterable or None")

        self._validate_required_claims(payload, options["require"])

        # A single timestamp is shared by all time-based checks.
        now = datetime.now(tz=timezone.utc).timestamp()

        if "iat" in payload and options["verify_iat"]:
            self._validate_iat(payload, now, leeway)

        if "nbf" in payload and options["verify_nbf"]:
            self._validate_nbf(payload, now, leeway)

        if "exp" in payload and options["verify_exp"]:
            self._validate_exp(payload, now, leeway)

        if options["verify_iss"]:
            self._validate_iss(payload, issuer)

        if options["verify_aud"]:
            self._validate_aud(
                payload, audience, strict=options.get("strict_aud", False)
            )

        if options["verify_sub"]:
            self._validate_sub(payload, subject)

        if options["verify_jti"]:
            self._validate_jti(payload)

    def _validate_required_claims(
        self,
        payload: dict[str, Any],
        claims: Iterable[str],
    ) -> None:
        """Raise ``MissingRequiredClaimError`` for the first claim in
        ``claims`` that is absent from ``payload`` or set to ``None``.
        """
        for claim in claims:
            if payload.get(claim) is None:
                raise MissingRequiredClaimError(claim)

    def _validate_sub(
        self, payload: dict[str, Any], subject: str | None = None
    ) -> None:
        """
        Checks whether "sub" if in the payload is valid or not.
        This is an Optional claim

        :param payload(dict): The payload which needs to be validated
        :param subject(str): The subject of the token
        :raises InvalidSubjectError: if "sub" is not a string, or differs
            from ``subject`` when one is given
        """

        if "sub" not in payload:
            return

        if not isinstance(payload["sub"], str):
            raise InvalidSubjectError("Subject must be a string")

        if subject is not None:
            if payload.get("sub") != subject:
                raise InvalidSubjectError("Invalid subject")

    def _validate_jti(self, payload: dict[str, Any]) -> None:
        """
        Checks whether "jti" if in the payload is valid or not
        This is an Optional claim

        :param payload(dict): The payload which needs to be validated
        :raises InvalidJTIError: if "jti" is present but not a string
        """

        if "jti" not in payload:
            return

        if not isinstance(payload.get("jti"), str):
            raise InvalidJTIError("JWT ID must be a string")

    def _validate_iat(
        self,
        payload: dict[str, Any],
        now: float,
        leeway: float,
    ) -> None:
        """Check that the "iat" claim is an integer that is not in the future.

        :param payload: claims containing an "iat" entry
        :param now: current time as a POSIX timestamp
        :param leeway: allowed clock skew in seconds
        :raises InvalidIssuedAtError: if "iat" cannot be converted to ``int``
        :raises ImmatureSignatureError: if "iat" is later than ``now + leeway``
        """
        try:
            iat = int(payload["iat"])
        except (TypeError, ValueError, OverflowError):
            # TypeError: JSON null / list / object; OverflowError: Infinity.
            raise InvalidIssuedAtError(
                "Issued At claim (iat) must be an integer."
            ) from None
        if iat > (now + leeway):
            raise ImmatureSignatureError("The token is not yet valid (iat)")

    def _validate_nbf(
        self,
        payload: dict[str, Any],
        now: float,
        leeway: float,
    ) -> None:
        """Check that the "nbf" claim is an integer that is not in the future.

        :param payload: claims containing an "nbf" entry
        :param now: current time as a POSIX timestamp
        :param leeway: allowed clock skew in seconds
        :raises DecodeError: if "nbf" cannot be converted to ``int``
        :raises ImmatureSignatureError: if "nbf" is later than ``now + leeway``
        """
        try:
            nbf = int(payload["nbf"])
        except (TypeError, ValueError, OverflowError):
            # TypeError: JSON null / list / object; OverflowError: Infinity.
            raise DecodeError("Not Before claim (nbf) must be an integer.") from None

        if nbf > (now + leeway):
            raise ImmatureSignatureError("The token is not yet valid (nbf)")

    def _validate_exp(
        self,
        payload: dict[str, Any],
        now: float,
        leeway: float,
    ) -> None:
        """Check that the "exp" claim is an integer that has not passed yet.

        :param payload: claims containing an "exp" entry
        :param now: current time as a POSIX timestamp
        :param leeway: allowed clock skew in seconds
        :raises DecodeError: if "exp" cannot be converted to ``int``
        :raises ExpiredSignatureError: if "exp" is at or before ``now - leeway``
        """
        try:
            exp = int(payload["exp"])
        except (TypeError, ValueError, OverflowError):
            # TypeError: JSON null / list / object; OverflowError: Infinity.
            raise DecodeError(
                "Expiration Time claim (exp) must be an integer."
            ) from None

        if exp <= (now - leeway):
            raise ExpiredSignatureError("Signature has expired")

    def _validate_aud(
        self,
        payload: dict[str, Any],
        audience: str | Iterable[str] | None,
        *,
        strict: bool = False,
    ) -> None:
        """Check the "aud" claim against the expected ``audience``.

        :param payload: the decoded JWT claims
        :param audience: expected audience(s); ``None`` means the token must
            not carry a non-empty "aud" claim
        :param strict: when true, both ``audience`` and the claim must be
            single strings and must be equal
        :raises InvalidAudienceError: on a mismatch or a malformed claim
        :raises MissingRequiredClaimError: if an audience is expected but the
            claim is absent or empty
        """
        if audience is None:
            if "aud" not in payload or not payload["aud"]:
                return
            # Application did not specify an audience, but
            # the token has the 'aud' claim
            raise InvalidAudienceError("Invalid audience")

        if "aud" not in payload or not payload["aud"]:
            # Application specified an audience, but it could not be
            # verified since the token does not contain a claim.
            raise MissingRequiredClaimError("aud")

        audience_claims = payload["aud"]

        # In strict mode, we forbid list matching: the supplied audience
        # must be a string, and it must exactly match the audience claim.
        if strict:
            # Only a single audience is allowed in strict mode.
            if not isinstance(audience, str):
                raise InvalidAudienceError("Invalid audience (strict)")

            # Only a single audience claim is allowed in strict mode.
            if not isinstance(audience_claims, str):
                raise InvalidAudienceError("Invalid claim format in token (strict)")

            if audience != audience_claims:
                raise InvalidAudienceError("Audience doesn't match (strict)")

            return

        # Non-strict mode: normalize the claim to a list of strings.
        if isinstance(audience_claims, str):
            audience_claims = [audience_claims]
        if not isinstance(audience_claims, list):
            raise InvalidAudienceError("Invalid claim format in token")
        if any(not isinstance(c, str) for c in audience_claims):
            raise InvalidAudienceError("Invalid claim format in token")

        if isinstance(audience, str):
            audience = [audience]

        # One expected audience appearing in the claim is enough.
        if all(aud not in audience_claims for aud in audience):
            raise InvalidAudienceError("Audience doesn't match")

    def _validate_iss(
        self, payload: dict[str, Any], issuer: Container[str] | str | None
    ) -> None:
        """Check the "iss" claim against the expected ``issuer``.

        Nothing is checked when ``issuer`` is ``None``.

        :param payload: the decoded JWT claims
        :param issuer: a single expected issuer, or a container of them
        :raises MissingRequiredClaimError: if "iss" is absent from ``payload``
        :raises InvalidIssuerError: if "iss" is not a string, does not match,
            or ``issuer`` does not support membership tests
        """
        if issuer is None:
            return

        if "iss" not in payload:
            raise MissingRequiredClaimError("iss")

        iss = payload["iss"]
        if not isinstance(iss, str):
            raise InvalidIssuerError("Payload Issuer (iss) must be a string")

        if isinstance(issuer, str):
            if iss != issuer:
                raise InvalidIssuerError("Invalid issuer")
        else:
            try:
                if iss not in issuer:
                    raise InvalidIssuerError("Invalid issuer")
            except TypeError:
                raise InvalidIssuerError(
                    'Issuer param must be "str" or "Container[str]"'
                ) from None
# Module-level singleton backing the functional API below.
_jwt_global_obj = PyJWT()
# Swap the PyJWS created by PyJWT.__init__ for the one imported from
# .api_jws (presumably so both module-level APIs share a single instance).
_jwt_global_obj._jws = _jws_global_obj

# Public module-level entry points, bound to the singleton.
decode = _jwt_global_obj.decode
decode_complete = _jwt_global_obj.decode_complete
encode = _jwt_global_obj.encode