Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/jwt/api_jwt.py: 64%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

192 statements  

1from __future__ import annotations 

2 

3import json 

4import os 

5import warnings 

6from calendar import timegm 

7from collections.abc import Container, Iterable, Sequence 

8from datetime import datetime, timedelta, timezone 

9from typing import TYPE_CHECKING, Any, Union, cast 

10 

11from .api_jws import PyJWS, _ALGORITHM_UNSET, _jws_global_obj 

12from .exceptions import ( 

13 DecodeError, 

14 ExpiredSignatureError, 

15 ImmatureSignatureError, 

16 InvalidAudienceError, 

17 InvalidIssuedAtError, 

18 InvalidIssuerError, 

19 InvalidJTIError, 

20 InvalidSubjectError, 

21 MissingRequiredClaimError, 

22) 

23from .warnings import RemovedInPyjwt3Warning 

24 

# Everything in this branch is needed only by static type checkers and by the
# Sphinx documentation build (selected by a non-empty SPHINX_BUILD environment
# variable); with neither active, none of these names is bound at runtime.
if TYPE_CHECKING or bool(os.getenv("SPHINX_BUILD", "")):
    import sys

    if sys.version_info >= (3, 10):
        from typing import TypeAlias
    else:
        # Python 3.9 and lower
        from typing_extensions import TypeAlias

    from .algorithms import AllowedPrivateKeys, AllowedPublicKeys
    from .api_jwk import PyJWK
    from .types import FullOptions, Options, SigOptions

    # Annotation used for the ``key`` parameter of ``PyJWT.encode``.
    AllowedPrivateKeyTypes: TypeAlias = Union[AllowedPrivateKeys, PyJWK, str, bytes]
    # Annotation used for the ``key`` parameter of ``PyJWT.decode_complete``.
    AllowedPublicKeyTypes: TypeAlias = Union[AllowedPublicKeys, PyJWK, str, bytes]

40 

41 

42class PyJWT: 

def __init__(self, options: Options | None = None) -> None:
    """Create a JWT codec with its own default decode options.

    :param options: overrides layered on top of the library defaults;
        ``None`` keeps the defaults unchanged
    :type options: jwt.types.Options or None
    """
    # The defaults have to be stored first: _merge_options() layers the
    # overrides on top of whatever is currently in self.options.
    self.options: FullOptions = self._get_default_options()
    if options is not None:
        self.options = self._merge_options(options)

    # The JWS layer only receives the signature-related subset of options.
    signature_options = self._get_sig_options()
    self._jws = PyJWS(options=signature_options)

50 

@staticmethod
def _get_default_options() -> FullOptions:
    """Build the library-default decode options.

    A new dictionary is created on every call.

    :returns: the default options
    """
    defaults: FullOptions = {
        # Signature and claim checks are all enabled by default.
        "verify_signature": True,
        "verify_exp": True,
        "verify_nbf": True,
        "verify_iat": True,
        "verify_aud": True,
        "verify_iss": True,
        "verify_sub": True,
        "verify_jti": True,
        # No claim is mandatory unless the caller lists it here.
        "require": [],
        # Opt-in behaviours, off by default.
        "strict_aud": False,
        "enforce_minimum_key_length": False,
    }
    return defaults

66 

def _get_sig_options(self) -> SigOptions:
    """Extract the signature-related subset of this instance's options.

    :returns: a new dictionary with ``verify_signature`` and
        ``enforce_minimum_key_length`` (``False`` when the latter is not set)
    """
    current = self.options
    verify_signature = current["verify_signature"]
    enforce_key_length = current.get("enforce_minimum_key_length", False)
    return {
        "verify_signature": verify_signature,
        "enforce_minimum_key_length": enforce_key_length,
    }

74 

