Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/jwt/api_jwt.py: 58%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

196 statements  

1from __future__ import annotations 

2 

3import json 

4import os 

5import warnings 

6from calendar import timegm 

7from collections.abc import Container, Iterable, Sequence 

8from datetime import datetime, timedelta, timezone 

9from typing import TYPE_CHECKING, Any, Union, cast 

10 

11from .api_jws import PyJWS, _jws_global_obj 

12from .exceptions import ( 

13 DecodeError, 

14 ExpiredSignatureError, 

15 ImmatureSignatureError, 

16 InvalidAudienceError, 

17 InvalidIssuedAtError, 

18 InvalidIssuerError, 

19 InvalidJTIError, 

20 InvalidSubjectError, 

21 MissingRequiredClaimError, 

22) 

23from .warnings import RemovedInPyjwt3Warning 

24 

25if TYPE_CHECKING or bool(os.getenv("SPHINX_BUILD", "")): 

26 import sys 

27 

28 if sys.version_info >= (3, 10): 

29 from typing import TypeAlias 

30 else: 

31 # Python 3.9 and lower 

32 from typing_extensions import TypeAlias 

33 

34 from .algorithms import has_crypto 

35 from .api_jwk import PyJWK 

36 from .types import FullOptions, Options, SigOptions 

37 

38 if has_crypto: 

39 from .algorithms import AllowedPrivateKeys, AllowedPublicKeys 

40 

41 AllowedPrivateKeyTypes: TypeAlias = Union[AllowedPrivateKeys, PyJWK, str, bytes] 

42 AllowedPublicKeyTypes: TypeAlias = Union[AllowedPublicKeys, PyJWK, str, bytes] 

43 else: 

44 AllowedPrivateKeyTypes: TypeAlias = Union[PyJWK, str, bytes] # type: ignore 

45 AllowedPublicKeyTypes: TypeAlias = Union[PyJWK, str, bytes] # type: ignore 

46 

47 

