Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/jwt/api_jwt.py: 62%
Shortcuts on this page:
r, m, x: toggle line displays
j, k: next/previous highlighted chunk
0 (zero): top of page
1 (one): first highlighted chunk
1from __future__ import annotations
3import json
4import os
5import warnings
6from calendar import timegm
7from collections.abc import Container, Iterable, Sequence
8from datetime import datetime, timedelta, timezone
9from typing import TYPE_CHECKING, Any, Union, cast
11from . import api_jws
12from .exceptions import (
13 DecodeError,
14 ExpiredSignatureError,
15 ImmatureSignatureError,
16 InvalidAudienceError,
17 InvalidIssuedAtError,
18 InvalidIssuerError,
19 InvalidJTIError,
20 InvalidSubjectError,
21 MissingRequiredClaimError,
22)
23from .warnings import RemovedInPyjwt3Warning
# The names below are needed only for annotations, so they are imported and
# defined only while type checking, or when the SPHINX_BUILD environment
# variable is set to a non-empty value.
if TYPE_CHECKING or os.environ.get("SPHINX_BUILD", ""):
    import sys

    if sys.version_info >= (3, 10):
        from typing import TypeAlias
    else:
        # Python 3.9 and lower
        from typing_extensions import TypeAlias

    from .algorithms import has_crypto
    from .api_jwk import PyJWK
    from .types import FullOptions, Options, SigOptions

    if has_crypto:
        from .algorithms import AllowedPrivateKeys, AllowedPublicKeys

        # Key types accepted for signing / verification when the crypto
        # key types are importable.
        AllowedPrivateKeyTypes: TypeAlias = Union[AllowedPrivateKeys, PyJWK, str, bytes]
        AllowedPublicKeyTypes: TypeAlias = Union[AllowedPublicKeys, PyJWK, str, bytes]
    else:
        # Same aliases without the crypto key types.
        AllowedPrivateKeyTypes: TypeAlias = Union[PyJWK, str, bytes]  # type: ignore
        AllowedPublicKeyTypes: TypeAlias = Union[PyJWK, str, bytes]  # type: ignore