def _merge_options(self, options: Options | None = None) -> FullOptions:
    """Combine per-call ``options`` with this instance's options.

    Per-call values take precedence over the instance values. When the
    per-call options switch ``verify_signature`` off, every ``verify_*``
    claim check that the caller did not set explicitly is switched off too.

    The mapping passed as ``options`` is never modified, so a caller can
    reuse the same dictionary across calls.

    :param options: per-call overrides, or ``None`` for no overrides
    :type options: jwt.types.Options or None
    :returns: ``self.options`` itself when ``options`` is ``None``,
        otherwise a new merged dictionary
    """
    if options is None:
        return self.options

    if options.get("verify_signature", True):
        return {**self.options, **options}

    # (defensive) default verify_x to False if verify_signature is False.
    # These sit between the instance options and the explicit per-call
    # options, so anything the caller did set still wins.
    unverified_defaults: Options = {
        "verify_exp": False,
        "verify_nbf": False,
        "verify_iat": False,
        "verify_aud": False,
        "verify_iss": False,
        "verify_sub": False,
        "verify_jti": False,
    }
    return {**self.options, **unverified_defaults, **options}

89 

def encode(
    self,
    payload: dict[str, Any],
    key: AllowedPrivateKeyTypes,
    algorithm: str | None = _ALGORITHM_UNSET,  # type: ignore[assignment]
    headers: dict[str, Any] | None = None,
    json_encoder: type[json.JSONEncoder] | None = None,
    sort_headers: bool = True,
) -> str:
    """Serialize ``payload`` and sign it as a JSON Web Token.

    ``datetime`` values found in the ``exp``, ``iat`` and ``nbf`` claims are
    converted to integer timestamps (whole seconds) before serialization.
    The conversion is applied to a shallow copy, so the ``payload`` object
    passed in is left untouched.

    :param payload: JWT claims, e.g. ``dict(iss=..., aud=..., sub=...)``
    :type payload: dict[str, typing.Any]
    :param key: a key suitable for the chosen algorithm:

        * for **asymmetric algorithms**: PEM-formatted private key, a multiline string
        * for **symmetric algorithms**: plain string, sufficiently long for security

    :type key: str or bytes or PyJWK or :py:class:`jwt.algorithms.AllowedPrivateKeys`
    :param algorithm: algorithm to sign the token with, e.g. ``"ES256"``.
        If ``headers`` includes ``alg``, it will be preferred to this parameter.
        If ``key`` is a :class:`PyJWK` object, by default the key algorithm will be used.
    :type algorithm: str or None
    :param headers: additional JWT header fields, e.g. ``dict(kid="my-key-id")``.
    :type headers: dict[str, typing.Any] or None
    :param json_encoder: custom JSON encoder for ``payload`` and ``headers``
    :type json_encoder: json.JSONEncoder or None
    :param sort_headers: forwarded unchanged to the underlying JWS encoder
    :type sort_headers: bool

    :rtype: str
    :returns: a JSON Web Token

    :raises TypeError: if ``payload`` is not a ``dict``, or if it contains
        an ``iss`` claim that is not a string
    """
    if not isinstance(payload, dict):
        raise TypeError(
            "Expecting a dict object, as JWT only supports "
            "JSON objects as payloads."
        )

    # Work on a shallow copy so the caller's dictionary is not modified.
    claims = payload.copy()

    # Known time-format claims may be given as datetime objects.
    for claim_name in ("exp", "iat", "nbf"):
        claim_value = claims.get(claim_name)
        if isinstance(claim_value, datetime):
            claims[claim_name] = timegm(claim_value.utctimetuple())

    # Issue #1039: reject a non-string issuer before anything is signed.
    if "iss" in claims:
        if not isinstance(claims["iss"], str):
            raise TypeError("Issuer (iss) must be a string.")

    signing_input = self._encode_payload(
        claims,
        headers=headers,
        json_encoder=json_encoder,
    )

    return self._jws.encode(
        signing_input,
        key,
        algorithm,
        headers,
        json_encoder,
        sort_headers=sort_headers,
    )

155 