class PyJWT:
    """Encode and decode JSON Web Tokens and validate their claims.

    Signing and signature verification are delegated to an internal
    :class:`PyJWS` instance (``self._jws``). This class serialises the
    payload as JSON and checks the ``exp``, ``nbf``, ``iat``, ``aud``,
    ``iss``, ``sub`` and ``jti`` claims according to ``self.options``.
    """

    def __init__(self, options: Options | None = None) -> None:
        """Build the instance options and the underlying :class:`PyJWS`.

        :param options: overrides applied on top of the default options
        :type options: jwt.types.Options or None
        """
        self.options: FullOptions
        self.options = self._get_default_options()
        if options is not None:
            self.options = self._merge_options(options)

        self._jws = PyJWS(options=self._get_sig_options())

    @staticmethod
    def _get_default_options() -> FullOptions:
        """Return a fresh dict holding the default validation options."""
        return {
            "verify_signature": True,
            "verify_exp": True,
            "verify_nbf": True,
            "verify_iat": True,
            "verify_aud": True,
            "verify_iss": True,
            "verify_sub": True,
            "verify_jti": True,
            "require": [],
            "strict_aud": False,
            "enforce_minimum_key_length": False,
        }

    def _get_sig_options(self) -> SigOptions:
        """Return the subset of ``self.options`` handed to :class:`PyJWS`."""
        return {
            "verify_signature": self.options["verify_signature"],
            "enforce_minimum_key_length": self.options.get(
                "enforce_minimum_key_length", False
            ),
        }

    def _merge_options(self, options: Options | None = None) -> FullOptions:
        """Overlay ``options`` on ``self.options`` and return the result.

        When ``options`` is ``None`` the instance options object itself is
        returned. Otherwise a new dict is returned; the caller's ``options``
        mapping is never modified.

        :param options: per-call overrides, or ``None``
        :type options: jwt.types.Options or None
        :rtype: jwt.types.FullOptions
        """
        if options is None:
            return self.options

        # Copy first: the defaults filled in below must not leak back into
        # the dict the caller passed in (it may be reused between calls).
        overrides: dict[str, Any] = dict(options)

        # (defensive) set defaults for verify_x to False if verify_signature is False
        if not overrides.get("verify_signature", True):
            for name in (
                "verify_exp",
                "verify_nbf",
                "verify_iat",
                "verify_aud",
                "verify_iss",
                "verify_sub",
                "verify_jti",
            ):
                # An explicitly supplied value always wins over the default.
                overrides.setdefault(name, False)
        return cast("FullOptions", {**self.options, **overrides})

    def encode(
        self,
        payload: dict[str, Any],
        key: AllowedPrivateKeyTypes,
        algorithm: str | None = "HS256",
        headers: dict[str, Any] | None = None,
        json_encoder: type[json.JSONEncoder] | None = None,
        sort_headers: bool = True,
    ) -> str:
        """Encode the ``payload`` as JSON Web Token.

        :param payload: JWT claims, e.g. ``dict(iss=..., aud=..., sub=...)``
        :type payload: dict[str, typing.Any]
        :param key: a key suitable for the chosen algorithm:

            * for **asymmetric algorithms**: PEM-formatted private key, a multiline string
            * for **symmetric algorithms**: plain string, sufficiently long for security

        :type key: str or bytes or PyJWK or :py:class:`jwt.algorithms.AllowedPrivateKeys`
        :param algorithm: algorithm to sign the token with, e.g. ``"ES256"``.
            If ``headers`` includes ``alg``, it will be preferred to this parameter.
            If ``key`` is a :class:`PyJWK` object, by default the key algorithm will be used.
        :type algorithm: str or None
        :param headers: additional JWT header fields, e.g. ``dict(kid="my-key-id")``.
        :type headers: dict[str, typing.Any] or None
        :param json_encoder: custom JSON encoder for ``payload`` and ``headers``
        :type json_encoder: json.JSONEncoder or None
        :param sort_headers: forwarded unchanged to the JWS encoder
        :type sort_headers: bool

        :rtype: str
        :returns: a JSON Web Token

        :raises TypeError: if ``payload`` is not a ``dict``, or if it has an
            ``iss`` claim that is not a string
        """
        # Check that we get a dict
        if not isinstance(payload, dict):
            raise TypeError(
                "Expecting a dict object, as JWT only supports "
                "JSON objects as payloads."
            )

        # Payload: shallow copy so the conversions below do not touch the
        # caller's dict.
        payload = payload.copy()
        for time_claim in ["exp", "iat", "nbf"]:
            # Convert datetime to a intDate value in known time-format claims
            if isinstance(payload.get(time_claim), datetime):
                payload[time_claim] = timegm(payload[time_claim].utctimetuple())

        # Issue #1039, iss being set to non-string
        if "iss" in payload and not isinstance(payload["iss"], str):
            raise TypeError("Issuer (iss) must be a string.")

        json_payload = self._encode_payload(
            payload,
            headers=headers,
            json_encoder=json_encoder,
        )

        return self._jws.encode(
            json_payload,
            key,
            algorithm,
            headers,
            json_encoder,
            sort_headers=sort_headers,
        )

    def _encode_payload(
        self,
        payload: dict[str, Any],
        headers: dict[str, Any] | None = None,
        json_encoder: type[json.JSONEncoder] | None = None,
    ) -> bytes:
        """
        Encode a given payload to the bytes to be signed.

        This method is intended to be overridden by subclasses that need to
        encode the payload in a different way, e.g. compress the payload.

        ``headers`` is accepted for the benefit of such subclasses; this
        implementation does not read it.
        """
        return json.dumps(
            payload,
            separators=(",", ":"),
            cls=json_encoder,
        ).encode("utf-8")

    def decode_complete(
        self,
        jwt: str | bytes,
        key: AllowedPublicKeyTypes = "",
        algorithms: Sequence[str] | None = None,
        options: Options | None = None,
        # deprecated arg, remove in pyjwt3
        verify: bool | None = None,
        # could be used as passthrough to api_jws, consider removal in pyjwt3
        detached_payload: bytes | None = None,
        # passthrough arguments to _validate_claims
        # consider putting in options
        audience: str | Iterable[str] | None = None,
        issuer: str | Container[str] | None = None,
        subject: str | None = None,
        leeway: float | timedelta = 0,
        # kwargs
        **kwargs: Any,
    ) -> dict[str, Any]:
        """Identical to ``jwt.decode`` except for return value which is a dictionary containing the token header (JOSE Header),
        the token payload (JWT Payload), and token signature (JWT Signature) on the keys "header", "payload",
        and "signature" respectively.

        :param jwt: the token to be decoded
        :type jwt: str or bytes
        :param key: the key suitable for the allowed algorithm
        :type key: str or bytes or PyJWK or :py:class:`jwt.algorithms.AllowedPublicKeys`

        :param algorithms: allowed algorithms, e.g. ``["ES256"]``

            .. warning::

               Do **not** compute the ``algorithms`` parameter based on
               the ``alg`` from the token itself, or on any other data
               that an attacker may be able to influence, as that might
               expose you to various vulnerabilities (see `RFC 8725 §2.1
               <https://www.rfc-editor.org/rfc/rfc8725.html#section-2.1>`_). Instead,
               either hard-code a fixed value for ``algorithms``, or
               configure it in the same place you configure the
               ``key``. Make sure not to mix symmetric and asymmetric
               algorithms that interpret the ``key`` in different ways
               (e.g. HS\\* and RS\\*).
        :type algorithms: typing.Sequence[str] or None

        :param jwt.types.Options options: extended decoding and validation options
            Refer to :py:class:`jwt.types.Options` for more information.

        :param verify: deprecated and ignored; a warning is issued when it
            disagrees with ``options["verify_signature"]``
        :type verify: bool or None
        :param detached_payload: forwarded unchanged to the JWS decoder
        :type detached_payload: bytes or None
        :param audience: optional, the value for ``verify_aud`` check
        :type audience: str or typing.Iterable[str] or None
        :param issuer: optional, the value for ``verify_iss`` check
        :type issuer: str or typing.Container[str] or None
        :param subject: optional, the value for ``verify_sub`` check
        :type subject: str or None
        :param leeway: a time margin in seconds for the expiration check
        :type leeway: float or datetime.timedelta
        :rtype: dict[str, typing.Any]
        :returns: Decoded JWT with the JOSE Header on the key ``header``, the JWS
            Payload on the key ``payload``, and the JWS Signature on the key ``signature``.
        """
        if kwargs:
            warnings.warn(
                "passing additional kwargs to decode_complete() is deprecated "
                "and will be removed in pyjwt version 3. "
                f"Unsupported kwargs: {tuple(kwargs.keys())}",
                RemovedInPyjwt3Warning,
                stacklevel=2,
            )

        # NOTE(review): only the per-call ``options`` is consulted here, so
        # an instance created with verify_signature=False still asks the JWS
        # layer to verify unless the caller repeats the option. Kept as-is.
        if options is None:
            verify_signature = True
        else:
            verify_signature = options.get("verify_signature", True)

        # If the user has set the legacy `verify` argument, and it doesn't match
        # what the relevant `options` entry for the argument is, inform the user
        # that they're likely making a mistake.
        if verify is not None and verify != verify_signature:
            warnings.warn(
                "The `verify` argument to `decode` does nothing in PyJWT 2.0 and newer. "
                "The equivalent is setting `verify_signature` to False in the `options` dictionary. "
                "This invocation has a mismatch between the kwarg and the option entry.",
                category=DeprecationWarning,
                stacklevel=2,
            )

        merged_options = self._merge_options(options)

        sig_options: SigOptions = {
            "verify_signature": verify_signature,
        }
        decoded = self._jws.decode_complete(
            jwt,
            key=key,
            algorithms=algorithms,
            options=sig_options,
            detached_payload=detached_payload,
        )

        payload = self._decode_payload(decoded)

        self._validate_claims(
            payload,
            merged_options,
            audience=audience,
            issuer=issuer,
            leeway=leeway,
            subject=subject,
        )

        # Replace the raw payload with the parsed claims dict.
        decoded["payload"] = payload
        return decoded

    def _decode_payload(self, decoded: dict[str, Any]) -> dict[str, Any]:
        """
        Decode the payload from a JWS dictionary (payload, signature, header).

        This method is intended to be overridden by subclasses that need to
        decode the payload in a different way, e.g. decompress compressed
        payloads.

        :raises DecodeError: if the payload is not valid JSON or is not a
            JSON object
        """
        try:
            payload: dict[str, Any] = json.loads(decoded["payload"])
        except ValueError as e:
            raise DecodeError(f"Invalid payload string: {e}") from e
        if not isinstance(payload, dict):
            raise DecodeError("Invalid payload string: must be a json object")
        return payload

    def decode(
        self,
        jwt: str | bytes,
        key: AllowedPublicKeyTypes = "",
        algorithms: Sequence[str] | None = None,
        options: Options | None = None,
        # deprecated arg, remove in pyjwt3
        verify: bool | None = None,
        # could be used as passthrough to api_jws, consider removal in pyjwt3
        detached_payload: bytes | None = None,
        # passthrough arguments to _validate_claims
        # consider putting in options
        audience: str | Iterable[str] | None = None,
        subject: str | None = None,
        issuer: str | Container[str] | None = None,
        leeway: float | timedelta = 0,
        # kwargs
        **kwargs: Any,
    ) -> dict[str, Any]:
        """Verify the ``jwt`` token signature and return the token claims.

        :param jwt: the token to be decoded
        :type jwt: str or bytes
        :param key: the key suitable for the allowed algorithm
        :type key: str or bytes or PyJWK or :py:class:`jwt.algorithms.AllowedPublicKeys`

        :param algorithms: allowed algorithms, e.g. ``["ES256"]``
            If ``key`` is a :class:`PyJWK` object, allowed algorithms will default to the key algorithm.

            .. warning::

               Do **not** compute the ``algorithms`` parameter based on
               the ``alg`` from the token itself, or on any other data
               that an attacker may be able to influence, as that might
               expose you to various vulnerabilities (see `RFC 8725 §2.1
               <https://www.rfc-editor.org/rfc/rfc8725.html#section-2.1>`_). Instead,
               either hard-code a fixed value for ``algorithms``, or
               configure it in the same place you configure the
               ``key``. Make sure not to mix symmetric and asymmetric
               algorithms that interpret the ``key`` in different ways
               (e.g. HS\\* and RS\\*).
        :type algorithms: typing.Sequence[str] or None

        :param jwt.types.Options options: extended decoding and validation options
            Refer to :py:class:`jwt.types.Options` for more information.

        :param audience: optional, the value for ``verify_aud`` check
        :type audience: str or typing.Iterable[str] or None
        :param subject: optional, the value for ``verify_sub`` check
        :type subject: str or None
        :param issuer: optional, the value for ``verify_iss`` check
        :type issuer: str or typing.Container[str] or None
        :param leeway: a time margin in seconds for the expiration check
        :type leeway: float or datetime.timedelta
        :rtype: dict[str, typing.Any]
        :returns: the JWT claims
        """
        if kwargs:
            warnings.warn(
                "passing additional kwargs to decode() is deprecated "
                "and will be removed in pyjwt version 3. "
                f"Unsupported kwargs: {tuple(kwargs.keys())}",
                RemovedInPyjwt3Warning,
                stacklevel=2,
            )
        # kwargs are deliberately not forwarded, so the deprecation warning
        # is emitted once, by this method only.
        decoded = self.decode_complete(
            jwt,
            key,
            algorithms,
            options,
            verify=verify,
            detached_payload=detached_payload,
            audience=audience,
            subject=subject,
            issuer=issuer,
            leeway=leeway,
        )
        return cast(dict[str, Any], decoded["payload"])

    def _validate_claims(
        self,
        payload: dict[str, Any],
        options: FullOptions,
        audience: Iterable[str] | str | None = None,
        issuer: Container[str] | str | None = None,
        subject: str | None = None,
        leeway: float | timedelta = 0,
    ) -> None:
        """Run every claim check that ``options`` enables against ``payload``.

        Required claims are checked first. ``iat``, ``nbf`` and ``exp`` are
        only checked when present in the payload, and all three are compared
        against the same "now" timestamp.

        :raises TypeError: if ``audience`` is neither ``None``, a string nor
            an iterable
        """
        if isinstance(leeway, timedelta):
            leeway = leeway.total_seconds()

        if audience is not None and not isinstance(audience, (str, Iterable)):
            raise TypeError("audience must be a string, iterable or None")

        self._validate_required_claims(payload, options["require"])

        now = datetime.now(tz=timezone.utc).timestamp()

        if "iat" in payload and options["verify_iat"]:
            self._validate_iat(payload, now, leeway)

        if "nbf" in payload and options["verify_nbf"]:
            self._validate_nbf(payload, now, leeway)

        if "exp" in payload and options["verify_exp"]:
            self._validate_exp(payload, now, leeway)

        if options["verify_iss"]:
            self._validate_iss(payload, issuer)

        if options["verify_aud"]:
            self._validate_aud(
                payload, audience, strict=options.get("strict_aud", False)
            )

        if options["verify_sub"]:
            self._validate_sub(payload, subject)

        if options["verify_jti"]:
            self._validate_jti(payload)

    def _validate_required_claims(
        self,
        payload: dict[str, Any],
        claims: Iterable[str],
    ) -> None:
        """Raise ``MissingRequiredClaimError`` for the first claim in
        ``claims`` that is absent from ``payload`` or whose value is ``None``.
        """
        for claim in claims:
            if payload.get(claim) is None:
                raise MissingRequiredClaimError(claim)

    def _validate_sub(
        self, payload: dict[str, Any], subject: str | None = None
    ) -> None:
        """
        Checks whether "sub" if in the payload is valid or not.
        This is an Optional claim

        :param payload(dict): The payload which needs to be validated
        :param subject(str): The subject of the token
        :raises InvalidSubjectError: if "sub" is not a string, or differs
            from a non-``None`` ``subject``
        """

        if "sub" not in payload:
            return

        if not isinstance(payload["sub"], str):
            raise InvalidSubjectError("Subject must be a string")

        if subject is not None:
            if payload.get("sub") != subject:
                raise InvalidSubjectError("Invalid subject")

    def _validate_jti(self, payload: dict[str, Any]) -> None:
        """
        Checks whether "jti" if in the payload is valid or not
        This is an Optional claim

        :param payload(dict): The payload which needs to be validated
        :raises InvalidJTIError: if "jti" is present but not a string
        """

        if "jti" not in payload:
            return

        if not isinstance(payload.get("jti"), str):
            raise InvalidJTIError("JWT ID must be a string")

    def _validate_iat(
        self,
        payload: dict[str, Any],
        now: float,
        leeway: float,
    ) -> None:
        """Check that ``iat`` is integer-like and not later than
        ``now + leeway``.

        :raises InvalidIssuedAtError: if ``iat`` cannot be converted to int
        :raises ImmatureSignatureError: if ``iat`` lies in the future
        """
        try:
            iat = int(payload["iat"])
        # int() raises TypeError for None/list/dict and OverflowError for
        # infinity, all of which json.loads can produce from a token.
        except (TypeError, ValueError, OverflowError):
            raise InvalidIssuedAtError(
                "Issued At claim (iat) must be an integer."
            ) from None
        if iat > (now + leeway):
            raise ImmatureSignatureError("The token is not yet valid (iat)")

    def _validate_nbf(
        self,
        payload: dict[str, Any],
        now: float,
        leeway: float,
    ) -> None:
        """Check that ``nbf`` is integer-like and not later than
        ``now + leeway``.

        :raises DecodeError: if ``nbf`` cannot be converted to int
        :raises ImmatureSignatureError: if ``nbf`` lies in the future
        """
        try:
            nbf = int(payload["nbf"])
        # See _validate_iat: non-numeric JSON values raise TypeError and
        # infinity raises OverflowError.
        except (TypeError, ValueError, OverflowError):
            raise DecodeError("Not Before claim (nbf) must be an integer.") from None

        if nbf > (now + leeway):
            raise ImmatureSignatureError("The token is not yet valid (nbf)")

    def _validate_exp(
        self,
        payload: dict[str, Any],
        now: float,
        leeway: float,
    ) -> None:
        """Check that ``exp`` is integer-like and later than
        ``now - leeway``.

        :raises DecodeError: if ``exp`` cannot be converted to int
        :raises ExpiredSignatureError: if ``exp <= now - leeway``
        """
        try:
            exp = int(payload["exp"])
        # See _validate_iat: non-numeric JSON values raise TypeError and
        # infinity raises OverflowError.
        except (TypeError, ValueError, OverflowError):
            raise DecodeError(
                "Expiration Time claim (exp) must be an integer."
            ) from None

        if exp <= (now - leeway):
            raise ExpiredSignatureError("Signature has expired")

    def _validate_aud(
        self,
        payload: dict[str, Any],
        audience: str | Iterable[str] | None,
        *,
        strict: bool = False,
    ) -> None:
        """Check the ``aud`` claim against the expected ``audience``.

        In the default mode the check passes when at least one expected
        audience appears among the token's audiences. With ``strict=True``
        both sides must be single strings and must be equal.

        :raises InvalidAudienceError: on a mismatch, a malformed claim, or
            when the token has a non-empty ``aud`` but no ``audience`` was
            supplied
        :raises MissingRequiredClaimError: if ``audience`` was supplied but
            the token has no (or an empty) ``aud`` claim
        """
        if audience is None:
            if "aud" not in payload or not payload["aud"]:
                return
            # Application did not specify an audience, but
            # the token has the 'aud' claim
            raise InvalidAudienceError("Invalid audience")

        if "aud" not in payload or not payload["aud"]:
            # Application specified an audience, but it could not be
            # verified since the token does not contain a claim.
            raise MissingRequiredClaimError("aud")

        audience_claims = payload["aud"]

        # In strict mode, we forbid list matching: the supplied audience
        # must be a string, and it must exactly match the audience claim.
        if strict:
            # Only a single audience is allowed in strict mode.
            if not isinstance(audience, str):
                raise InvalidAudienceError("Invalid audience (strict)")

            # Only a single audience claim is allowed in strict mode.
            if not isinstance(audience_claims, str):
                raise InvalidAudienceError("Invalid claim format in token (strict)")

            if audience != audience_claims:
                raise InvalidAudienceError("Audience doesn't match (strict)")

            return

        # Normalise a single-string claim to a one-element list.
        if isinstance(audience_claims, str):
            audience_claims = [audience_claims]
        if not isinstance(audience_claims, list):
            raise InvalidAudienceError("Invalid claim format in token")
        if any(not isinstance(c, str) for c in audience_claims):
            raise InvalidAudienceError("Invalid claim format in token")

        if isinstance(audience, str):
            audience = [audience]

        if all(aud not in audience_claims for aud in audience):
            raise InvalidAudienceError("Audience doesn't match")

    def _validate_iss(
        self, payload: dict[str, Any], issuer: Container[str] | str | None
    ) -> None:
        """Check the ``iss`` claim against the expected ``issuer``.

        Nothing is checked when ``issuer`` is ``None``. A string ``issuer``
        must equal the claim; any other value is treated as a container
        that must contain the claim.

        :raises MissingRequiredClaimError: if the token has no ``iss`` claim
        :raises InvalidIssuerError: if the claim is not a string, does not
            match, or ``issuer`` does not support the ``in`` operator
        """
        if issuer is None:
            return

        if "iss" not in payload:
            raise MissingRequiredClaimError("iss")

        iss = payload["iss"]
        if not isinstance(iss, str):
            raise InvalidIssuerError("Payload Issuer (iss) must be a string")

        if isinstance(issuer, str):
            if iss != issuer:
                raise InvalidIssuerError("Invalid issuer")
        else:
            try:
                if iss not in issuer:
                    raise InvalidIssuerError("Invalid issuer")
            except TypeError:
                raise InvalidIssuerError(
                    'Issuer param must be "str" or "Container[str]"'
                ) from None

590 

591 

# Module-level convenience API: one shared PyJWT instance whose bound
# methods are re-exported below as plain functions.
_jwt_global_obj = PyJWT()

# Replace the PyJWS object that PyJWT.__init__ created with the
# `_jws_global_obj` imported from .api_jws.
_jwt_global_obj._jws = _jws_global_obj

decode = _jwt_global_obj.decode
decode_complete = _jwt_global_obj.decode_complete
encode = _jwt_global_obj.encode