Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/jwt/api_jwt.py: 63%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

189 statements  

1from __future__ import annotations 

2 

3import json 

4import os 

5import warnings 

6from calendar import timegm 

7from collections.abc import Iterable, Sequence 

8from datetime import datetime, timedelta, timezone 

9from typing import TYPE_CHECKING, Any, Container 

10 

11from . import api_jws 

12from .exceptions import ( 

13 DecodeError, 

14 ExpiredSignatureError, 

15 ImmatureSignatureError, 

16 InvalidAudienceError, 

17 InvalidIssuedAtError, 

18 InvalidIssuerError, 

19 InvalidJTIError, 

20 InvalidSubjectError, 

21 MissingRequiredClaimError, 

22) 

23from .warnings import RemovedInPyjwt3Warning 

24 

# The key-type aliases below are only needed by static type checkers and by
# the Sphinx documentation build (signalled through the SPHINX_BUILD
# environment variable); at normal runtime this block is skipped entirely.
if TYPE_CHECKING or os.environ.get("SPHINX_BUILD"):
    from typing import TypeAlias

    from .algorithms import has_crypto
    from .api_jwk import PyJWK
    from .types import FullOptions, Options, SigOptions

    if not has_crypto:
        # Without the crypto backend only PyJWK objects and raw str/bytes
        # keys are part of the aliases.
        AllowedPrivateKeyTypes: TypeAlias = PyJWK | str | bytes  # type: ignore
        AllowedPublicKeyTypes: TypeAlias = PyJWK | str | bytes  # type: ignore
    else:
        from .algorithms import AllowedPrivateKeys, AllowedPublicKeys

        AllowedPrivateKeyTypes: TypeAlias = AllowedPrivateKeys | PyJWK | str | bytes  # type: ignore
        AllowedPublicKeyTypes: TypeAlias = AllowedPublicKeys | PyJWK | str | bytes  # type: ignore

40 

41 