def _encode_payload(
    self,
    payload: dict[str, Any],
    headers: dict[str, Any] | None = None,
    json_encoder: type[json.JSONEncoder] | None = None,
) -> bytes:
    """
    Turn the claims dictionary into the byte string that gets signed.

    Subclasses may override this hook to encode the payload differently,
    e.g. to compress it. ``headers`` is accepted but not used by this
    default implementation.
    """
    # Compact separators: no whitespace after "," or ":".
    serialized = json.dumps(payload, cls=json_encoder, separators=(",", ":"))
    return serialized.encode("utf-8")

173 

def decode_complete(
    self,
    jwt: str | bytes,
    key: AllowedPublicKeyTypes = "",
    algorithms: Sequence[str] | None = None,
    options: Options | None = None,
    # deprecated arg, remove in pyjwt3
    verify: bool | None = None,
    # could be used as passthrough to api_jws, consider removal in pyjwt3
    detached_payload: bytes | None = None,
    # passthrough arguments to _validate_claims
    # consider putting in options
    audience: str | Iterable[str] | None = None,
    issuer: str | Container[str] | None = None,
    subject: str | None = None,
    leeway: float | timedelta = 0,
    # kwargs
    **kwargs: Any,
) -> dict[str, Any]:
    """Decode a token and return its header, payload and signature.

    Identical to ``jwt.decode`` except for the return value, which is a
    dictionary containing the token header (JOSE Header), the token payload
    (JWT Payload), and token signature (JWT Signature) on the keys "header",
    "payload", and "signature" respectively.

    :param jwt: the token to be decoded
    :type jwt: str or bytes
    :param key: the key suitable for the allowed algorithm
    :type key: str or bytes or PyJWK or :py:class:`jwt.algorithms.AllowedPublicKeys`

    :param algorithms: allowed algorithms, e.g. ``["ES256"]``

        .. warning::

           Do **not** compute the ``algorithms`` parameter based on
           the ``alg`` from the token itself, or on any other data
           that an attacker may be able to influence, as that might
           expose you to various vulnerabilities (see `RFC 8725 §2.1
           <https://www.rfc-editor.org/rfc/rfc8725.html#section-2.1>`_). Instead,
           either hard-code a fixed value for ``algorithms``, or
           configure it in the same place you configure the
           ``key``. Make sure not to mix symmetric and asymmetric
           algorithms that interpret the ``key`` in different ways
           (e.g. HS\\* and RS\\*).
    :type algorithms: typing.Sequence[str] or None

    :param jwt.types.Options options: extended decoding and validation options
        Refer to :py:class:`jwt.types.Options` for more information.

    :param verify: deprecated and ignored; it only triggers a
        ``DeprecationWarning`` when it disagrees with the
        ``verify_signature`` entry of ``options``
    :type verify: bool or None
    :param detached_payload: passed through unchanged to the JWS decoder
    :type detached_payload: bytes or None
    :param audience: optional, the value for ``verify_aud`` check
    :type audience: str or typing.Iterable[str] or None
    :param issuer: optional, the value for ``verify_iss`` check
    :type issuer: str or typing.Container[str] or None
    :param subject: optional, the value for ``verify_sub`` check
    :type subject: str or None
    :param leeway: a time margin in seconds for the expiration check
    :type leeway: float or datetime.timedelta
    :rtype: dict[str, typing.Any]
    :returns: Decoded JWT with the JOSE Header on the key ``header``, the JWS
        Payload on the key ``payload``, and the JWS Signature on the key ``signature``.
    """
    if kwargs:
        warnings.warn(
            "passing additional kwargs to decode_complete() is deprecated "
            "and will be removed in pyjwt version 3. "
            f"Unsupported kwargs: {tuple(kwargs.keys())}",
            RemovedInPyjwt3Warning,
            stacklevel=2,
        )

    # Only the per-call options are consulted here; signature verification
    # stays on unless the caller switches it off for this call.
    verify_signature = (
        True if options is None else options.get("verify_signature", True)
    )

    # If the user has set the legacy `verify` argument, and it doesn't match
    # what the relevant `options` entry for the argument is, inform the user
    # that they're likely making a mistake.
    legacy_flag_conflicts = verify is not None and verify != verify_signature
    if legacy_flag_conflicts:
        warnings.warn(
            "The `verify` argument to `decode` does nothing in PyJWT 2.0 and newer. "
            "The equivalent is setting `verify_signature` to False in the `options` dictionary. "
            "This invocation has a mismatch between the kwarg and the option entry.",
            category=DeprecationWarning,
            stacklevel=2,
        )

    claim_options = self._merge_options(options)

    jws_options: SigOptions = {"verify_signature": verify_signature}
    result = self._jws.decode_complete(
        jwt,
        key=key,
        algorithms=algorithms,
        options=jws_options,
        detached_payload=detached_payload,
    )

    claims = self._decode_payload(result)

    self._validate_claims(
        claims,
        claim_options,
        audience=audience,
        issuer=issuer,
        leeway=leeway,
        subject=subject,
    )

    # Replace the raw payload in the result with the parsed claims.
    result["payload"] = claims
    return result

