Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/jwt/api_jwt.py: 63%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1from __future__ import annotations
3import json
4import os
5import warnings
6from calendar import timegm
7from collections.abc import Iterable, Sequence
8from datetime import datetime, timedelta, timezone
9from typing import TYPE_CHECKING, Any, Container
11from . import api_jws
12from .exceptions import (
13 DecodeError,
14 ExpiredSignatureError,
15 ImmatureSignatureError,
16 InvalidAudienceError,
17 InvalidIssuedAtError,
18 InvalidIssuerError,
19 InvalidJTIError,
20 InvalidSubjectError,
21 MissingRequiredClaimError,
22)
23from .warnings import RemovedInPyjwt3Warning
25if TYPE_CHECKING or bool(os.getenv("SPHINX_BUILD", "")):
26 from typing import TypeAlias
28 from .algorithms import has_crypto
29 from .api_jwk import PyJWK
30 from .types import FullOptions, Options, SigOptions
32 if has_crypto:
33 from .algorithms import AllowedPrivateKeys, AllowedPublicKeys
35 AllowedPrivateKeyTypes: TypeAlias = AllowedPrivateKeys | PyJWK | str | bytes # type: ignore
36 AllowedPublicKeyTypes: TypeAlias = AllowedPublicKeys | PyJWK | str | bytes # type: ignore
37 else:
38 AllowedPrivateKeyTypes: TypeAlias = PyJWK | str | bytes # type: ignore
39 AllowedPublicKeyTypes: TypeAlias = PyJWK | str | bytes # type: ignore
42class PyJWT:
43 def __init__(self, options: Options | None = None) -> None:
44 self.options: FullOptions
45 self.options = self._get_default_options()
46 if options is not None:
47 self.options = self._merge_options(options)
49 @staticmethod
50 def _get_default_options() -> FullOptions:
51 return {
52 "verify_signature": True,
53 "verify_exp": True,
54 "verify_nbf": True,
55 "verify_iat": True,
56 "verify_aud": True,
57 "verify_iss": True,
58 "verify_sub": True,
59 "verify_jti": True,
60 "require": [],
61 "strict_aud": False,
62 }
64 def _merge_options(self, options: Options | None = None) -> FullOptions:
65 if options is None:
66 return self.options
68 # (defensive) set defaults for verify_x to False if verify_signature is False
69 if not options.get("verify_signature", True):
70 options["verify_exp"] = options.get("verify_exp", False)
71 options["verify_nbf"] = options.get("verify_nbf", False)
72 options["verify_iat"] = options.get("verify_iat", False)
73 options["verify_aud"] = options.get("verify_aud", False)
74 options["verify_iss"] = options.get("verify_iss", False)
75 options["verify_sub"] = options.get("verify_sub", False)
76 options["verify_jti"] = options.get("verify_jti", False)
77 return {**self.options, **options}
79 def encode(
80 self,
81 payload: dict[str, Any],
82 key: AllowedPrivateKeyTypes,
83 algorithm: str | None = "HS256",
84 headers: dict[str, Any] | None = None,
85 json_encoder: type[json.JSONEncoder] | None = None,
86 sort_headers: bool = True,
87 ) -> str:
88 """Encode the ``payload`` as JSON Web Token.
90 :param payload: JWT claims, e.g. ``dict(iss=..., aud=..., sub=...)``
91 :type payload: dict[str, typing.Any]
92 :param key: a key suitable for the chosen algorithm:
94 * for **asymmetric algorithms**: PEM-formatted private key, a multiline string
95 * for **symmetric algorithms**: plain string, sufficiently long for security
97 :type key: str or bytes or PyJWK or :py:class:`jwt.algorithms.AllowedPrivateKeys`
98 :param algorithm: algorithm to sign the token with, e.g. ``"ES256"``.
99 If ``headers`` includes ``alg``, it will be preferred to this parameter.
100 If ``key`` is a :class:`PyJWK` object, by default the key algorithm will be used.
101 :type algorithm: str or None
102 :param headers: additional JWT header fields, e.g. ``dict(kid="my-key-id")``.
103 :type headers: dict[str, typing.Any] or None
104 :param json_encoder: custom JSON encoder for ``payload`` and ``headers``
105 :type json_encoder: json.JSONEncoder or None
107 :rtype: str
108 :returns: a JSON Web Token
110 :raises TypeError: if ``payload`` is not a ``dict``
111 """
112 # Check that we get a dict
113 if not isinstance(payload, dict):
114 raise TypeError(
115 "Expecting a dict object, as JWT only supports "
116 "JSON objects as payloads."
117 )
119 # Payload
120 payload = payload.copy()
121 for time_claim in ["exp", "iat", "nbf"]:
122 # Convert datetime to a intDate value in known time-format claims
123 if isinstance(payload.get(time_claim), datetime):
124 payload[time_claim] = timegm(payload[time_claim].utctimetuple())
126 # Issue #1039, iss being set to non-string
127 if "iss" in payload and not isinstance(payload["iss"], str):
128 raise TypeError("Issuer (iss) must be a string.")
130 json_payload = self._encode_payload(
131 payload,
132 headers=headers,
133 json_encoder=json_encoder,
134 )
136 return api_jws.encode(
137 json_payload,
138 key,
139 algorithm,
140 headers,
141 json_encoder,
142 sort_headers=sort_headers,
143 )
145 def _encode_payload(
146 self,
147 payload: dict[str, Any],
148 headers: dict[str, Any] | None = None,
149 json_encoder: type[json.JSONEncoder] | None = None,
150 ) -> bytes:
151 """
152 Encode a given payload to the bytes to be signed.
154 This method is intended to be overridden by subclasses that need to
155 encode the payload in a different way, e.g. compress the payload.
156 """
157 return json.dumps(
158 payload,
159 separators=(",", ":"),
160 cls=json_encoder,
161 ).encode("utf-8")
163 def decode_complete(
164 self,
165 jwt: str | bytes,
166 key: AllowedPublicKeyTypes = "",
167 algorithms: Sequence[str] | None = None,
168 options: Options | None = None,
169 # deprecated arg, remove in pyjwt3
170 verify: bool | None = None,
171 # could be used as passthrough to api_jws, consider removal in pyjwt3
172 detached_payload: bytes | None = None,
173 # passthrough arguments to _validate_claims
174 # consider putting in options
175 audience: str | Iterable[str] | None = None,
176 issuer: str | Container[str] | None = None,
177 subject: str | None = None,
178 leeway: float | timedelta = 0,
179 # kwargs
180 **kwargs: Any,
181 ) -> dict[str, Any]:
182 """Identical to ``jwt.decode`` except for return value which is a dictionary containing the token header (JOSE Header),
183 the token payload (JWT Payload), and token signature (JWT Signature) on the keys "header", "payload",
184 and "signature" respectively.
186 :param jwt: the token to be decoded
187 :type jwt: str or bytes
188 :param key: the key suitable for the allowed algorithm
189 :type key: str or bytes or PyJWK or :py:class:`jwt.algorithms.AllowedPublicKeys`
191 :param algorithms: allowed algorithms, e.g. ``["ES256"]``
193 .. warning::
195 Do **not** compute the ``algorithms`` parameter based on
196 the ``alg`` from the token itself, or on any other data
197 that an attacker may be able to influence, as that might
198 expose you to various vulnerabilities (see `RFC 8725 §2.1
199 <https://www.rfc-editor.org/rfc/rfc8725.html#section-2.1>`_). Instead,
200 either hard-code a fixed value for ``algorithms``, or
201 configure it in the same place you configure the
202 ``key``. Make sure not to mix symmetric and asymmetric
203 algorithms that interpret the ``key`` in different ways
204 (e.g. HS\\* and RS\\*).
205 :type algorithms: typing.Sequence[str] or None
207 :param jwt.types.Options options: extended decoding and validation options
208 Refer to :py:class:`jwt.types.Options` for more information.
210 :param audience: optional, the value for ``verify_aud`` check
211 :type audience: str or typing.Iterable[str] or None
212 :param issuer: optional, the value for ``verify_iss`` check
213 :type issuer: str or typing.Container[str] or None
214 :param leeway: a time margin in seconds for the expiration check
215 :type leeway: float or datetime.timedelta
216 :rtype: dict[str, typing.Any]
217 :returns: Decoded JWT with the JOSE Header on the key ``header``, the JWS
218 Payload on the key ``payload``, and the JWS Signature on the key ``signature``.
219 """
220 if kwargs:
221 warnings.warn(
222 "passing additional kwargs to decode_complete() is deprecated "
223 "and will be removed in pyjwt version 3. "
224 f"Unsupported kwargs: {tuple(kwargs.keys())}",
225 RemovedInPyjwt3Warning,
226 stacklevel=2,
227 )
229 if options is None:
230 verify_signature = True
231 else:
232 verify_signature = options.get("verify_signature", True)
234 # If the user has set the legacy `verify` argument, and it doesn't match
235 # what the relevant `options` entry for the argument is, inform the user
236 # that they're likely making a mistake.
237 if verify is not None and verify != verify_signature:
238 warnings.warn(
239 "The `verify` argument to `decode` does nothing in PyJWT 2.0 and newer. "
240 "The equivalent is setting `verify_signature` to False in the `options` dictionary. "
241 "This invocation has a mismatch between the kwarg and the option entry.",
242 category=DeprecationWarning,
243 stacklevel=2,
244 )
246 sig_options: SigOptions = {"verify_signature": verify_signature}
247 decoded = api_jws.decode_complete(
248 jwt,
249 key=key,
250 algorithms=algorithms,
251 options=sig_options,
252 detached_payload=detached_payload,
253 )
255 payload = self._decode_payload(decoded)
257 merged_options = self._merge_options(options)
258 self._validate_claims(
259 payload,
260 merged_options,
261 audience=audience,
262 issuer=issuer,
263 leeway=leeway,
264 subject=subject,
265 )
267 decoded["payload"] = payload
268 return decoded
270 def _decode_payload(self, decoded: dict[str, Any]) -> dict[str, Any]:
271 """
272 Decode the payload from a JWS dictionary (payload, signature, header).
274 This method is intended to be overridden by subclasses that need to
275 decode the payload in a different way, e.g. decompress compressed
276 payloads.
277 """
278 try:
279 payload: dict[str, Any] = json.loads(decoded["payload"])
280 except ValueError as e:
281 raise DecodeError(f"Invalid payload string: {e}") from e
282 if not isinstance(payload, dict):
283 raise DecodeError("Invalid payload string: must be a json object")
284 return payload
286 def decode(
287 self,
288 jwt: str | bytes,
289 key: AllowedPublicKeys | PyJWK | str | bytes = "",
290 algorithms: Sequence[str] | None = None,
291 options: Options | None = None,
292 # deprecated arg, remove in pyjwt3
293 verify: bool | None = None,
294 # could be used as passthrough to api_jws, consider removal in pyjwt3
295 detached_payload: bytes | None = None,
296 # passthrough arguments to _validate_claims
297 # consider putting in options
298 audience: str | Iterable[str] | None = None,
299 subject: str | None = None,
300 issuer: str | Container[str] | None = None,
301 leeway: float | timedelta = 0,
302 # kwargs
303 **kwargs: Any,
304 ) -> dict[str, Any]:
305 """Verify the ``jwt`` token signature and return the token claims.
307 :param jwt: the token to be decoded
308 :type jwt: str or bytes
309 :param key: the key suitable for the allowed algorithm
310 :type key: str or bytes or PyJWK or :py:class:`jwt.algorithms.AllowedPublicKeys`
312 :param algorithms: allowed algorithms, e.g. ``["ES256"]``
313 If ``key`` is a :class:`PyJWK` object, allowed algorithms will default to the key algorithm.
315 .. warning::
317 Do **not** compute the ``algorithms`` parameter based on
318 the ``alg`` from the token itself, or on any other data
319 that an attacker may be able to influence, as that might
320 expose you to various vulnerabilities (see `RFC 8725 §2.1
321 <https://www.rfc-editor.org/rfc/rfc8725.html#section-2.1>`_). Instead,
322 either hard-code a fixed value for ``algorithms``, or
323 configure it in the same place you configure the
324 ``key``. Make sure not to mix symmetric and asymmetric
325 algorithms that interpret the ``key`` in different ways
326 (e.g. HS\* and RS\*).
327 :type algorithms: typing.Sequence[str] or None
329 :param jwt.types.Options options: extended decoding and validation options
330 Refer to :py:class:`jwt.types.Options` for more information.
332 :param audience: optional, the value for ``verify_aud`` check
333 :type audience: str or typing.Iterable[str] or None
334 :param subject: optional, the value for ``verify_sub`` check
335 :type subject: str or None
336 :param issuer: optional, the value for ``verify_iss`` check
337 :type issuer: str or typing.Container[str] or None
338 :param leeway: a time margin in seconds for the expiration check
339 :type leeway: float or datetime.timedelta
340 :rtype: dict[str, typing.Any]
341 :returns: the JWT claims
342 """
343 if kwargs:
344 warnings.warn(
345 "passing additional kwargs to decode() is deprecated "
346 "and will be removed in pyjwt version 3. "
347 f"Unsupported kwargs: {tuple(kwargs.keys())}",
348 RemovedInPyjwt3Warning,
349 stacklevel=2,
350 )
351 decoded = self.decode_complete(
352 jwt,
353 key,
354 algorithms,
355 options,
356 verify=verify,
357 detached_payload=detached_payload,
358 audience=audience,
359 subject=subject,
360 issuer=issuer,
361 leeway=leeway,
362 )
363 return decoded["payload"]
365 def _validate_claims(
366 self,
367 payload: dict[str, Any],
368 options: FullOptions,
369 audience: Iterable[str] | str | None = None,
370 issuer: Container[str] | str | None = None,
371 subject: str | None = None,
372 leeway: float | timedelta = 0,
373 ) -> None:
374 if isinstance(leeway, timedelta):
375 leeway = leeway.total_seconds()
377 if audience is not None and not isinstance(audience, (str, Iterable)):
378 raise TypeError("audience must be a string, iterable or None")
380 self._validate_required_claims(payload, options["require"])
382 now = datetime.now(tz=timezone.utc).timestamp()
384 if "iat" in payload and options["verify_iat"]:
385 self._validate_iat(payload, now, leeway)
387 if "nbf" in payload and options["verify_nbf"]:
388 self._validate_nbf(payload, now, leeway)
390 if "exp" in payload and options["verify_exp"]:
391 self._validate_exp(payload, now, leeway)
393 if options["verify_iss"]:
394 self._validate_iss(payload, issuer)
396 if options["verify_aud"]:
397 self._validate_aud(
398 payload, audience, strict=options.get("strict_aud", False)
399 )
401 if options["verify_sub"]:
402 self._validate_sub(payload, subject)
404 if options["verify_jti"]:
405 self._validate_jti(payload)
407 def _validate_required_claims(
408 self,
409 payload: dict[str, Any],
410 claims: Iterable[str],
411 ) -> None:
412 for claim in claims:
413 if payload.get(claim) is None:
414 raise MissingRequiredClaimError(claim)
416 def _validate_sub(
417 self, payload: dict[str, Any], subject: str | None = None
418 ) -> None:
419 """
420 Checks whether "sub" if in the payload is valid or not.
421 This is an Optional claim
423 :param payload(dict): The payload which needs to be validated
424 :param subject(str): The subject of the token
425 """
427 if "sub" not in payload:
428 return
430 if not isinstance(payload["sub"], str):
431 raise InvalidSubjectError("Subject must be a string")
433 if subject is not None:
434 if payload.get("sub") != subject:
435 raise InvalidSubjectError("Invalid subject")
437 def _validate_jti(self, payload: dict[str, Any]) -> None:
438 """
439 Checks whether "jti" if in the payload is valid or not
440 This is an Optional claim
442 :param payload(dict): The payload which needs to be validated
443 """
445 if "jti" not in payload:
446 return
448 if not isinstance(payload.get("jti"), str):
449 raise InvalidJTIError("JWT ID must be a string")
451 def _validate_iat(
452 self,
453 payload: dict[str, Any],
454 now: float,
455 leeway: float,
456 ) -> None:
457 try:
458 iat = int(payload["iat"])
459 except ValueError:
460 raise InvalidIssuedAtError(
461 "Issued At claim (iat) must be an integer."
462 ) from None
463 if iat > (now + leeway):
464 raise ImmatureSignatureError("The token is not yet valid (iat)")
466 def _validate_nbf(
467 self,
468 payload: dict[str, Any],
469 now: float,
470 leeway: float,
471 ) -> None:
472 try:
473 nbf = int(payload["nbf"])
474 except ValueError:
475 raise DecodeError("Not Before claim (nbf) must be an integer.") from None
477 if nbf > (now + leeway):
478 raise ImmatureSignatureError("The token is not yet valid (nbf)")
480 def _validate_exp(
481 self,
482 payload: dict[str, Any],
483 now: float,
484 leeway: float,
485 ) -> None:
486 try:
487 exp = int(payload["exp"])
488 except ValueError:
489 raise DecodeError(
490 "Expiration Time claim (exp) must be an integer."
491 ) from None
493 if exp <= (now - leeway):
494 raise ExpiredSignatureError("Signature has expired")
496 def _validate_aud(
497 self,
498 payload: dict[str, Any],
499 audience: str | Iterable[str] | None,
500 *,
501 strict: bool = False,
502 ) -> None:
503 if audience is None:
504 if "aud" not in payload or not payload["aud"]:
505 return
506 # Application did not specify an audience, but
507 # the token has the 'aud' claim
508 raise InvalidAudienceError("Invalid audience")
510 if "aud" not in payload or not payload["aud"]:
511 # Application specified an audience, but it could not be
512 # verified since the token does not contain a claim.
513 raise MissingRequiredClaimError("aud")
515 audience_claims = payload["aud"]
517 # In strict mode, we forbid list matching: the supplied audience
518 # must be a string, and it must exactly match the audience claim.
519 if strict:
520 # Only a single audience is allowed in strict mode.
521 if not isinstance(audience, str):
522 raise InvalidAudienceError("Invalid audience (strict)")
524 # Only a single audience claim is allowed in strict mode.
525 if not isinstance(audience_claims, str):
526 raise InvalidAudienceError("Invalid claim format in token (strict)")
528 if audience != audience_claims:
529 raise InvalidAudienceError("Audience doesn't match (strict)")
531 return
533 if isinstance(audience_claims, str):
534 audience_claims = [audience_claims]
535 if not isinstance(audience_claims, list):
536 raise InvalidAudienceError("Invalid claim format in token")
537 if any(not isinstance(c, str) for c in audience_claims):
538 raise InvalidAudienceError("Invalid claim format in token")
540 if isinstance(audience, str):
541 audience = [audience]
543 if all(aud not in audience_claims for aud in audience):
544 raise InvalidAudienceError("Audience doesn't match")
546 def _validate_iss(
547 self, payload: dict[str, Any], issuer: Container[str] | str | None
548 ) -> None:
549 if issuer is None:
550 return
552 if "iss" not in payload:
553 raise MissingRequiredClaimError("iss")
555 iss = payload["iss"]
556 if not isinstance(iss, str):
557 raise InvalidIssuerError("Payload Issuer (iss) must be a string")
559 if isinstance(issuer, str):
560 if iss != issuer:
561 raise InvalidIssuerError("Invalid issuer")
562 else:
563 try:
564 if iss not in issuer:
565 raise InvalidIssuerError("Invalid issuer")
566 except TypeError:
567 raise InvalidIssuerError(
568 'Issuer param must be "str" or "Container[str]"'
569 ) from None
572_jwt_global_obj = PyJWT()
573encode = _jwt_global_obj.encode
574decode_complete = _jwt_global_obj.decode_complete
575decode = _jwt_global_obj.decode