Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/jwt/api_jwt.py: 59%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

192 statements  

1from __future__ import annotations 

2 

3import json 

4import os 

5import warnings 

6from calendar import timegm 

7from collections.abc import Container, Iterable, Sequence 

8from datetime import datetime, timedelta, timezone 

9from typing import TYPE_CHECKING, Any, Union, cast 

10 

11from .api_jws import PyJWS, _ALGORITHM_UNSET, _jws_global_obj 

12from .exceptions import ( 

13 DecodeError, 

14 ExpiredSignatureError, 

15 ImmatureSignatureError, 

16 InvalidAudienceError, 

17 InvalidIssuedAtError, 

18 InvalidIssuerError, 

19 InvalidJTIError, 

20 InvalidSubjectError, 

21 MissingRequiredClaimError, 

22) 

23from .warnings import RemovedInPyjwt3Warning 

24 

25if TYPE_CHECKING or bool(os.getenv("SPHINX_BUILD", "")): 

26 import sys 

27 

28 if sys.version_info >= (3, 10): 

29 from typing import TypeAlias 

30 else: 

31 # Python 3.9 and lower 

32 from typing_extensions import TypeAlias 

33 

34 from .algorithms import AllowedPrivateKeys, AllowedPublicKeys 

35 from .api_jwk import PyJWK 

36 from .types import FullOptions, Options, SigOptions 

37 

38 AllowedPrivateKeyTypes: TypeAlias = Union[AllowedPrivateKeys, PyJWK, str, bytes] 

39 AllowedPublicKeyTypes: TypeAlias = Union[AllowedPublicKeys, PyJWK, str, bytes] 

40 

41 

42class PyJWT: 

43 def __init__(self, options: Options | None = None) -> None: 

44 self.options: FullOptions 

45 self.options = self._get_default_options() 

46 if options is not None: 

47 self.options = self._merge_options(options) 

48 

49 self._jws = PyJWS(options=self._get_sig_options()) 

50 

51 @staticmethod 

52 def _get_default_options() -> FullOptions: 

53 return { 

54 "verify_signature": True, 

55 "verify_exp": True, 

56 "verify_nbf": True, 

57 "verify_iat": True, 

58 "verify_aud": True, 

59 "verify_iss": True, 

60 "verify_sub": True, 

61 "verify_jti": True, 

62 "require": [], 

63 "strict_aud": False, 

64 "enforce_minimum_key_length": False, 

65 } 

66 

67 def _get_sig_options(self) -> SigOptions: 

68 return { 

69 "verify_signature": self.options["verify_signature"], 

70 "enforce_minimum_key_length": self.options.get( 

71 "enforce_minimum_key_length", False 

72 ), 

73 } 

74 

75 def _merge_options(self, options: Options | None = None) -> FullOptions: 

76 if options is None: 

77 return self.options 

78 

79 # (defensive) set defaults for verify_x to False if verify_signature is False 

80 if not options.get("verify_signature", True): 

81 options["verify_exp"] = options.get("verify_exp", False) 

82 options["verify_nbf"] = options.get("verify_nbf", False) 

83 options["verify_iat"] = options.get("verify_iat", False) 

84 options["verify_aud"] = options.get("verify_aud", False) 

85 options["verify_iss"] = options.get("verify_iss", False) 

86 options["verify_sub"] = options.get("verify_sub", False) 

87 options["verify_jti"] = options.get("verify_jti", False) 

88 return {**self.options, **options} 

89 

90 def encode( 

91 self, 

92 payload: dict[str, Any], 

93 key: AllowedPrivateKeyTypes, 

94 algorithm: str | None = _ALGORITHM_UNSET, # type: ignore[assignment] 

95 headers: dict[str, Any] | None = None, 

96 json_encoder: type[json.JSONEncoder] | None = None, 

97 sort_headers: bool = True, 

98 ) -> str: 

99 """Encode the ``payload`` as JSON Web Token. 

100 

101 :param payload: JWT claims, e.g. ``dict(iss=..., aud=..., sub=...)`` 

102 :type payload: dict[str, typing.Any] 

103 :param key: a key suitable for the chosen algorithm: 

104 

105 * for **asymmetric algorithms**: PEM-formatted private key, a multiline string 

106 * for **symmetric algorithms**: plain string, sufficiently long for security 

107 

108 :type key: str or bytes or PyJWK or :py:class:`jwt.algorithms.AllowedPrivateKeys` 

109 :param algorithm: algorithm to sign the token with, e.g. ``"ES256"``. 

110 If ``headers`` includes ``alg``, it will be preferred to this parameter. 

111 If ``key`` is a :class:`PyJWK` object, by default the key algorithm will be used. 

112 :type algorithm: str or None 

113 :param headers: additional JWT header fields, e.g. ``dict(kid="my-key-id")``. 

114 :type headers: dict[str, typing.Any] or None 

115 :param json_encoder: custom JSON encoder for ``payload`` and ``headers`` 

116 :type json_encoder: json.JSONEncoder or None 

117 

118 :rtype: str 

119 :returns: a JSON Web Token 

120 

121 :raises TypeError: if ``payload`` is not a ``dict`` 

122 """ 