283 

def _decode_payload(self, decoded: dict[str, Any]) -> dict[str, Any]:
    """
    Parse the claims out of a decoded JWS dictionary.

    The value stored under ``decoded["payload"]`` is parsed as JSON and must
    be a JSON object. Subclasses may override this hook to decode the payload
    differently, e.g. to decompress compressed payloads.

    :raises DecodeError: if the payload is not valid JSON or is not a JSON
        object
    """
    raw_payload = decoded["payload"]
    try:
        claims: dict[str, Any] = json.loads(raw_payload)
    except ValueError as e:
        raise DecodeError(f"Invalid payload string: {e}") from e

    if isinstance(claims, dict):
        return claims
    raise DecodeError("Invalid payload string: must be a json object")

299 

def decode(
    self,
    jwt: str | bytes,
    key: AllowedPublicKeys | PyJWK | str | bytes = "",
    algorithms: Sequence[str] | None = None,
    options: Options | None = None,
    # deprecated arg, remove in pyjwt3
    verify: bool | None = None,
    # could be used as passthrough to api_jws, consider removal in pyjwt3
    detached_payload: bytes | None = None,
    # passthrough arguments to _validate_claims
    # consider putting in options
    audience: str | Iterable[str] | None = None,
    subject: str | None = None,
    issuer: str | Container[str] | None = None,
    leeway: float | timedelta = 0,
    # kwargs
    **kwargs: Any,
) -> dict[str, Any]:
    """Verify the ``jwt`` token signature and return the token claims.

    This delegates to :meth:`decode_complete` and returns only the value
    stored under its ``payload`` key.

    :param jwt: the token to be decoded
    :type jwt: str or bytes
    :param key: the key suitable for the allowed algorithm
    :type key: str or bytes or PyJWK or :py:class:`jwt.algorithms.AllowedPublicKeys`

    :param algorithms: allowed algorithms, e.g. ``["ES256"]``
        If ``key`` is a :class:`PyJWK` object, allowed algorithms will default to the key algorithm.

        .. warning::

           Do **not** compute the ``algorithms`` parameter based on
           the ``alg`` from the token itself, or on any other data
           that an attacker may be able to influence, as that might
           expose you to various vulnerabilities (see `RFC 8725 §2.1
           <https://www.rfc-editor.org/rfc/rfc8725.html#section-2.1>`_). Instead,
           either hard-code a fixed value for ``algorithms``, or
           configure it in the same place you configure the
           ``key``. Make sure not to mix symmetric and asymmetric
           algorithms that interpret the ``key`` in different ways
           (e.g. HS\\* and RS\\*).
    :type algorithms: typing.Sequence[str] or None

    :param jwt.types.Options options: extended decoding and validation options
        Refer to :py:class:`jwt.types.Options` for more information.

    :param audience: optional, the value for ``verify_aud`` check
    :type audience: str or typing.Iterable[str] or None
    :param subject: optional, the value for ``verify_sub`` check
    :type subject: str or None
    :param issuer: optional, the value for ``verify_iss`` check
    :type issuer: str or typing.Container[str] or None
    :param leeway: a time margin in seconds for the expiration check
    :type leeway: float or datetime.timedelta
    :rtype: dict[str, typing.Any]
    :returns: the JWT claims
    """
    if kwargs:
        warnings.warn(
            "passing additional kwargs to decode() is deprecated "
            "and will be removed in pyjwt version 3. "
            f"Unsupported kwargs: {tuple(kwargs.keys())}",
            RemovedInPyjwt3Warning,
            stacklevel=2,
        )

    # The unsupported kwargs are deliberately not forwarded, so the
    # deprecation warning above is emitted only once.
    claim_checks = {
        "audience": audience,
        "subject": subject,
        "issuer": issuer,
        "leeway": leeway,
    }
    complete = self.decode_complete(
        jwt,
        key,
        algorithms,
        options,
        verify=verify,
        detached_payload=detached_payload,
        **claim_checks,
    )
    return cast(dict[str, Any], complete["payload"])

