Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/jwt/api_jwt.py: 24%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1from __future__ import annotations
3import json
4import warnings
5from calendar import timegm
6from collections.abc import Iterable
7from datetime import datetime, timedelta, timezone
8from typing import TYPE_CHECKING, Any, List
10from . import api_jws
11from .exceptions import (
12 DecodeError,
13 ExpiredSignatureError,
14 ImmatureSignatureError,
15 InvalidAudienceError,
16 InvalidIssuedAtError,
17 InvalidIssuerError,
18 MissingRequiredClaimError,
19)
20from .warnings import RemovedInPyjwt3Warning
22if TYPE_CHECKING:
23 from .algorithms import AllowedPrivateKeys, AllowedPublicKeys
24 from .api_jwk import PyJWK
class PyJWT:
    """Encode and decode JSON Web Tokens and validate their claims.

    Signing and signature verification are delegated to ``api_jws``. This
    class serialises the payload to and from JSON and checks the ``iat``,
    ``nbf``, ``exp``, ``iss`` and ``aud`` claims, plus every claim listed in
    the ``require`` option.
    """

    def __init__(self, options: dict[str, Any] | None = None) -> None:
        """Store the default options, overridden by any entries in *options*."""
        if options is None:
            options = {}
        self.options: dict[str, Any] = {**self._get_default_options(), **options}

    @staticmethod
    def _get_default_options() -> dict[str, bool | list[str]]:
        """Return a fresh dict of defaults: every check on, no required claims."""
        return {
            "verify_signature": True,
            "verify_exp": True,
            "verify_nbf": True,
            "verify_iat": True,
            "verify_aud": True,
            "verify_iss": True,
            "require": [],
        }

    def encode(
        self,
        payload: dict[str, Any],
        key: AllowedPrivateKeys | str | bytes,
        algorithm: str | None = "HS256",
        headers: dict[str, Any] | None = None,
        json_encoder: type[json.JSONEncoder] | None = None,
        sort_headers: bool = True,
    ) -> str:
        """Serialise *payload* and hand it to ``api_jws.encode`` for signing.

        ``datetime`` values found under ``exp``, ``iat`` or ``nbf`` are
        replaced by integer timestamps in a copy of the payload; the caller's
        dict is not modified.

        :param payload: claims to encode; must be a ``dict``.
        :param key: forwarded unchanged to ``api_jws.encode``.
        :param algorithm: forwarded unchanged to ``api_jws.encode``.
        :param headers: forwarded to ``_encode_payload`` and ``api_jws.encode``.
        :param json_encoder: JSON encoder class used for the payload and
            forwarded to ``api_jws.encode``.
        :param sort_headers: forwarded unchanged to ``api_jws.encode``.
        :returns: whatever ``api_jws.encode`` returns (annotated as ``str``).
        :raises TypeError: if *payload* is not a ``dict``.
        """
        # Check that we get a dict
        if not isinstance(payload, dict):
            raise TypeError(
                "Expecting a dict object, as JWT only supports "
                "JSON objects as payloads."
            )

        # Work on a shallow copy so the time-claim conversion below does not
        # leak back into the caller's dict.
        payload = payload.copy()
        for time_claim in ["exp", "iat", "nbf"]:
            # Convert datetime to a intDate value in known time-format claims
            if isinstance(payload.get(time_claim), datetime):
                payload[time_claim] = timegm(payload[time_claim].utctimetuple())

        json_payload = self._encode_payload(
            payload,
            headers=headers,
            json_encoder=json_encoder,
        )

        return api_jws.encode(
            json_payload,
            key,
            algorithm,
            headers,
            json_encoder,
            sort_headers=sort_headers,
        )

    def _encode_payload(
        self,
        payload: dict[str, Any],
        headers: dict[str, Any] | None = None,
        json_encoder: type[json.JSONEncoder] | None = None,
    ) -> bytes:
        """
        Encode a given payload to the bytes to be signed.

        This method is intended to be overridden by subclasses that need to
        encode the payload in a different way, e.g. compress the payload.

        *headers* is accepted for the benefit of such overrides; this
        implementation does not read it.
        """
        return json.dumps(
            payload,
            separators=(",", ":"),
            cls=json_encoder,
        ).encode("utf-8")

    def decode_complete(
        self,
        jwt: str | bytes,
        key: AllowedPublicKeys | PyJWK | str | bytes = "",
        algorithms: list[str] | None = None,
        options: dict[str, Any] | None = None,
        # deprecated arg, remove in pyjwt3
        verify: bool | None = None,
        # could be used as passthrough to api_jws, consider removal in pyjwt3
        detached_payload: bytes | None = None,
        # passthrough arguments to _validate_claims
        # consider putting in options
        audience: str | Iterable[str] | None = None,
        issuer: str | List[str] | None = None,
        leeway: float | timedelta = 0,
        # kwargs
        **kwargs: Any,
    ) -> dict[str, Any]:
        """Decode *jwt*, validate its claims and return the full JWS dict.

        :param jwt: the encoded token, forwarded to ``api_jws.decode_complete``.
        :param key: forwarded to ``api_jws.decode_complete``.
        :param algorithms: accepted algorithms; must be non-empty whenever
            ``verify_signature`` is enabled.
        :param options: per-call overrides layered on top of ``self.options``.
            The dict passed in is copied, never mutated.
        :param verify: deprecated and ignored; only triggers a warning when
            it disagrees with ``options["verify_signature"]``.
        :param detached_payload: forwarded to ``api_jws.decode_complete``.
        :param audience: expected audience(s), see ``_validate_aud``.
        :param issuer: expected issuer(s), see ``_validate_iss``.
        :param leeway: tolerance, in seconds or as a ``timedelta``, applied
            to the time-based claims.
        :param kwargs: deprecated; any extra keyword only triggers a warning.
        :returns: the dict produced by ``api_jws.decode_complete`` with its
            ``"payload"`` entry replaced by the parsed payload dict.
        :raises DecodeError: if signature verification is enabled but no
            algorithms were supplied, or if the payload is not a JSON object.
        """
        if kwargs:
            warnings.warn(
                "passing additional kwargs to decode_complete() is deprecated "
                "and will be removed in pyjwt version 3. "
                f"Unsupported kwargs: {tuple(kwargs.keys())}",
                RemovedInPyjwt3Warning,
                # Attribute the warning to the calling frame, not this module.
                stacklevel=2,
            )
        options = dict(options or {})  # shallow-copy or initialize an empty dict
        options.setdefault("verify_signature", True)

        # If the user has set the legacy `verify` argument, and it doesn't match
        # what the relevant `options` entry for the argument is, inform the user
        # that they're likely making a mistake.
        if verify is not None and verify != options["verify_signature"]:
            warnings.warn(
                "The `verify` argument to `decode` does nothing in PyJWT 2.0 and newer. "
                "The equivalent is setting `verify_signature` to False in the `options` dictionary. "
                "This invocation has a mismatch between the kwarg and the option entry.",
                category=DeprecationWarning,
                stacklevel=2,
            )

        # With signature verification off, the claim checks default to off as
        # well; setdefault keeps any check the caller explicitly enabled.
        if not options["verify_signature"]:
            options.setdefault("verify_exp", False)
            options.setdefault("verify_nbf", False)
            options.setdefault("verify_iat", False)
            options.setdefault("verify_aud", False)
            options.setdefault("verify_iss", False)

        if options["verify_signature"] and not algorithms:
            raise DecodeError(
                'It is required that you pass in a value for the "algorithms" argument when calling decode().'
            )

        decoded = api_jws.decode_complete(
            jwt,
            key=key,
            algorithms=algorithms,
            options=options,
            detached_payload=detached_payload,
        )

        payload = self._decode_payload(decoded)

        # Per-call options win over the instance-level ones.
        merged_options = {**self.options, **options}
        self._validate_claims(
            payload, merged_options, audience=audience, issuer=issuer, leeway=leeway
        )

        decoded["payload"] = payload
        return decoded

    def _decode_payload(self, decoded: dict[str, Any]) -> Any:
        """
        Decode the payload from a JWS dictionary (payload, signature, header).

        This method is intended to be overridden by subclasses that need to
        decode the payload in a different way, e.g. decompress compressed
        payloads.

        :raises DecodeError: if the payload is not valid JSON or is valid
            JSON that is not an object.
        """
        try:
            payload = json.loads(decoded["payload"])
        except ValueError as e:
            # Chain explicitly so the JSON error stays attached as the cause.
            raise DecodeError(f"Invalid payload string: {e}") from e
        if not isinstance(payload, dict):
            raise DecodeError("Invalid payload string: must be a json object")
        return payload

    def decode(
        self,
        jwt: str | bytes,
        key: AllowedPublicKeys | PyJWK | str | bytes = "",
        algorithms: list[str] | None = None,
        options: dict[str, Any] | None = None,
        # deprecated arg, remove in pyjwt3
        verify: bool | None = None,
        # could be used as passthrough to api_jws, consider removal in pyjwt3
        detached_payload: bytes | None = None,
        # passthrough arguments to _validate_claims
        # consider putting in options
        audience: str | Iterable[str] | None = None,
        issuer: str | List[str] | None = None,
        leeway: float | timedelta = 0,
        # kwargs
        **kwargs: Any,
    ) -> Any:
        """Decode *jwt* via ``decode_complete`` and return only its payload.

        All named parameters are forwarded to ``decode_complete``. Extra
        keyword arguments are deprecated: they trigger a warning here and are
        not forwarded.
        """
        if kwargs:
            warnings.warn(
                "passing additional kwargs to decode() is deprecated "
                "and will be removed in pyjwt version 3. "
                f"Unsupported kwargs: {tuple(kwargs.keys())}",
                RemovedInPyjwt3Warning,
                # Attribute the warning to the calling frame, not this module.
                stacklevel=2,
            )
        decoded = self.decode_complete(
            jwt,
            key,
            algorithms,
            options,
            verify=verify,
            detached_payload=detached_payload,
            audience=audience,
            issuer=issuer,
            leeway=leeway,
        )
        return decoded["payload"]

    def _validate_claims(
        self,
        payload: dict[str, Any],
        options: dict[str, Any],
        audience=None,
        issuer=None,
        leeway: float | timedelta = 0,
    ) -> None:
        """Run every claim check that *options* enables against *payload*.

        Required claims are checked first, then ``iat``, ``nbf`` and ``exp``
        (each only when present in the payload), then ``iss`` and ``aud``.

        :raises TypeError: if *audience* is neither ``None``, a string nor
            an iterable.
        """
        if isinstance(leeway, timedelta):
            leeway = leeway.total_seconds()

        if audience is not None and not isinstance(audience, (str, Iterable)):
            raise TypeError("audience must be a string, iterable or None")

        self._validate_required_claims(payload, options)

        # One timestamp shared by all time-based checks.
        now = datetime.now(tz=timezone.utc).timestamp()

        if "iat" in payload and options["verify_iat"]:
            self._validate_iat(payload, now, leeway)

        if "nbf" in payload and options["verify_nbf"]:
            self._validate_nbf(payload, now, leeway)

        if "exp" in payload and options["verify_exp"]:
            self._validate_exp(payload, now, leeway)

        if options["verify_iss"]:
            self._validate_iss(payload, issuer)

        if options["verify_aud"]:
            self._validate_aud(
                payload, audience, strict=options.get("strict_aud", False)
            )

    def _validate_required_claims(
        self,
        payload: dict[str, Any],
        options: dict[str, Any],
    ) -> None:
        """Raise ``MissingRequiredClaimError`` for the first claim listed in
        ``options["require"]`` that is absent from *payload* or is ``None``.
        """
        for claim in options["require"]:
            if payload.get(claim) is None:
                raise MissingRequiredClaimError(claim)

    def _validate_iat(
        self,
        payload: dict[str, Any],
        now: float,
        leeway: float,
    ) -> None:
        """Check that ``iat`` is integer-convertible and not in the future.

        :raises InvalidIssuedAtError: if ``iat`` cannot be converted to int.
        :raises ImmatureSignatureError: if ``iat`` is later than
            ``now + leeway``.
        """
        try:
            iat = int(payload["iat"])
        except (TypeError, ValueError, OverflowError):
            # int() raises TypeError for null/list/object claims and
            # OverflowError for infinity; report them like any bad value.
            raise InvalidIssuedAtError(
                "Issued At claim (iat) must be an integer."
            ) from None
        if iat > (now + leeway):
            raise ImmatureSignatureError("The token is not yet valid (iat)")

    def _validate_nbf(
        self,
        payload: dict[str, Any],
        now: float,
        leeway: float,
    ) -> None:
        """Check that ``nbf`` is integer-convertible and has been reached.

        :raises DecodeError: if ``nbf`` cannot be converted to int.
        :raises ImmatureSignatureError: if ``nbf`` is later than
            ``now + leeway``.
        """
        try:
            nbf = int(payload["nbf"])
        except (TypeError, ValueError, OverflowError):
            # See _validate_iat: non-numeric JSON types must not leak as
            # built-in exceptions.
            raise DecodeError("Not Before claim (nbf) must be an integer.") from None

        if nbf > (now + leeway):
            raise ImmatureSignatureError("The token is not yet valid (nbf)")

    def _validate_exp(
        self,
        payload: dict[str, Any],
        now: float,
        leeway: float,
    ) -> None:
        """Check that ``exp`` is integer-convertible and still in the future.

        :raises DecodeError: if ``exp`` cannot be converted to int.
        :raises ExpiredSignatureError: if ``exp`` is at or before
            ``now - leeway``.
        """
        try:
            exp = int(payload["exp"])
        except (TypeError, ValueError, OverflowError):
            # See _validate_iat: non-numeric JSON types must not leak as
            # built-in exceptions.
            raise DecodeError(
                "Expiration Time claim (exp) must be an integer."
            ) from None

        if exp <= (now - leeway):
            raise ExpiredSignatureError("Signature has expired")

    def _validate_aud(
        self,
        payload: dict[str, Any],
        audience: str | Iterable[str] | None,
        *,
        strict: bool = False,
    ) -> None:
        """Check the ``aud`` claim against the expected *audience*.

        In the default mode the claim may be a string or a list of strings,
        and it is enough for one expected audience to appear in it. In
        strict mode both *audience* and the claim must be single strings
        that compare equal.

        :raises InvalidAudienceError: on a mismatch, on a malformed claim,
            or if the token carries a non-empty ``aud`` while *audience* is
            ``None``.
        :raises MissingRequiredClaimError: if *audience* was given but the
            token has no (or an empty) ``aud`` claim.
        """
        if audience is None:
            if "aud" not in payload or not payload["aud"]:
                return
            # Application did not specify an audience, but
            # the token has the 'aud' claim
            raise InvalidAudienceError("Invalid audience")

        if "aud" not in payload or not payload["aud"]:
            # Application specified an audience, but it could not be
            # verified since the token does not contain a claim.
            raise MissingRequiredClaimError("aud")

        audience_claims = payload["aud"]

        # In strict mode, we forbid list matching: the supplied audience
        # must be a string, and it must exactly match the audience claim.
        if strict:
            # Only a single audience is allowed in strict mode.
            if not isinstance(audience, str):
                raise InvalidAudienceError("Invalid audience (strict)")

            # Only a single audience claim is allowed in strict mode.
            if not isinstance(audience_claims, str):
                raise InvalidAudienceError("Invalid claim format in token (strict)")

            if audience != audience_claims:
                raise InvalidAudienceError("Audience doesn't match (strict)")

            return

        if isinstance(audience_claims, str):
            audience_claims = [audience_claims]
        if not isinstance(audience_claims, list):
            raise InvalidAudienceError("Invalid claim format in token")
        if any(not isinstance(c, str) for c in audience_claims):
            raise InvalidAudienceError("Invalid claim format in token")

        if isinstance(audience, str):
            audience = [audience]

        if all(aud not in audience_claims for aud in audience):
            raise InvalidAudienceError("Audience doesn't match")

    def _validate_iss(self, payload: dict[str, Any], issuer: Any) -> None:
        """Check the ``iss`` claim against the expected *issuer*.

        *issuer* may be ``None`` (check skipped), a single value compared
        with ``!=``, or a ``list``/``tuple``/``set``/``frozenset`` of
        acceptable values.

        :raises MissingRequiredClaimError: if *issuer* was given but the
            token has no ``iss`` claim.
        :raises InvalidIssuerError: if the claim does not match.
        """
        if issuer is None:
            return

        if "iss" not in payload:
            raise MissingRequiredClaimError("iss")

        claim = payload["iss"]

        if isinstance(issuer, (list, tuple, set, frozenset)):
            try:
                accepted = claim in issuer
            except TypeError:
                # An unhashable claim (JSON array/object) tested against a
                # set can never be a valid issuer.
                accepted = False
            if not accepted:
                raise InvalidIssuerError("Invalid issuer")
        elif claim != issuer:
            raise InvalidIssuerError("Invalid issuer")
# Module-level instance (constructed with the default options) that backs the
# function-style API exposed below.
_jwt_global_obj = PyJWT()

# Publish the instance's bound methods as plain module-level callables.
encode, decode_complete, decode = (
    _jwt_global_obj.encode,
    _jwt_global_obj.decode_complete,
    _jwt_global_obj.decode,
)