123 # Check that we get a dict 

124 if not isinstance(payload, dict): 

125 raise TypeError( 

126 "Expecting a dict object, as JWT only supports " 

127 "JSON objects as payloads." 

128 ) 

129 

130 # Payload 

131 payload = payload.copy() 

132 for time_claim in ["exp", "iat", "nbf"]: 

133 # Convert datetime to a intDate value in known time-format claims 

134 if isinstance(payload.get(time_claim), datetime): 

135 payload[time_claim] = timegm(payload[time_claim].utctimetuple()) 

136 

137 # Issue #1039, iss being set to non-string 

138 if "iss" in payload and not isinstance(payload["iss"], str): 

139 raise TypeError("Issuer (iss) must be a string.") 

140 

141 json_payload = self._encode_payload( 

142 payload, 

143 headers=headers, 

144 json_encoder=json_encoder, 

145 ) 

146 

147 return self._jws.encode( 

148 json_payload, 

149 key, 

150 algorithm, 

151 headers, 

152 json_encoder, 

153 sort_headers=sort_headers, 

154 ) 

155 

156 def _encode_payload( 

157 self, 

158 payload: dict[str, Any], 

159 headers: dict[str, Any] | None = None, 

160 json_encoder: type[json.JSONEncoder] | None = None, 

161 ) -> bytes: 

162 """ 

163 Encode a given payload to the bytes to be signed. 

164 

165 This method is intended to be overridden by subclasses that need to 

166 encode the payload in a different way, e.g. compress the payload. 

167 """ 

168 return json.dumps( 

169 payload, 

170 separators=(",", ":"), 

171 cls=json_encoder, 

172 ).encode("utf-8") 

173 

174 def decode_complete( 

175 self, 

176 jwt: str | bytes, 

177 key: AllowedPublicKeyTypes = "", 

178 algorithms: Sequence[str] | None = None, 

179 options: Options | None = None, 

180 # deprecated arg, remove in pyjwt3 

181 verify: bool | None = None, 

182 # could be used as passthrough to api_jws, consider removal in pyjwt3 

183 detached_payload: bytes | None = None, 

184 # passthrough arguments to _validate_claims 

185 # consider putting in options 

186 audience: str | Iterable[str] | None = None, 

187 issuer: str | Container[str] | None = None, 

188 subject: str | None = None, 

189 leeway: float | timedelta = 0, 

190 # kwargs 

191 **kwargs: Any, 

192 ) -> dict[str, Any]: 

193 """Identical to ``jwt.decode`` except for return value which is a dictionary containing the token header (JOSE Header), 

194 the token payload (JWT Payload), and token signature (JWT Signature) on the keys "header", "payload", 

195 and "signature" respectively. 

196 

197 :param jwt: the token to be decoded 

198 :type jwt: str or bytes 

199 :param key: the key suitable for the allowed algorithm 

200 :type key: str or bytes or PyJWK or :py:class:`jwt.algorithms.AllowedPublicKeys` 

201 

202 :param algorithms: allowed algorithms, e.g. ``["ES256"]`` 

203 

204 .. warning:: 

205 

206 Do **not** compute the ``algorithms`` parameter based on 

207 the ``alg`` from the token itself, or on any other data 

208 that an attacker may be able to influence, as that might 

209 expose you to various vulnerabilities (see `RFC 8725 §2.1 

210 <https://www.rfc-editor.org/rfc/rfc8725.html#section-2.1>`_). Instead, 

211 either hard-code a fixed value for ``algorithms``, or 

212 configure it in the same place you configure the 

213 ``key``. Make sure not to mix symmetric and asymmetric 

214 algorithms that interpret the ``key`` in different ways 

215 (e.g. HS\\* and RS\\*). 

216 :type algorithms: typing.Sequence[str] or None 

217 

218 :param jwt.types.Options options: extended decoding and validation options 

219 Refer to :py:class:`jwt.types.Options` for more information. 

220 

221 :param audience: optional, the value for ``verify_aud`` check 

222 :type audience: str or typing.Iterable[str] or None 

223 :param issuer: optional, the value for ``verify_iss`` check 

224 :type issuer: str or typing.Container[str] or None 

225 :param leeway: a time margin in seconds for the expiration check 

226 :type leeway: float or datetime.timedelta 

227 :rtype: dict[str, typing.Any] 

228 :returns: Decoded JWT with the JOSE Header on the key ``header``, the JWS 

229 Payload on the key ``payload``, and the JWS Signature on the key ``signature``. 

230 """ 