378 

def _validate_claims(
    self,
    payload: dict[str, Any],
    options: FullOptions,
    audience: Iterable[str] | str | None = None,
    issuer: Container[str] | str | None = None,
    subject: str | None = None,
    leeway: float | timedelta = 0,
) -> None:
    """Run every enabled claim check against ``payload``.

    Checks run in a fixed order: required claims, ``iat``, ``nbf``, ``exp``,
    ``iss``, ``aud``, ``sub``, ``jti``. The three time-based checks are
    skipped when their claim is absent from the payload; every check is
    skipped when its ``verify_*`` option is false.

    :param payload: the decoded claims
    :param options: fully merged options controlling which checks run
    :param audience: expected audience(s), passed to the ``aud`` check
    :param issuer: expected issuer(s), passed to the ``iss`` check
    :param subject: expected subject, passed to the ``sub`` check
    :param leeway: time margin for the time-based checks, in seconds or as
        a ``timedelta``
    :raises TypeError: if ``audience`` is neither ``None``, a string nor an
        iterable
    """
    leeway_seconds = (
        leeway.total_seconds() if isinstance(leeway, timedelta) else leeway
    )

    audience_type_ok = audience is None or isinstance(audience, (str, Iterable))
    if not audience_type_ok:
        raise TypeError("audience must be a string, iterable or None")

    self._validate_required_claims(payload, options["require"])

    # A single timestamp is shared by all time-based checks.
    now = datetime.now(tz=timezone.utc).timestamp()

    if "iat" in payload and options["verify_iat"]:
        self._validate_iat(payload, now, leeway_seconds)

    if "nbf" in payload and options["verify_nbf"]:
        self._validate_nbf(payload, now, leeway_seconds)

    if "exp" in payload and options["verify_exp"]:
        self._validate_exp(payload, now, leeway_seconds)

    if options["verify_iss"]:
        self._validate_iss(payload, issuer)

    if options["verify_aud"]:
        strict_audience = options.get("strict_aud", False)
        self._validate_aud(payload, audience, strict=strict_audience)

    if options["verify_sub"]:
        self._validate_sub(payload, subject)

    if options["verify_jti"]:
        self._validate_jti(payload)

420 

def _validate_required_claims(
    self,
    payload: dict[str, Any],
    claims: Iterable[str],
) -> None:
    """Ensure every name in ``claims`` is present in ``payload``.

    A claim whose value is ``None`` counts as missing.

    :raises MissingRequiredClaimError: for the first missing claim found
    """
    for required_name in claims:
        if payload.get(required_name) is not None:
            continue
        raise MissingRequiredClaimError(required_name)

429 