class PyJWT:
    """Encode, decode and validate JSON Web Tokens (JWT).

    Signing and signature verification are delegated to ``api_jws``. This
    class serializes the payload as JSON and validates the ``exp``, ``nbf``,
    ``iat``, ``aud``, ``iss``, ``sub`` and ``jti`` claims.

    :param options: instance-wide overrides for the default validation options
    :type options: jwt.types.Options or None
    """

    def __init__(self, options: Options | None = None) -> None:
        self.options: FullOptions
        self.options = self._get_default_options()
        if options is not None:
            self.options = self._merge_options(options)

    @staticmethod
    def _get_default_options() -> FullOptions:
        """Return a fresh dict of default options: every check enabled,
        no required claims, non-strict audience matching.
        """
        return {
            "verify_signature": True,
            "verify_exp": True,
            "verify_nbf": True,
            "verify_iat": True,
            "verify_aud": True,
            "verify_iss": True,
            "verify_sub": True,
            "verify_jti": True,
            "require": [],
            "strict_aud": False,
        }

    def _merge_options(self, options: Options | None = None) -> FullOptions:
        """Overlay ``options`` on top of the instance options.

        The ``options`` argument is never modified.

        :param options: per-call overrides, or ``None`` for no overrides
        :returns: ``self.options`` itself when ``options`` is ``None``,
            otherwise a new dict containing the merged options
        """
        if options is None:
            return self.options

        # Copy first: the defaults filled in below must not leak back into
        # the dict owned by the caller.
        overrides = options.copy()

        # (defensive) set defaults for verify_x to False if verify_signature is False
        if not overrides.get("verify_signature", True):
            overrides.setdefault("verify_exp", False)
            overrides.setdefault("verify_nbf", False)
            overrides.setdefault("verify_iat", False)
            overrides.setdefault("verify_aud", False)
            overrides.setdefault("verify_iss", False)
            overrides.setdefault("verify_sub", False)
            overrides.setdefault("verify_jti", False)
        return {**self.options, **overrides}

    def encode(
        self,
        payload: dict[str, Any],
        key: AllowedPrivateKeyTypes,
        algorithm: str | None = "HS256",
        headers: dict[str, Any] | None = None,
        json_encoder: type[json.JSONEncoder] | None = None,
        sort_headers: bool = True,
    ) -> str:
        """Encode the ``payload`` as JSON Web Token.

        :param payload: JWT claims, e.g. ``dict(iss=..., aud=..., sub=...)``
        :type payload: dict[str, typing.Any]
        :param key: a key suitable for the chosen algorithm:

            * for **asymmetric algorithms**: PEM-formatted private key, a multiline string
            * for **symmetric algorithms**: plain string, sufficiently long for security

        :type key: str or bytes or PyJWK or :py:class:`jwt.algorithms.AllowedPrivateKeys`
        :param algorithm: algorithm to sign the token with, e.g. ``"ES256"``.
            If ``headers`` includes ``alg``, it will be preferred to this parameter.
            If ``key`` is a :class:`PyJWK` object, by default the key algorithm will be used.
        :type algorithm: str or None
        :param headers: additional JWT header fields, e.g. ``dict(kid="my-key-id")``.
        :type headers: dict[str, typing.Any] or None
        :param json_encoder: custom JSON encoder for ``payload`` and ``headers``
        :type json_encoder: json.JSONEncoder or None
        :param sort_headers: forwarded unchanged to ``api_jws.encode``
        :type sort_headers: bool

        :rtype: str
        :returns: a JSON Web Token

        :raises TypeError: if ``payload`` is not a ``dict``, or if it contains
            an ``iss`` claim that is not a string
        """
        # Check that we get a dict
        if not isinstance(payload, dict):
            raise TypeError(
                "Expecting a dict object, as JWT only supports "
                "JSON objects as payloads."
            )

        # Shallow copy so the time-claim conversion below does not modify
        # the caller's dict.
        payload = payload.copy()
        for time_claim in ["exp", "iat", "nbf"]:
            # Convert datetime to a intDate value in known time-format claims
            if isinstance(payload.get(time_claim), datetime):
                payload[time_claim] = timegm(payload[time_claim].utctimetuple())

        # Issue #1039, iss being set to non-string
        if "iss" in payload and not isinstance(payload["iss"], str):
            raise TypeError("Issuer (iss) must be a string.")

        json_payload = self._encode_payload(
            payload,
            headers=headers,
            json_encoder=json_encoder,
        )

        return api_jws.encode(
            json_payload,
            key,
            algorithm,
            headers,
            json_encoder,
            sort_headers=sort_headers,
        )

    def _encode_payload(
        self,
        payload: dict[str, Any],
        headers: dict[str, Any] | None = None,
        json_encoder: type[json.JSONEncoder] | None = None,
    ) -> bytes:
        """
        Encode a given payload to the bytes to be signed.

        This method is intended to be overridden by subclasses that need to
        encode the payload in a different way, e.g. compress the payload.

        ``headers`` is not used by this implementation.
        """
        return json.dumps(
            payload,
            separators=(",", ":"),
            cls=json_encoder,
        ).encode("utf-8")

    def decode_complete(
        self,
        jwt: str | bytes,
        key: AllowedPublicKeyTypes = "",
        algorithms: Sequence[str] | None = None,
        options: Options | None = None,
        # deprecated arg, remove in pyjwt3
        verify: bool | None = None,
        # could be used as passthrough to api_jws, consider removal in pyjwt3
        detached_payload: bytes | None = None,
        # passthrough arguments to _validate_claims
        # consider putting in options
        audience: str | Iterable[str] | None = None,
        issuer: str | Container[str] | None = None,
        subject: str | None = None,
        leeway: float | timedelta = 0,
        # kwargs
        **kwargs: Any,
    ) -> dict[str, Any]:
        """Identical to ``jwt.decode`` except for return value which is a dictionary containing the token header (JOSE Header),
        the token payload (JWT Payload), and token signature (JWT Signature) on the keys "header", "payload",
        and "signature" respectively.

        :param jwt: the token to be decoded
        :type jwt: str or bytes
        :param key: the key suitable for the allowed algorithm
        :type key: str or bytes or PyJWK or :py:class:`jwt.algorithms.AllowedPublicKeys`

        :param algorithms: allowed algorithms, e.g. ``["ES256"]``

            .. warning::

               Do **not** compute the ``algorithms`` parameter based on
               the ``alg`` from the token itself, or on any other data
               that an attacker may be able to influence, as that might
               expose you to various vulnerabilities (see `RFC 8725 §2.1
               <https://www.rfc-editor.org/rfc/rfc8725.html#section-2.1>`_). Instead,
               either hard-code a fixed value for ``algorithms``, or
               configure it in the same place you configure the
               ``key``. Make sure not to mix symmetric and asymmetric
               algorithms that interpret the ``key`` in different ways
               (e.g. HS\\* and RS\\*).
        :type algorithms: typing.Sequence[str] or None

        :param jwt.types.Options options: extended decoding and validation options
            Refer to :py:class:`jwt.types.Options` for more information.
            The dict is not modified.

        :param verify: deprecated and without effect; a warning is emitted
            when it disagrees with ``options["verify_signature"]``
        :type verify: bool or None
        :param detached_payload: passed through to ``api_jws.decode_complete``
        :type detached_payload: bytes or None
        :param audience: optional, the value for ``verify_aud`` check
        :type audience: str or typing.Iterable[str] or None
        :param issuer: optional, the value for ``verify_iss`` check
        :type issuer: str or typing.Container[str] or None
        :param subject: optional, the value for ``verify_sub`` check
        :type subject: str or None
        :param leeway: a time margin in seconds for the expiration check
        :type leeway: float or datetime.timedelta
        :rtype: dict[str, typing.Any]
        :returns: Decoded JWT with the JOSE Header on the key ``header``, the JWS
            Payload on the key ``payload``, and the JWS Signature on the key ``signature``.
        """
        if kwargs:
            warnings.warn(
                "passing additional kwargs to decode_complete() is deprecated "
                "and will be removed in pyjwt version 3. "
                f"Unsupported kwargs: {tuple(kwargs.keys())}",
                RemovedInPyjwt3Warning,
                stacklevel=2,
            )

        # NOTE(review): only the per-call ``options`` decide whether the
        # signature is verified; an instance-level "verify_signature" is not
        # consulted here. Confirm this is intended.
        if options is None:
            verify_signature = True
        else:
            verify_signature = options.get("verify_signature", True)

        # If the user has set the legacy `verify` argument, and it doesn't match
        # what the relevant `options` entry for the argument is, inform the user
        # that they're likely making a mistake.
        if verify is not None and verify != verify_signature:
            warnings.warn(
                "The `verify` argument to `decode` does nothing in PyJWT 2.0 and newer. "
                "The equivalent is setting `verify_signature` to False in the `options` dictionary. "
                "This invocation has a mismatch between the kwarg and the option entry.",
                category=DeprecationWarning,
                stacklevel=2,
            )

        sig_options: SigOptions = {"verify_signature": verify_signature}
        decoded = api_jws.decode_complete(
            jwt,
            key=key,
            algorithms=algorithms,
            options=sig_options,
            detached_payload=detached_payload,
        )

        payload = self._decode_payload(decoded)

        merged_options = self._merge_options(options)
        self._validate_claims(
            payload,
            merged_options,
            audience=audience,
            issuer=issuer,
            leeway=leeway,
            subject=subject,
        )

        # Replace the raw payload with the parsed claims.
        decoded["payload"] = payload
        return decoded

    def _decode_payload(self, decoded: dict[str, Any]) -> dict[str, Any]:
        """
        Decode the payload from a JWS dictionary (payload, signature, header).

        This method is intended to be overridden by subclasses that need to
        decode the payload in a different way, e.g. decompress compressed
        payloads.

        :raises DecodeError: if the payload is not valid JSON, or is valid
            JSON but not an object
        """
        try:
            payload: dict[str, Any] = json.loads(decoded["payload"])
        except ValueError as e:
            raise DecodeError(f"Invalid payload string: {e}") from e
        if not isinstance(payload, dict):
            raise DecodeError("Invalid payload string: must be a json object")
        return payload

    def decode(
        self,
        jwt: str | bytes,
        key: AllowedPublicKeyTypes = "",
        algorithms: Sequence[str] | None = None,
        options: Options | None = None,
        # deprecated arg, remove in pyjwt3
        verify: bool | None = None,
        # could be used as passthrough to api_jws, consider removal in pyjwt3
        detached_payload: bytes | None = None,
        # passthrough arguments to _validate_claims
        # consider putting in options
        audience: str | Iterable[str] | None = None,
        subject: str | None = None,
        issuer: str | Container[str] | None = None,
        leeway: float | timedelta = 0,
        # kwargs
        **kwargs: Any,
    ) -> dict[str, Any]:
        """Verify the ``jwt`` token signature and return the token claims.

        :param jwt: the token to be decoded
        :type jwt: str or bytes
        :param key: the key suitable for the allowed algorithm
        :type key: str or bytes or PyJWK or :py:class:`jwt.algorithms.AllowedPublicKeys`

        :param algorithms: allowed algorithms, e.g. ``["ES256"]``
            If ``key`` is a :class:`PyJWK` object, allowed algorithms will default to the key algorithm.

            .. warning::

               Do **not** compute the ``algorithms`` parameter based on
               the ``alg`` from the token itself, or on any other data
               that an attacker may be able to influence, as that might
               expose you to various vulnerabilities (see `RFC 8725 §2.1
               <https://www.rfc-editor.org/rfc/rfc8725.html#section-2.1>`_). Instead,
               either hard-code a fixed value for ``algorithms``, or
               configure it in the same place you configure the
               ``key``. Make sure not to mix symmetric and asymmetric
               algorithms that interpret the ``key`` in different ways
               (e.g. HS\\* and RS\\*).
        :type algorithms: typing.Sequence[str] or None

        :param jwt.types.Options options: extended decoding and validation options
            Refer to :py:class:`jwt.types.Options` for more information.

        :param audience: optional, the value for ``verify_aud`` check
        :type audience: str or typing.Iterable[str] or None
        :param subject: optional, the value for ``verify_sub`` check
        :type subject: str or None
        :param issuer: optional, the value for ``verify_iss`` check
        :type issuer: str or typing.Container[str] or None
        :param leeway: a time margin in seconds for the expiration check
        :type leeway: float or datetime.timedelta
        :rtype: dict[str, typing.Any]
        :returns: the JWT claims
        """
        if kwargs:
            warnings.warn(
                "passing additional kwargs to decode() is deprecated "
                "and will be removed in pyjwt version 3. "
                f"Unsupported kwargs: {tuple(kwargs.keys())}",
                RemovedInPyjwt3Warning,
                stacklevel=2,
            )
        # The extra kwargs are deliberately not forwarded, so the warning
        # above is emitted only once.
        decoded = self.decode_complete(
            jwt,
            key,
            algorithms,
            options,
            verify=verify,
            detached_payload=detached_payload,
            audience=audience,
            subject=subject,
            issuer=issuer,
            leeway=leeway,
        )
        return cast(dict[str, Any], decoded["payload"])

    def _validate_claims(
        self,
        payload: dict[str, Any],
        options: FullOptions,
        audience: Iterable[str] | str | None = None,
        issuer: Container[str] | str | None = None,
        subject: str | None = None,
        leeway: float | timedelta = 0,
    ) -> None:
        """Run every claim check enabled in ``options`` against ``payload``.

        Required claims are checked first. The time claims are then checked
        in the order ``iat``, ``nbf``, ``exp``, and only when present in the
        payload. The ``iss``, ``aud``, ``sub`` and ``jti`` checks follow.

        :param payload: the decoded JWT claims
        :param options: fully populated validation options
        :param audience: expected audience(s) for the ``aud`` check
        :param issuer: expected issuer(s) for the ``iss`` check
        :param subject: expected subject for the ``sub`` check
        :param leeway: tolerance for the time checks, in seconds or as a
            ``timedelta``
        :raises TypeError: if ``audience`` is neither a string, an iterable
            nor ``None``
        """
        if isinstance(leeway, timedelta):
            leeway = leeway.total_seconds()

        if audience is not None and not isinstance(audience, (str, Iterable)):
            raise TypeError("audience must be a string, iterable or None")

        self._validate_required_claims(payload, options["require"])

        # One timestamp shared by all time checks so they agree on "now".
        now = datetime.now(tz=timezone.utc).timestamp()

        if "iat" in payload and options["verify_iat"]:
            self._validate_iat(payload, now, leeway)

        if "nbf" in payload and options["verify_nbf"]:
            self._validate_nbf(payload, now, leeway)

        if "exp" in payload and options["verify_exp"]:
            self._validate_exp(payload, now, leeway)

        if options["verify_iss"]:
            self._validate_iss(payload, issuer)

        if options["verify_aud"]:
            self._validate_aud(
                payload, audience, strict=options.get("strict_aud", False)
            )

        if options["verify_sub"]:
            self._validate_sub(payload, subject)

        if options["verify_jti"]:
            self._validate_jti(payload)

    def _validate_required_claims(
        self,
        payload: dict[str, Any],
        claims: Iterable[str],
    ) -> None:
        """Ensure every claim named in ``claims`` is present in ``payload``.

        A claim whose value is ``None`` counts as missing.

        :raises MissingRequiredClaimError: for the first missing claim
        """
        for claim in claims:
            if payload.get(claim) is None:
                raise MissingRequiredClaimError(claim)

    def _validate_sub(
        self, payload: dict[str, Any], subject: str | None = None
    ) -> None:
        """
        Checks whether "sub" if in the payload is valid or not.
        This is an Optional claim

        :param payload(dict): The payload which needs to be validated
        :param subject(str): The subject of the token
        :raises InvalidSubjectError: if "sub" is not a string, or differs
            from ``subject`` when one is given
        """

        if "sub" not in payload:
            return

        if not isinstance(payload["sub"], str):
            raise InvalidSubjectError("Subject must be a string")

        if subject is not None:
            if payload.get("sub") != subject:
                raise InvalidSubjectError("Invalid subject")

    def _validate_jti(self, payload: dict[str, Any]) -> None:
        """
        Checks whether "jti" if in the payload is valid or not
        This is an Optional claim

        :param payload(dict): The payload which needs to be validated
        :raises InvalidJTIError: if "jti" is present but not a string
        """

        if "jti" not in payload:
            return

        if not isinstance(payload.get("jti"), str):
            raise InvalidJTIError("JWT ID must be a string")

    def _validate_iat(
        self,
        payload: dict[str, Any],
        now: float,
        leeway: float,
    ) -> None:
        """Check that the ``iat`` claim is an integer and not in the future.

        :param payload: claims containing ``iat``
        :param now: current time as a POSIX timestamp
        :param leeway: tolerated clock skew in seconds
        :raises InvalidIssuedAtError: if ``iat`` cannot be converted to ``int``
        :raises ImmatureSignatureError: if ``iat`` is later than ``now + leeway``
        """
        try:
            iat = int(payload["iat"])
        # int() raises TypeError for null/list/object values and
        # OverflowError for an infinite float, not only ValueError.
        except (TypeError, ValueError, OverflowError):
            raise InvalidIssuedAtError(
                "Issued At claim (iat) must be an integer."
            ) from None
        if iat > (now + leeway):
            raise ImmatureSignatureError("The token is not yet valid (iat)")

    def _validate_nbf(
        self,
        payload: dict[str, Any],
        now: float,
        leeway: float,
    ) -> None:
        """Check that the ``nbf`` claim is an integer and has been reached.

        :param payload: claims containing ``nbf``
        :param now: current time as a POSIX timestamp
        :param leeway: tolerated clock skew in seconds
        :raises DecodeError: if ``nbf`` cannot be converted to ``int``
        :raises ImmatureSignatureError: if ``nbf`` is later than ``now + leeway``
        """
        try:
            nbf = int(payload["nbf"])
        # int() raises TypeError for null/list/object values and
        # OverflowError for an infinite float, not only ValueError.
        except (TypeError, ValueError, OverflowError):
            raise DecodeError("Not Before claim (nbf) must be an integer.") from None

        if nbf > (now + leeway):
            raise ImmatureSignatureError("The token is not yet valid (nbf)")

    def _validate_exp(
        self,
        payload: dict[str, Any],
        now: float,
        leeway: float,
    ) -> None:
        """Check that the ``exp`` claim is an integer and still in the future.

        :param payload: claims containing ``exp``
        :param now: current time as a POSIX timestamp
        :param leeway: tolerated clock skew in seconds
        :raises DecodeError: if ``exp`` cannot be converted to ``int``
        :raises ExpiredSignatureError: if ``exp`` is at or before ``now - leeway``
        """
        try:
            exp = int(payload["exp"])
        # int() raises TypeError for null/list/object values and
        # OverflowError for an infinite float, not only ValueError.
        except (TypeError, ValueError, OverflowError):
            raise DecodeError(
                "Expiration Time claim (exp) must be an integer."
            ) from None

        if exp <= (now - leeway):
            raise ExpiredSignatureError("Signature has expired")

    def _validate_aud(
        self,
        payload: dict[str, Any],
        audience: str | Iterable[str] | None,
        *,
        strict: bool = False,
    ) -> None:
        """Check the ``aud`` claim against the expected ``audience``.

        Without ``strict``, the check passes when at least one expected
        audience appears in the token's ``aud`` claim (a string or a list of
        strings). With ``strict``, both sides must be single strings and
        must be equal.

        :param payload: the decoded JWT claims
        :param audience: expected audience(s), or ``None`` if the caller
            expects no audience
        :param strict: enable exact single-string matching
        :raises InvalidAudienceError: if the token carries a non-empty
            ``aud`` but no audience was expected, if the claim is
            malformed, or if nothing matches
        :raises MissingRequiredClaimError: if an audience was expected but
            the token has no (or an empty) ``aud`` claim
        """
        if audience is None:
            if "aud" not in payload or not payload["aud"]:
                return
            # Application did not specify an audience, but
            # the token has the 'aud' claim
            raise InvalidAudienceError("Invalid audience")

        if "aud" not in payload or not payload["aud"]:
            # Application specified an audience, but it could not be
            # verified since the token does not contain a claim.
            raise MissingRequiredClaimError("aud")

        audience_claims = payload["aud"]

        # In strict mode, we forbid list matching: the supplied audience
        # must be a string, and it must exactly match the audience claim.
        if strict:
            # Only a single audience is allowed in strict mode.
            if not isinstance(audience, str):
                raise InvalidAudienceError("Invalid audience (strict)")

            # Only a single audience claim is allowed in strict mode.
            if not isinstance(audience_claims, str):
                raise InvalidAudienceError("Invalid claim format in token (strict)")

            if audience != audience_claims:
                raise InvalidAudienceError("Audience doesn't match (strict)")

            return

        # Normalize the claim to a list of strings.
        if isinstance(audience_claims, str):
            audience_claims = [audience_claims]
        if not isinstance(audience_claims, list):
            raise InvalidAudienceError("Invalid claim format in token")
        if any(not isinstance(c, str) for c in audience_claims):
            raise InvalidAudienceError("Invalid claim format in token")

        if isinstance(audience, str):
            audience = [audience]

        # At least one expected audience must be named by the token.
        if not any(aud in audience_claims for aud in audience):
            raise InvalidAudienceError("Audience doesn't match")

    def _validate_iss(
        self, payload: dict[str, Any], issuer: Container[str] | str | None
    ) -> None:
        """Check the ``iss`` claim against the expected ``issuer``.

        Nothing is checked when ``issuer`` is ``None``.

        :param payload: the decoded JWT claims
        :param issuer: a single expected issuer, or a container of
            acceptable issuers
        :raises MissingRequiredClaimError: if an issuer is expected but the
            token has no ``iss`` claim
        :raises InvalidIssuerError: if ``iss`` is not a string, does not
            match, or ``issuer`` does not support membership tests
        """
        if issuer is None:
            return

        if "iss" not in payload:
            raise MissingRequiredClaimError("iss")

        iss = payload["iss"]
        if not isinstance(iss, str):
            raise InvalidIssuerError("Payload Issuer (iss) must be a string")

        if isinstance(issuer, str):
            if iss != issuer:
                raise InvalidIssuerError("Invalid issuer")
        else:
            try:
                if iss not in issuer:
                    raise InvalidIssuerError("Invalid issuer")
            except TypeError:
                raise InvalidIssuerError(
                    'Issuer param must be "str" or "Container[str]"'
                ) from None
# Module-level API: the functions below are bound methods of one shared
# PyJWT instance created with the default options.
_jwt_global_obj = PyJWT()
decode = _jwt_global_obj.decode
decode_complete = _jwt_global_obj.decode_complete
encode = _jwt_global_obj.encode