231 if kwargs: 

232 warnings.warn( 

233 "passing additional kwargs to decode_complete() is deprecated " 

234 "and will be removed in pyjwt version 3. " 

235 f"Unsupported kwargs: {tuple(kwargs.keys())}", 

236 RemovedInPyjwt3Warning, 

237 stacklevel=2, 

238 ) 

239 

240 if options is None: 

241 verify_signature = True 

242 else: 

243 verify_signature = options.get("verify_signature", True) 

244 

245 # If the user has set the legacy `verify` argument, and it doesn't match 

246 # what the relevant `options` entry for the argument is, inform the user 

247 # that they're likely making a mistake. 

248 if verify is not None and verify != verify_signature: 

249 warnings.warn( 

250 "The `verify` argument to `decode` does nothing in PyJWT 2.0 and newer. " 

251 "The equivalent is setting `verify_signature` to False in the `options` dictionary. " 

252 "This invocation has a mismatch between the kwarg and the option entry.", 

253 category=DeprecationWarning, 

254 stacklevel=2, 

255 ) 

256 

257 merged_options = self._merge_options(options) 

258 

259 sig_options: SigOptions = { 

260 "verify_signature": verify_signature, 

261 "enforce_minimum_key_length": merged_options.get( 

262 "enforce_minimum_key_length", False 

263 ), 

264 } 

265 decoded = self._jws.decode_complete( 

266 jwt, 

267 key=key, 

268 algorithms=algorithms, 

269 options=sig_options, 

270 detached_payload=detached_payload, 

271 ) 

272 

273 payload = self._decode_payload(decoded) 

274 

275 self._validate_claims( 

276 payload, 

277 merged_options, 

278 audience=audience, 

279 issuer=issuer, 

280 leeway=leeway, 

281 subject=subject, 

282 ) 

283 

284 decoded["payload"] = payload 

285 return decoded 

286 

287 def _decode_payload(self, decoded: dict[str, Any]) -> dict[str, Any]: 

288 """ 

289 Decode the payload from a JWS dictionary (payload, signature, header). 

290 

291 This method is intended to be overridden by subclasses that need to 

292 decode the payload in a different way, e.g. decompress compressed 

293 payloads. 

294 """ 

295 try: 

296 payload: dict[str, Any] = json.loads(decoded["payload"]) 

297 except ValueError as e: 

298 raise DecodeError(f"Invalid payload string: {e}") from e 

299 if not isinstance(payload, dict): 

300 raise DecodeError("Invalid payload string: must be a json object") 

301 return payload 

302 

303 def decode( 

304 self, 

305 jwt: str | bytes, 

306 key: AllowedPublicKeys | PyJWK | str | bytes = "", 

307 algorithms: Sequence[str] | None = None, 

308 options: Options | None = None, 

309 # deprecated arg, remove in pyjwt3 

310 verify: bool | None = None, 

311 # could be used as passthrough to api_jws, consider removal in pyjwt3 

312 detached_payload: bytes | None = None, 

313 # passthrough arguments to _validate_claims 

314 # consider putting in options 

315 audience: str | Iterable[str] | None = None, 

316 subject: str | None = None, 

317 issuer: str | Container[str] | None = None, 

318 leeway: float | timedelta = 0, 

319 # kwargs 

320 **kwargs: Any, 

321 ) -> dict[str, Any]: 