def _validate_sub(
    self, payload: dict[str, Any], subject: str | None = None
) -> None:
    """
    Validate the optional "sub" claim.

    Nothing is checked when the payload has no "sub" claim. Otherwise the
    claim must be a string and, when ``subject`` is given, equal to it.

    :param payload(dict): The payload which needs to be validated
    :param subject(str): The subject of the token
    :raises InvalidSubjectError: if the claim is not a string or does not
        match ``subject``
    """
    if "sub" not in payload:
        return

    claimed_subject = payload["sub"]
    if not isinstance(claimed_subject, str):
        raise InvalidSubjectError("Subject must be a string")

    if subject is not None and claimed_subject != subject:
        raise InvalidSubjectError("Invalid subject")

450 

def _validate_jti(self, payload: dict[str, Any]) -> None:
    """
    Validate the optional "jti" claim.

    Nothing is checked when the payload has no "jti" claim; when it is
    present it must be a string.

    :param payload(dict): The payload which needs to be validated
    :raises InvalidJTIError: if the claim is present but not a string
    """
    if "jti" in payload and not isinstance(payload["jti"], str):
        raise InvalidJTIError("JWT ID must be a string")

464 

def _validate_iat(
    self,
    payload: dict[str, Any],
    now: float,
    leeway: float,
) -> None:
    """Validate the "iat" (issued at) claim, which must be in the payload.

    :param payload: the decoded claims
    :param now: current time as a POSIX timestamp
    :param leeway: tolerated clock skew in seconds
    :raises InvalidIssuedAtError: if the claim cannot be converted to an
        integer
    :raises ImmatureSignatureError: if the claim lies more than ``leeway``
        seconds in the future
    """
    try:
        iat = int(payload["iat"])
    # int() raises TypeError for null/list/object values and OverflowError
    # for an infinite float (the json module parses ``Infinity``), so all
    # three are reported as an invalid claim instead of escaping to the caller.
    except (TypeError, ValueError, OverflowError):
        raise InvalidIssuedAtError(
            "Issued At claim (iat) must be an integer."
        ) from None
    if iat > (now + leeway):
        raise ImmatureSignatureError("The token is not yet valid (iat)")

479 

def _validate_nbf(
    self,
    payload: dict[str, Any],
    now: float,
    leeway: float,
) -> None:
    """Validate the "nbf" (not before) claim, which must be in the payload.

    :param payload: the decoded claims
    :param now: current time as a POSIX timestamp
    :param leeway: tolerated clock skew in seconds
    :raises DecodeError: if the claim cannot be converted to an integer
    :raises ImmatureSignatureError: if the claim lies more than ``leeway``
        seconds in the future
    """
    try:
        nbf = int(payload["nbf"])
    # int() raises TypeError for null/list/object values and OverflowError
    # for an infinite float (the json module parses ``Infinity``), so all
    # three are reported as an invalid claim instead of escaping to the caller.
    except (TypeError, ValueError, OverflowError):
        raise DecodeError("Not Before claim (nbf) must be an integer.") from None

    if nbf > (now + leeway):
        raise ImmatureSignatureError("The token is not yet valid (nbf)")

493 

def _validate_exp(
    self,
    payload: dict[str, Any],
    now: float,
    leeway: float,
) -> None:
    """Validate the "exp" (expiration) claim, which must be in the payload.

    :param payload: the decoded claims
    :param now: current time as a POSIX timestamp
    :param leeway: tolerated clock skew in seconds
    :raises DecodeError: if the claim cannot be converted to an integer
    :raises ExpiredSignatureError: if the claim is at or before
        ``now - leeway``
    """
    try:
        exp = int(payload["exp"])
    # int() raises TypeError for null/list/object values and OverflowError
    # for an infinite float (the json module parses ``Infinity``), so all
    # three are reported as an invalid claim instead of escaping to the caller.
    except (TypeError, ValueError, OverflowError):
        raise DecodeError(
            "Expiration Time claim (exp) must be an integer."
        ) from None

    if exp <= (now - leeway):
        raise ExpiredSignatureError("Signature has expired")