class PyJWT:
    """Encode, decode and validate JSON Web Tokens on top of ``api_jws``.

    Every instance carries a full set of validation options (see
    :py:meth:`_get_default_options`). They can be overridden for the
    instance through the constructor, or for a single call through the
    ``options`` argument of :py:meth:`decode` / :py:meth:`decode_complete`.
    """

    def __init__(self, options: Options | None = None) -> None:
        """Initialise the instance options.

        :param options: partial options merged over the defaults; the
            mapping passed in is left unmodified
        :type options: jwt.types.Options or None
        """
        self.options: FullOptions
        self.options = self._get_default_options()
        if options is not None:
            self.options = self._merge_options(options)

    @staticmethod
    def _get_default_options() -> FullOptions:
        """Return a fresh dict with every check enabled and nothing required."""
        return {
            "verify_signature": True,
            "verify_exp": True,
            "verify_nbf": True,
            "verify_iat": True,
            "verify_aud": True,
            "verify_iss": True,
            "verify_sub": True,
            "verify_jti": True,
            "require": [],
            "strict_aud": False,
        }

    def _merge_options(self, options: Options | None = None) -> FullOptions:
        """Overlay ``options`` on top of the instance options.

        When ``verify_signature`` is explicitly falsy in ``options``, every
        ``verify_*`` claim check that ``options`` does not mention defaults
        to ``False`` instead of inheriting the instance value.

        :param options: per-call overrides, or ``None`` for no overrides
        :returns: the instance options themselves when ``options`` is
            ``None``, otherwise a new merged dict
        """
        if options is None:
            return self.options

        # Merge into a copy: the caller's mapping must not be modified, since
        # the same dict may be reused for later calls.
        overrides: Options = {**options}

        # (defensive) set defaults for verify_x to False if verify_signature is False
        if not overrides.get("verify_signature", True):
            overrides.setdefault("verify_exp", False)
            overrides.setdefault("verify_nbf", False)
            overrides.setdefault("verify_iat", False)
            overrides.setdefault("verify_aud", False)
            overrides.setdefault("verify_iss", False)
            overrides.setdefault("verify_sub", False)
            overrides.setdefault("verify_jti", False)
        return {**self.options, **overrides}

    def encode(
        self,
        payload: dict[str, Any],
        key: AllowedPrivateKeyTypes,
        algorithm: str | None = "HS256",
        headers: dict[str, Any] | None = None,
        json_encoder: type[json.JSONEncoder] | None = None,
        sort_headers: bool = True,
    ) -> str:
        """Encode the ``payload`` as JSON Web Token.

        :param payload: JWT claims, e.g. ``dict(iss=..., aud=..., sub=...)``
        :type payload: dict[str, typing.Any]
        :param key: a key suitable for the chosen algorithm:

            * for **asymmetric algorithms**: PEM-formatted private key, a multiline string
            * for **symmetric algorithms**: plain string, sufficiently long for security

        :type key: str or bytes or PyJWK or :py:class:`jwt.algorithms.AllowedPrivateKeys`
        :param algorithm: algorithm to sign the token with, e.g. ``"ES256"``.
            If ``headers`` includes ``alg``, it will be preferred to this parameter.
            If ``key`` is a :class:`PyJWK` object, by default the key algorithm will be used.
        :type algorithm: str or None
        :param headers: additional JWT header fields, e.g. ``dict(kid="my-key-id")``.
        :type headers: dict[str, typing.Any] or None
        :param json_encoder: custom JSON encoder for ``payload`` and ``headers``
        :type json_encoder: json.JSONEncoder or None
        :param sort_headers: forwarded unchanged to ``api_jws.encode``
            (presumably controls header key ordering; confirm in ``api_jws``)
        :type sort_headers: bool

        :rtype: str
        :returns: a JSON Web Token

        :raises TypeError: if ``payload`` is not a ``dict``, or if it
            contains an ``iss`` claim that is not a string
        """
        # Check that we get a dict
        if not isinstance(payload, dict):
            raise TypeError(
                "Expecting a dict object, as JWT only supports "
                "JSON objects as payloads."
            )

        # Payload: work on a shallow copy so the caller's dict keeps its
        # datetime objects.
        payload = payload.copy()
        for time_claim in ["exp", "iat", "nbf"]:
            # Convert datetime to a intDate value in known time-format claims
            if isinstance(payload.get(time_claim), datetime):
                payload[time_claim] = timegm(payload[time_claim].utctimetuple())

        # Issue #1039, iss being set to non-string
        if "iss" in payload and not isinstance(payload["iss"], str):
            raise TypeError("Issuer (iss) must be a string.")

        json_payload = self._encode_payload(
            payload,
            headers=headers,
            json_encoder=json_encoder,
        )

        return api_jws.encode(
            json_payload,
            key,
            algorithm,
            headers,
            json_encoder,
            sort_headers=sort_headers,
        )

    def _encode_payload(
        self,
        payload: dict[str, Any],
        headers: dict[str, Any] | None = None,
        json_encoder: type[json.JSONEncoder] | None = None,
    ) -> bytes:
        """
        Encode a given payload to the bytes to be signed.

        This method is intended to be overridden by subclasses that need to
        encode the payload in a different way, e.g. compress the payload.

        ``headers`` is accepted for the benefit of such overrides; this
        default implementation does not use it.
        """
        return json.dumps(
            payload,
            separators=(",", ":"),
            cls=json_encoder,
        ).encode("utf-8")

    def decode_complete(
        self,
        jwt: str | bytes,
        key: AllowedPublicKeyTypes = "",
        algorithms: Sequence[str] | None = None,
        options: Options | None = None,
        # deprecated arg, remove in pyjwt3
        verify: bool | None = None,
        # could be used as passthrough to api_jws, consider removal in pyjwt3
        detached_payload: bytes | None = None,
        # passthrough arguments to _validate_claims
        # consider putting in options
        audience: str | Iterable[str] | None = None,
        issuer: str | Container[str] | None = None,
        subject: str | None = None,
        leeway: float | timedelta = 0,
        # kwargs
        **kwargs: Any,
    ) -> dict[str, Any]:
        """Identical to ``jwt.decode`` except for return value which is a dictionary containing the token header (JOSE Header),
        the token payload (JWT Payload), and token signature (JWT Signature) on the keys "header", "payload",
        and "signature" respectively.

        :param jwt: the token to be decoded
        :type jwt: str or bytes
        :param key: the key suitable for the allowed algorithm
        :type key: str or bytes or PyJWK or :py:class:`jwt.algorithms.AllowedPublicKeys`

        :param algorithms: allowed algorithms, e.g. ``["ES256"]``

            .. warning::

               Do **not** compute the ``algorithms`` parameter based on
               the ``alg`` from the token itself, or on any other data
               that an attacker may be able to influence, as that might
               expose you to various vulnerabilities (see `RFC 8725 §2.1
               <https://www.rfc-editor.org/rfc/rfc8725.html#section-2.1>`_). Instead,
               either hard-code a fixed value for ``algorithms``, or
               configure it in the same place you configure the
               ``key``. Make sure not to mix symmetric and asymmetric
               algorithms that interpret the ``key`` in different ways
               (e.g. HS\\* and RS\\*).
        :type algorithms: typing.Sequence[str] or None

        :param jwt.types.Options options: extended decoding and validation options
            Refer to :py:class:`jwt.types.Options` for more information.

        :param verify: deprecated and without effect; a
            ``DeprecationWarning`` is emitted when it disagrees with the
            ``verify_signature`` option
        :type verify: bool or None
        :param detached_payload: forwarded unchanged to
            ``api_jws.decode_complete``
        :type detached_payload: bytes or None
        :param audience: optional, the value for ``verify_aud`` check
        :type audience: str or typing.Iterable[str] or None
        :param issuer: optional, the value for ``verify_iss`` check
        :type issuer: str or typing.Container[str] or None
        :param subject: optional, the value for ``verify_sub`` check
        :type subject: str or None
        :param leeway: a time margin in seconds for the expiration check
        :type leeway: float or datetime.timedelta
        :rtype: dict[str, typing.Any]
        :returns: Decoded JWT with the JOSE Header on the key ``header``, the JWS
            Payload on the key ``payload``, and the JWS Signature on the key ``signature``.
        """
        if kwargs:
            warnings.warn(
                "passing additional kwargs to decode_complete() is deprecated "
                "and will be removed in pyjwt version 3. "
                f"Unsupported kwargs: {tuple(kwargs.keys())}",
                RemovedInPyjwt3Warning,
                stacklevel=2,
            )

        # NOTE: only the per-call ``options`` are consulted here. An
        # instance-level ``verify_signature=False`` does not switch off
        # signature verification for calls that pass no ``options``.
        if options is None:
            verify_signature = True
        else:
            verify_signature = options.get("verify_signature", True)

        # If the user has set the legacy `verify` argument, and it doesn't match
        # what the relevant `options` entry for the argument is, inform the user
        # that they're likely making a mistake.
        if verify is not None and verify != verify_signature:
            warnings.warn(
                "The `verify` argument to `decode` does nothing in PyJWT 2.0 and newer. "
                "The equivalent is setting `verify_signature` to False in the `options` dictionary. "
                "This invocation has a mismatch between the kwarg and the option entry.",
                category=DeprecationWarning,
                stacklevel=2,
            )

        sig_options: SigOptions = {"verify_signature": verify_signature}
        decoded = api_jws.decode_complete(
            jwt,
            key=key,
            algorithms=algorithms,
            options=sig_options,
            detached_payload=detached_payload,
        )

        payload = self._decode_payload(decoded)

        merged_options = self._merge_options(options)
        self._validate_claims(
            payload,
            merged_options,
            audience=audience,
            issuer=issuer,
            leeway=leeway,
            subject=subject,
        )

        # Replace the raw payload with the parsed claims.
        decoded["payload"] = payload
        return decoded

    def _decode_payload(self, decoded: dict[str, Any]) -> dict[str, Any]:
        """
        Decode the payload from a JWS dictionary (payload, signature, header).

        This method is intended to be overridden by subclasses that need to
        decode the payload in a different way, e.g. decompress compressed
        payloads.

        :raises DecodeError: if the payload is not valid JSON or is not a
            JSON object
        """
        try:
            payload: dict[str, Any] = json.loads(decoded["payload"])
        except ValueError as e:
            raise DecodeError(f"Invalid payload string: {e}") from e
        if not isinstance(payload, dict):
            raise DecodeError("Invalid payload string: must be a json object")
        return payload

    def decode(
        self,
        jwt: str | bytes,
        key: AllowedPublicKeyTypes = "",
        algorithms: Sequence[str] | None = None,
        options: Options | None = None,
        # deprecated arg, remove in pyjwt3
        verify: bool | None = None,
        # could be used as passthrough to api_jws, consider removal in pyjwt3
        detached_payload: bytes | None = None,
        # passthrough arguments to _validate_claims
        # consider putting in options
        audience: str | Iterable[str] | None = None,
        subject: str | None = None,
        issuer: str | Container[str] | None = None,
        leeway: float | timedelta = 0,
        # kwargs
        **kwargs: Any,
    ) -> dict[str, Any]:
        """Verify the ``jwt`` token signature and return the token claims.

        :param jwt: the token to be decoded
        :type jwt: str or bytes
        :param key: the key suitable for the allowed algorithm
        :type key: str or bytes or PyJWK or :py:class:`jwt.algorithms.AllowedPublicKeys`

        :param algorithms: allowed algorithms, e.g. ``["ES256"]``
            If ``key`` is a :class:`PyJWK` object, allowed algorithms will default to the key algorithm.

            .. warning::

               Do **not** compute the ``algorithms`` parameter based on
               the ``alg`` from the token itself, or on any other data
               that an attacker may be able to influence, as that might
               expose you to various vulnerabilities (see `RFC 8725 §2.1
               <https://www.rfc-editor.org/rfc/rfc8725.html#section-2.1>`_). Instead,
               either hard-code a fixed value for ``algorithms``, or
               configure it in the same place you configure the
               ``key``. Make sure not to mix symmetric and asymmetric
               algorithms that interpret the ``key`` in different ways
               (e.g. HS\\* and RS\\*).
        :type algorithms: typing.Sequence[str] or None

        :param jwt.types.Options options: extended decoding and validation options
            Refer to :py:class:`jwt.types.Options` for more information.

        :param verify: deprecated and without effect, see
            :py:meth:`decode_complete`
        :type verify: bool or None
        :param detached_payload: forwarded unchanged to
            :py:meth:`decode_complete`
        :type detached_payload: bytes or None
        :param audience: optional, the value for ``verify_aud`` check
        :type audience: str or typing.Iterable[str] or None
        :param subject: optional, the value for ``verify_sub`` check
        :type subject: str or None
        :param issuer: optional, the value for ``verify_iss`` check
        :type issuer: str or typing.Container[str] or None
        :param leeway: a time margin in seconds for the expiration check
        :type leeway: float or datetime.timedelta
        :rtype: dict[str, typing.Any]
        :returns: the JWT claims
        """
        if kwargs:
            warnings.warn(
                "passing additional kwargs to decode() is deprecated "
                "and will be removed in pyjwt version 3. "
                f"Unsupported kwargs: {tuple(kwargs.keys())}",
                RemovedInPyjwt3Warning,
                stacklevel=2,
            )
        # ``kwargs`` is deliberately not forwarded, so the deprecation
        # warning above is not emitted a second time by decode_complete().
        decoded = self.decode_complete(
            jwt,
            key,
            algorithms,
            options,
            verify=verify,
            detached_payload=detached_payload,
            audience=audience,
            subject=subject,
            issuer=issuer,
            leeway=leeway,
        )
        return decoded["payload"]

    def _validate_claims(
        self,
        payload: dict[str, Any],
        options: FullOptions,
        audience: Iterable[str] | str | None = None,
        issuer: Container[str] | str | None = None,
        subject: str | None = None,
        leeway: float | timedelta = 0,
    ) -> None:
        """Run every claim check enabled in ``options`` against ``payload``.

        Required claims are checked first, then ``iat``, ``nbf`` and ``exp``
        (each only when present in the payload), then ``iss``, ``aud``,
        ``sub`` and ``jti``. The first failing check raises.

        :param payload: the decoded JWT claims
        :param options: fully populated options controlling which checks run
        :param audience: expected audience(s) for the ``aud`` check
        :param issuer: expected issuer(s) for the ``iss`` check
        :param subject: expected subject for the ``sub`` check
        :param leeway: tolerance for the time-based checks, in seconds or
            as a ``timedelta``
        :raises TypeError: if ``audience`` is neither ``None``, a string
            nor an iterable
        """
        if isinstance(leeway, timedelta):
            leeway = leeway.total_seconds()

        if audience is not None and not isinstance(audience, (str, Iterable)):
            raise TypeError("audience must be a string, iterable or None")

        self._validate_required_claims(payload, options["require"])

        # One timestamp shared by all time-based checks of this call.
        now = datetime.now(tz=timezone.utc).timestamp()

        if "iat" in payload and options["verify_iat"]:
            self._validate_iat(payload, now, leeway)

        if "nbf" in payload and options["verify_nbf"]:
            self._validate_nbf(payload, now, leeway)

        if "exp" in payload and options["verify_exp"]:
            self._validate_exp(payload, now, leeway)

        if options["verify_iss"]:
            self._validate_iss(payload, issuer)

        if options["verify_aud"]:
            self._validate_aud(
                payload, audience, strict=options.get("strict_aud", False)
            )

        if options["verify_sub"]:
            self._validate_sub(payload, subject)

        if options["verify_jti"]:
            self._validate_jti(payload)

    def _validate_required_claims(
        self,
        payload: dict[str, Any],
        claims: Iterable[str],
    ) -> None:
        """Raise ``MissingRequiredClaimError`` for the first claim in
        ``claims`` that is absent from ``payload`` or set to ``None``.
        """
        for claim in claims:
            if payload.get(claim) is None:
                raise MissingRequiredClaimError(claim)

    def _validate_sub(
        self, payload: dict[str, Any], subject: str | None = None
    ) -> None:
        """
        Checks whether "sub" if in the payload is valid or not.
        This is an Optional claim

        :param payload(dict): The payload which needs to be validated
        :param subject(str): The subject of the token
        :raises InvalidSubjectError: if ``sub`` is present but not a string,
            or differs from a non-``None`` ``subject``
        """

        if "sub" not in payload:
            return

        if not isinstance(payload["sub"], str):
            raise InvalidSubjectError("Subject must be a string")

        if subject is not None:
            if payload.get("sub") != subject:
                raise InvalidSubjectError("Invalid subject")

    def _validate_jti(self, payload: dict[str, Any]) -> None:
        """
        Checks whether "jti" if in the payload is valid or not
        This is an Optional claim

        :param payload(dict): The payload which needs to be validated
        :raises InvalidJTIError: if ``jti`` is present but not a string
        """

        if "jti" not in payload:
            return

        if not isinstance(payload.get("jti"), str):
            raise InvalidJTIError("JWT ID must be a string")

    def _validate_iat(
        self,
        payload: dict[str, Any],
        now: float,
        leeway: float,
    ) -> None:
        """Validate the ``iat`` (issued at) claim, which must be present.

        :param payload: the decoded JWT claims
        :param now: current time as a POSIX timestamp
        :param leeway: tolerance in seconds
        :raises InvalidIssuedAtError: if ``iat`` cannot be converted to an
            integer
        :raises ImmatureSignatureError: if ``iat`` is later than
            ``now + leeway``
        """
        try:
            iat = int(payload["iat"])
        except (TypeError, ValueError, OverflowError):
            # TypeError covers null/list/object values, OverflowError covers
            # infinity; all are malformed claims, not programming errors.
            raise InvalidIssuedAtError(
                "Issued At claim (iat) must be an integer."
            ) from None
        if iat > (now + leeway):
            raise ImmatureSignatureError("The token is not yet valid (iat)")

    def _validate_nbf(
        self,
        payload: dict[str, Any],
        now: float,
        leeway: float,
    ) -> None:
        """Validate the ``nbf`` (not before) claim, which must be present.

        :param payload: the decoded JWT claims
        :param now: current time as a POSIX timestamp
        :param leeway: tolerance in seconds
        :raises DecodeError: if ``nbf`` cannot be converted to an integer
        :raises ImmatureSignatureError: if ``nbf`` is later than
            ``now + leeway``
        """
        try:
            nbf = int(payload["nbf"])
        except (TypeError, ValueError, OverflowError):
            # TypeError covers null/list/object values, OverflowError covers
            # infinity; all are malformed claims, not programming errors.
            raise DecodeError("Not Before claim (nbf) must be an integer.") from None

        if nbf > (now + leeway):
            raise ImmatureSignatureError("The token is not yet valid (nbf)")

    def _validate_exp(
        self,
        payload: dict[str, Any],
        now: float,
        leeway: float,
    ) -> None:
        """Validate the ``exp`` (expiration time) claim, which must be present.

        :param payload: the decoded JWT claims
        :param now: current time as a POSIX timestamp
        :param leeway: tolerance in seconds
        :raises DecodeError: if ``exp`` cannot be converted to an integer
        :raises ExpiredSignatureError: if ``exp`` is not later than
            ``now - leeway``
        """
        try:
            exp = int(payload["exp"])
        except (TypeError, ValueError, OverflowError):
            # TypeError covers null/list/object values, OverflowError covers
            # infinity; all are malformed claims, not programming errors.
            raise DecodeError(
                "Expiration Time claim (exp) must be an integer."
            ) from None

        if exp <= (now - leeway):
            raise ExpiredSignatureError("Signature has expired")

    def _validate_aud(
        self,
        payload: dict[str, Any],
        audience: str | Iterable[str] | None,
        *,
        strict: bool = False,
    ) -> None:
        """Validate the ``aud`` claim against the expected ``audience``.

        :param payload: the decoded JWT claims
        :param audience: expected audience(s), or ``None`` when the
            application expects the token to carry no audience
        :param strict: when true, both ``audience`` and the claim must be
            single strings and must be equal
        :raises InvalidAudienceError: if the claim is malformed, does not
            match, or is present although ``audience`` is ``None``
        :raises MissingRequiredClaimError: if ``audience`` is given but the
            token has no (or an empty) ``aud`` claim
        """
        if audience is None:
            if "aud" not in payload or not payload["aud"]:
                return
            # Application did not specify an audience, but
            # the token has the 'aud' claim
            raise InvalidAudienceError("Invalid audience")

        if "aud" not in payload or not payload["aud"]:
            # Application specified an audience, but it could not be
            # verified since the token does not contain a claim.
            raise MissingRequiredClaimError("aud")

        audience_claims = payload["aud"]

        # In strict mode, we forbid list matching: the supplied audience
        # must be a string, and it must exactly match the audience claim.
        if strict:
            # Only a single audience is allowed in strict mode.
            if not isinstance(audience, str):
                raise InvalidAudienceError("Invalid audience (strict)")

            # Only a single audience claim is allowed in strict mode.
            if not isinstance(audience_claims, str):
                raise InvalidAudienceError("Invalid claim format in token (strict)")

            if audience != audience_claims:
                raise InvalidAudienceError("Audience doesn't match (strict)")

            return

        # Non-strict mode: normalise both sides to lists and accept the
        # token as soon as one expected audience appears in the claim.
        if isinstance(audience_claims, str):
            audience_claims = [audience_claims]
        if not isinstance(audience_claims, list):
            raise InvalidAudienceError("Invalid claim format in token")
        if any(not isinstance(c, str) for c in audience_claims):
            raise InvalidAudienceError("Invalid claim format in token")

        if isinstance(audience, str):
            audience = [audience]

        if all(aud not in audience_claims for aud in audience):
            raise InvalidAudienceError("Audience doesn't match")

    def _validate_iss(
        self, payload: dict[str, Any], issuer: Container[str] | str | None
    ) -> None:
        """Validate the ``iss`` claim against the expected ``issuer``.

        Nothing is checked when ``issuer`` is ``None``.

        :param payload: the decoded JWT claims
        :param issuer: a single expected issuer, or a container of
            acceptable issuers
        :raises MissingRequiredClaimError: if ``issuer`` is given but the
            token has no ``iss`` claim
        :raises InvalidIssuerError: if the claim is not a string, does not
            match, or ``issuer`` does not support membership tests
        """
        if issuer is None:
            return

        if "iss" not in payload:
            raise MissingRequiredClaimError("iss")

        iss = payload["iss"]
        if not isinstance(iss, str):
            raise InvalidIssuerError("Payload Issuer (iss) must be a string")

        if isinstance(issuer, str):
            # Compare for equality; ``in`` on a str would accept substrings.
            if iss != issuer:
                raise InvalidIssuerError("Invalid issuer")
        else:
            try:
                if iss not in issuer:
                    raise InvalidIssuerError("Invalid issuer")
            except TypeError:
                raise InvalidIssuerError(
                    'Issuer param must be "str" or "Container[str]"'
                ) from None

570 

571 

# Module-level convenience API: the three functions below are the bound
# methods of one shared PyJWT instance created with default options.
_jwt_global_obj = PyJWT()
encode, decode_complete, decode = (
    _jwt_global_obj.encode,
    _jwt_global_obj.decode_complete,
    _jwt_global_obj.decode,
)

575decode = _jwt_global_obj.decode