322 """Verify the ``jwt`` token signature and return the token claims. 

323 

324 :param jwt: the token to be decoded 

325 :type jwt: str or bytes 

326 :param key: the key suitable for the allowed algorithm 

327 :type key: str or bytes or PyJWK or :py:class:`jwt.algorithms.AllowedPublicKeys` 

328 

329 :param algorithms: allowed algorithms, e.g. ``["ES256"]`` 

330 If ``key`` is a :class:`PyJWK` object, allowed algorithms will default to the key algorithm. 

331 

332 .. warning:: 

333 

334 Do **not** compute the ``algorithms`` parameter based on 

335 the ``alg`` from the token itself, or on any other data 

336 that an attacker may be able to influence, as that might 

337 expose you to various vulnerabilities (see `RFC 8725 §2.1 

338 <https://www.rfc-editor.org/rfc/rfc8725.html#section-2.1>`_). Instead, 

339 either hard-code a fixed value for ``algorithms``, or 

340 configure it in the same place you configure the 

341 ``key``. Make sure not to mix symmetric and asymmetric 

342 algorithms that interpret the ``key`` in different ways 

343 (e.g. HS\\* and RS\\*). 

344 :type algorithms: typing.Sequence[str] or None 

345 

346 :param jwt.types.Options options: extended decoding and validation options 

347 Refer to :py:class:`jwt.types.Options` for more information. 

348 

349 :param audience: optional, the value for ``verify_aud`` check 

350 :type audience: str or typing.Iterable[str] or None 

351 :param subject: optional, the value for ``verify_sub`` check 

352 :type subject: str or None 

353 :param issuer: optional, the value for ``verify_iss`` check 

354 :type issuer: str or typing.Container[str] or None 

355 :param leeway: a time margin in seconds for the expiration check 

356 :type leeway: float or datetime.timedelta 

357 :rtype: dict[str, typing.Any] 

358 :returns: the JWT claims 

359 """ 

360 if kwargs: 

361 warnings.warn( 

362 "passing additional kwargs to decode() is deprecated " 

363 "and will be removed in pyjwt version 3. " 

364 f"Unsupported kwargs: {tuple(kwargs.keys())}", 

365 RemovedInPyjwt3Warning, 

366 stacklevel=2, 

367 ) 

368 decoded = self.decode_complete( 

369 jwt, 

370 key, 

371 algorithms, 

372 options, 

373 verify=verify, 

374 detached_payload=detached_payload, 

375 audience=audience, 

376 subject=subject, 

377 issuer=issuer, 

378 leeway=leeway, 

379 ) 

380 return cast(dict[str, Any], decoded["payload"]) 

381 

382 def _validate_claims( 

383 self, 

384 payload: dict[str, Any], 

385 options: FullOptions, 

386 audience: Iterable[str] | str | None = None, 

387 issuer: Container[str] | str | None = None, 

388 subject: str | None = None, 

389 leeway: float | timedelta = 0, 

390 ) -> None: 

391 if isinstance(leeway, timedelta): 

392 leeway = leeway.total_seconds() 

393 

394 if audience is not None and not isinstance(audience, (str, Iterable)): 

395 raise TypeError("audience must be a string, iterable or None") 

396 

397 self._validate_required_claims(payload, options["require"]) 

398 

399 now = datetime.now(tz=timezone.utc).timestamp() 

400 

401 if "iat" in payload and options["verify_iat"]: 

402 self._validate_iat(payload, now, leeway) 

403 

404 if "nbf" in payload and options["verify_nbf"]: 

405 self._validate_nbf(payload, now, leeway) 

406 

407 if "exp" in payload and options["verify_exp"]: 

408 self._validate_exp(payload, now, leeway) 

409 

410 if options["verify_iss"]: 

411 self._validate_iss(payload, issuer) 

412 

413 if options["verify_aud"]: 

414 self._validate_aud( 

415 payload, audience, strict=options.get("strict_aud", False) 

416 ) 

417 

418 if options["verify_sub"]: 

419 self._validate_sub(payload, subject) 

420 

421 if options["verify_jti"]: 

422 self._validate_jti(payload) 

423 

424 def _validate_required_claims( 

425 self, 

426 payload: dict[str, Any], 

427 claims: Iterable[str], 

428 ) -> None: 

429 for claim in claims: 

430 if payload.get(claim) is None: 

431 raise MissingRequiredClaimError(claim) 

432 

433 def _validate_sub( 

434 self, payload: dict[str, Any], subject: str | None = None 

435 ) -> None: 

436 """ 

437 Checks whether "sub" if in the payload is valid or not. 

438 This is an Optional claim 

439 

440 :param payload(dict): The payload which needs to be validated 

441 :param subject(str): The subject of the token 

442 """ 

443 

444 if "sub" not in payload: 

445 return 

446 

447 if not isinstance(payload["sub"], str): 

448 raise InvalidSubjectError("Subject must be a string") 

449 

450 if subject is not None: 

451 if payload.get("sub") != subject: 

452 raise InvalidSubjectError("Invalid subject") 

453 

454 def _validate_jti(self, payload: dict[str, Any]) -> None: 

455 """ 

456 Checks whether "jti" if in the payload is valid or not 

457 This is an Optional claim 

458 

459 :param payload(dict): The payload which needs to be validated 

460 """ 