509 

def _validate_aud(
    self,
    payload: dict[str, Any],
    audience: str | Iterable[str] | None,
    *,
    strict: bool = False,
) -> None:
    """Validate the "aud" claim against the expected ``audience``.

    An absent or empty "aud" claim is treated as "no audience in the token".

    :param payload: the decoded claims
    :param audience: the audience(s) the application accepts, or ``None``
        when the application expects the token to carry no audience
    :param strict: when true, ``audience`` and the claim must both be single
        strings and must be equal; list matching is not allowed
    :raises InvalidAudienceError: if the token has an audience but none is
        expected, if the claim or ``audience`` has the wrong shape, or if no
        expected audience appears in the claim
    :raises MissingRequiredClaimError: if an audience is expected but the
        token has none
    """
    token_audience = payload.get("aud")

    if audience is None:
        if token_audience:
            # Application did not specify an audience, but
            # the token has the 'aud' claim
            raise InvalidAudienceError("Invalid audience")
        return

    if not token_audience:
        # Application specified an audience, but it could not be
        # verified since the token does not contain a claim.
        raise MissingRequiredClaimError("aud")

    if strict:
        # Strict mode: exactly one expected audience, exactly one claimed
        # audience, and the two must be identical.
        if not isinstance(audience, str):
            raise InvalidAudienceError("Invalid audience (strict)")
        if not isinstance(token_audience, str):
            raise InvalidAudienceError("Invalid claim format in token (strict)")
        if audience != token_audience:
            raise InvalidAudienceError("Audience doesn't match (strict)")
        return

    # Normalize the claim to a list of strings.
    claimed = [token_audience] if isinstance(token_audience, str) else token_audience
    if not isinstance(claimed, list) or any(
        not isinstance(entry, str) for entry in claimed
    ):
        raise InvalidAudienceError("Invalid claim format in token")

    accepted = [audience] if isinstance(audience, str) else audience

    # One overlap between accepted and claimed audiences is enough.
    if not any(candidate in claimed for candidate in accepted):
        raise InvalidAudienceError("Audience doesn't match")

559 

def _validate_iss(
    self, payload: dict[str, Any], issuer: Container[str] | str | None
) -> None:
    """Validate the "iss" claim against the expected ``issuer``.

    Nothing is checked when ``issuer`` is ``None``.

    :param payload: the decoded claims
    :param issuer: a single accepted issuer, or a container of accepted
        issuers
    :raises MissingRequiredClaimError: if an issuer is expected but the
        token has no "iss" claim
    :raises InvalidIssuerError: if the claim is not a string, does not match
        ``issuer``, or ``issuer`` does not support membership tests
    """
    if issuer is None:
        return

    if "iss" not in payload:
        raise MissingRequiredClaimError("iss")

    claimed_issuer = payload["iss"]
    if not isinstance(claimed_issuer, str):
        raise InvalidIssuerError("Payload Issuer (iss) must be a string")

    if isinstance(issuer, str):
        # A plain string must match exactly (no substring matching).
        mismatch = claimed_issuer != issuer
    else:
        try:
            mismatch = claimed_issuer not in issuer
        except TypeError:
            # The membership test itself failed: wrong kind of argument.
            raise InvalidIssuerError(
                'Issuer param must be "str" or "Container[str]"'
            ) from None

    if mismatch:
        raise InvalidIssuerError("Invalid issuer")

584 

585 

# Shared module-level PyJWT instance, created with default options; the
# function-style aliases below are bound to it.
_jwt_global_obj = PyJWT()
# Replace the PyJWS object created by PyJWT.__init__ with the global one
# imported from api_jws.
_jwt_global_obj._jws = _jws_global_obj
# Module-level callables: bound methods of the shared instance.
encode = _jwt_global_obj.encode
decode_complete = _jwt_global_obj.decode_complete
decode = _jwt_global_obj.decode