461 

462 if "jti" not in payload: 

463 return 

464 

465 if not isinstance(payload.get("jti"), str): 

466 raise InvalidJTIError("JWT ID must be a string") 

467 

468 def _validate_iat( 

469 self, 

470 payload: dict[str, Any], 

471 now: float, 

472 leeway: float, 

473 ) -> None: 

474 try: 

475 iat = int(payload["iat"]) 

476 except ValueError: 

477 raise InvalidIssuedAtError( 

478 "Issued At claim (iat) must be an integer." 

479 ) from None 

480 if iat > (now + leeway): 

481 raise ImmatureSignatureError("The token is not yet valid (iat)") 

482 

483 def _validate_nbf( 

484 self, 

485 payload: dict[str, Any], 

486 now: float, 

487 leeway: float, 

488 ) -> None: 

489 try: 

490 nbf = int(payload["nbf"]) 

491 except ValueError: 

492 raise DecodeError("Not Before claim (nbf) must be an integer.") from None 

493 

494 if nbf > (now + leeway): 

495 raise ImmatureSignatureError("The token is not yet valid (nbf)") 

496 

497 def _validate_exp( 

498 self, 

499 payload: dict[str, Any], 

500 now: float, 

501 leeway: float, 

502 ) -> None: 

503 try: 

504 exp = int(payload["exp"]) 

505 except ValueError: 

506 raise DecodeError( 

507 "Expiration Time claim (exp) must be an integer." 

508 ) from None 

509 

510 if exp <= (now - leeway): 

511 raise ExpiredSignatureError("Signature has expired") 

512 

513 def _validate_aud( 

514 self, 

515 payload: dict[str, Any], 

516 audience: str | Iterable[str] | None, 

517 *, 

518 strict: bool = False, 

519 ) -> None: 

520 if audience is None: 

521 if "aud" not in payload or not payload["aud"]: 

522 return 

523 # Application did not specify an audience, but 

524 # the token has the 'aud' claim 

525 raise InvalidAudienceError("Invalid audience") 

526 

527 if "aud" not in payload or not payload["aud"]: 

528 # Application specified an audience, but it could not be 

529 # verified since the token does not contain a claim. 

530 raise MissingRequiredClaimError("aud") 

531 

532 audience_claims = payload["aud"] 

533 

534 # In strict mode, we forbid list matching: the supplied audience 

535 # must be a string, and it must exactly match the audience claim. 

536 if strict: 

537 # Only a single audience is allowed in strict mode. 

538 if not isinstance(audience, str): 

539 raise InvalidAudienceError("Invalid audience (strict)") 

540 

541 # Only a single audience claim is allowed in strict mode. 

542 if not isinstance(audience_claims, str): 

543 raise InvalidAudienceError("Invalid claim format in token (strict)") 

544 

545 if audience != audience_claims: 

546 raise InvalidAudienceError("Audience doesn't match (strict)") 

547 

548 return 

549 

550 if isinstance(audience_claims, str): 

551 audience_claims = [audience_claims] 

552 if not isinstance(audience_claims, list): 

553 raise InvalidAudienceError("Invalid claim format in token") 

554 if any(not isinstance(c, str) for c in audience_claims): 

555 raise InvalidAudienceError("Invalid claim format in token") 

556 

557 if isinstance(audience, str): 

558 audience = [audience] 

559 

560 if all(aud not in audience_claims for aud in audience): 

561 raise InvalidAudienceError("Audience doesn't match") 

562 

563 def _validate_iss( 

564 self, payload: dict[str, Any], issuer: Container[str] | str | None 

565 ) -> None: 

566 if issuer is None: 

567 return 

568 

569 if "iss" not in payload: 

570 raise MissingRequiredClaimError("iss") 

571 

572 iss = payload["iss"] 

573 if not isinstance(iss, str): 

574 raise InvalidIssuerError("Payload Issuer (iss) must be a string") 

575 

576 if isinstance(issuer, str): 

577 if iss != issuer: 

578 raise InvalidIssuerError("Invalid issuer") 

579 else: 

580 try: 

581 if iss not in issuer: 

582 raise InvalidIssuerError("Invalid issuer") 

583 except TypeError: 

584 raise InvalidIssuerError( 

585 'Issuer param must be "str" or "Container[str]"' 

586 ) from None 

587 

588 

589_jwt_global_obj = PyJWT() 

590_jwt_global_obj._jws = _jws_global_obj 

591encode = _jwt_global_obj.encode 

592decode_complete = _jwt_global_obj.decode_complete 

593decode = _jwt_global_obj.decode