Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/jwt/api_jwt.py: 62%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

192 statements  

1from __future__ import annotations 

2 

3import json 

4import os 

5import warnings 

6from calendar import timegm 

7from collections.abc import Container, Iterable, Sequence 

8from datetime import datetime, timedelta, timezone 

9from typing import TYPE_CHECKING, Any, Union, cast 

10 

11from . import api_jws 

12from .exceptions import ( 

13 DecodeError, 

14 ExpiredSignatureError, 

15 ImmatureSignatureError, 

16 InvalidAudienceError, 

17 InvalidIssuedAtError, 

18 InvalidIssuerError, 

19 InvalidJTIError, 

20 InvalidSubjectError, 

21 MissingRequiredClaimError, 

22) 

23from .warnings import RemovedInPyjwt3Warning 

24 

# Everything below is needed only by static type checkers and by the Sphinx
# documentation build (signalled through the SPHINX_BUILD environment
# variable); at normal runtime the whole branch is skipped.
if TYPE_CHECKING or os.getenv("SPHINX_BUILD", ""):
    import sys

    if sys.version_info < (3, 10):
        # Python 3.9 and lower
        from typing_extensions import TypeAlias
    else:
        from typing import TypeAlias

    from .algorithms import has_crypto
    from .api_jwk import PyJWK
    from .types import FullOptions, Options, SigOptions

    # Key types accepted by encode() (private) and decode() (public). The
    # key-object union from .algorithms is only importable when has_crypto
    # is true; otherwise the aliases fall back to PyJWK / str / bytes.
    if has_crypto:
        from .algorithms import AllowedPrivateKeys, AllowedPublicKeys

        AllowedPrivateKeyTypes: TypeAlias = Union[
            AllowedPrivateKeys, PyJWK, str, bytes
        ]
        AllowedPublicKeyTypes: TypeAlias = Union[
            AllowedPublicKeys, PyJWK, str, bytes
        ]
    else:
        AllowedPrivateKeyTypes: TypeAlias = Union[PyJWK, str, bytes]  # type: ignore
        AllowedPublicKeyTypes: TypeAlias = Union[PyJWK, str, bytes]  # type: ignore

46 

47 

class PyJWT:
    """JSON Web Token encoder/decoder with registered-claim validation.

    Signing and signature verification are delegated to ``api_jws``; this
    class adds JSON (de)serialisation of the payload and validation of the
    ``exp``, ``nbf``, ``iat``, ``aud``, ``iss``, ``sub`` and ``jti`` claims.
    Each instance carries default validation options which per-call
    ``options`` are merged over.
    """

    def __init__(self, options: Options | None = None) -> None:
        """Initialise the instance defaults, optionally overridden by ``options``.

        :param options: option overrides applied on top of
            :meth:`_get_default_options`; ``None`` keeps the defaults.
        """
        self.options: FullOptions
        self.options = self._get_default_options()
        if options is not None:
            self.options = self._merge_options(options)

    @staticmethod
    def _get_default_options() -> FullOptions:
        """Return a fresh dict of the default validation options.

        Every ``verify_*`` check is enabled, no claim is required and strict
        audience matching is off.
        """
        return {
            "verify_signature": True,
            "verify_exp": True,
            "verify_nbf": True,
            "verify_iat": True,
            "verify_aud": True,
            "verify_iss": True,
            "verify_sub": True,
            "verify_jti": True,
            "require": [],
            "strict_aud": False,
        }

    def _merge_options(self, options: Options | None = None) -> FullOptions:
        """Merge per-call ``options`` over the instance options.

        When ``options`` disables ``verify_signature``, every claim check
        that ``options`` does not mention explicitly is disabled as well.

        The caller's dict is never modified.

        :param options: per-call overrides, or ``None``
        :returns: ``self.options`` itself when ``options`` is ``None``,
            otherwise a new merged dict
        """
        if options is None:
            return self.options

        # Work on a copy so the defensive defaults below do not leak back
        # into the dict the caller passed in (and may reuse later).
        overrides: dict[str, Any] = dict(options)

        # (defensive) set defaults for verify_x to False if verify_signature is False
        if not overrides.get("verify_signature", True):
            for flag in (
                "verify_exp",
                "verify_nbf",
                "verify_iat",
                "verify_aud",
                "verify_iss",
                "verify_sub",
                "verify_jti",
            ):
                overrides.setdefault(flag, False)

        return cast("FullOptions", {**self.options, **overrides})

    def encode(
        self,
        payload: dict[str, Any],
        key: AllowedPrivateKeyTypes,
        algorithm: str | None = "HS256",
        headers: dict[str, Any] | None = None,
        json_encoder: type[json.JSONEncoder] | None = None,
        sort_headers: bool = True,
    ) -> str:
        """Encode the ``payload`` as JSON Web Token.

        :param payload: JWT claims, e.g. ``dict(iss=..., aud=..., sub=...)``
        :type payload: dict[str, typing.Any]
        :param key: a key suitable for the chosen algorithm:

            * for **asymmetric algorithms**: PEM-formatted private key, a multiline string
            * for **symmetric algorithms**: plain string, sufficiently long for security

        :type key: str or bytes or PyJWK or :py:class:`jwt.algorithms.AllowedPrivateKeys`
        :param algorithm: algorithm to sign the token with, e.g. ``"ES256"``.
            If ``headers`` includes ``alg``, it will be preferred to this parameter.
            If ``key`` is a :class:`PyJWK` object, by default the key algorithm will be used.
        :type algorithm: str or None
        :param headers: additional JWT header fields, e.g. ``dict(kid="my-key-id")``.
        :type headers: dict[str, typing.Any] or None
        :param json_encoder: custom JSON encoder for ``payload`` and ``headers``
        :type json_encoder: json.JSONEncoder or None
        :param sort_headers: forwarded unchanged to ``api_jws.encode``
        :type sort_headers: bool

        :rtype: str
        :returns: a JSON Web Token

        :raises TypeError: if ``payload`` is not a ``dict``, or if it has an
            ``iss`` claim that is not a string
        """
        # Check that we get a dict
        if not isinstance(payload, dict):
            raise TypeError(
                "Expecting a dict object, as JWT only supports "
                "JSON objects as payloads."
            )

        # Payload: shallow copy so the datetime conversion below does not
        # alter the caller's dict.
        payload = payload.copy()
        for time_claim in ["exp", "iat", "nbf"]:
            # Convert datetime to a intDate value in known time-format claims
            if isinstance(payload.get(time_claim), datetime):
                payload[time_claim] = timegm(payload[time_claim].utctimetuple())

        # Issue #1039, iss being set to non-string
        if "iss" in payload and not isinstance(payload["iss"], str):
            raise TypeError("Issuer (iss) must be a string.")

        json_payload = self._encode_payload(
            payload,
            headers=headers,
            json_encoder=json_encoder,
        )

        return api_jws.encode(
            json_payload,
            key,
            algorithm,
            headers,
            json_encoder,
            sort_headers=sort_headers,
        )

    def _encode_payload(
        self,
        payload: dict[str, Any],
        headers: dict[str, Any] | None = None,
        json_encoder: type[json.JSONEncoder] | None = None,
    ) -> bytes:
        """
        Encode a given payload to the bytes to be signed.

        This method is intended to be overridden by subclasses that need to
        encode the payload in a different way, e.g. compress the payload.

        ``headers`` is accepted for the benefit of such overrides; this
        implementation does not use it.
        """
        return json.dumps(
            payload,
            separators=(",", ":"),
            cls=json_encoder,
        ).encode("utf-8")

    def decode_complete(
        self,
        jwt: str | bytes,
        key: AllowedPublicKeyTypes = "",
        algorithms: Sequence[str] | None = None,
        options: Options | None = None,
        # deprecated arg, remove in pyjwt3
        verify: bool | None = None,
        # could be used as passthrough to api_jws, consider removal in pyjwt3
        detached_payload: bytes | None = None,
        # passthrough arguments to _validate_claims
        # consider putting in options
        audience: str | Iterable[str] | None = None,
        issuer: str | Container[str] | None = None,
        subject: str | None = None,
        leeway: float | timedelta = 0,
        # kwargs
        **kwargs: Any,
    ) -> dict[str, Any]:
        """Identical to ``jwt.decode`` except for return value which is a dictionary containing the token header (JOSE Header),
        the token payload (JWT Payload), and token signature (JWT Signature) on the keys "header", "payload",
        and "signature" respectively.

        :param jwt: the token to be decoded
        :type jwt: str or bytes
        :param key: the key suitable for the allowed algorithm
        :type key: str or bytes or PyJWK or :py:class:`jwt.algorithms.AllowedPublicKeys`

        :param algorithms: allowed algorithms, e.g. ``["ES256"]``

            .. warning::

               Do **not** compute the ``algorithms`` parameter based on
               the ``alg`` from the token itself, or on any other data
               that an attacker may be able to influence, as that might
               expose you to various vulnerabilities (see `RFC 8725 §2.1
               <https://www.rfc-editor.org/rfc/rfc8725.html#section-2.1>`_). Instead,
               either hard-code a fixed value for ``algorithms``, or
               configure it in the same place you configure the
               ``key``. Make sure not to mix symmetric and asymmetric
               algorithms that interpret the ``key`` in different ways
               (e.g. HS\\* and RS\\*).
        :type algorithms: typing.Sequence[str] or None

        :param jwt.types.Options options: extended decoding and validation options
            Refer to :py:class:`jwt.types.Options` for more information.

        :param verify: deprecated and ignored; a ``DeprecationWarning`` is
            emitted when it disagrees with ``options["verify_signature"]``
        :type verify: bool or None
        :param detached_payload: passed through unchanged to
            ``api_jws.decode_complete``
        :type detached_payload: bytes or None
        :param audience: optional, the value for ``verify_aud`` check
        :type audience: str or typing.Iterable[str] or None
        :param issuer: optional, the value for ``verify_iss`` check
        :type issuer: str or typing.Container[str] or None
        :param subject: optional, the value for ``verify_sub`` check
        :type subject: str or None
        :param leeway: a time margin in seconds for the expiration check
        :type leeway: float or datetime.timedelta
        :rtype: dict[str, typing.Any]
        :returns: Decoded JWT with the JOSE Header on the key ``header``, the JWS
            Payload on the key ``payload``, and the JWS Signature on the key ``signature``.
        """
        if kwargs:
            warnings.warn(
                "passing additional kwargs to decode_complete() is deprecated "
                "and will be removed in pyjwt version 3. "
                f"Unsupported kwargs: {tuple(kwargs.keys())}",
                RemovedInPyjwt3Warning,
                stacklevel=2,
            )

        if options is None:
            verify_signature = True
        else:
            verify_signature = options.get("verify_signature", True)

        # If the user has set the legacy `verify` argument, and it doesn't match
        # what the relevant `options` entry for the argument is, inform the user
        # that they're likely making a mistake.
        if verify is not None and verify != verify_signature:
            warnings.warn(
                "The `verify` argument to `decode` does nothing in PyJWT 2.0 and newer. "
                "The equivalent is setting `verify_signature` to False in the `options` dictionary. "
                "This invocation has a mismatch between the kwarg and the option entry.",
                category=DeprecationWarning,
                stacklevel=2,
            )

        # Only the signature switch is forwarded to the JWS layer; the claim
        # options are applied further down by _validate_claims.
        sig_options: SigOptions = {"verify_signature": verify_signature}
        decoded = api_jws.decode_complete(
            jwt,
            key=key,
            algorithms=algorithms,
            options=sig_options,
            detached_payload=detached_payload,
        )

        payload = self._decode_payload(decoded)

        merged_options = self._merge_options(options)
        self._validate_claims(
            payload,
            merged_options,
            audience=audience,
            issuer=issuer,
            leeway=leeway,
            subject=subject,
        )

        # Replace the raw payload with the parsed claims dict.
        decoded["payload"] = payload
        return decoded

    def _decode_payload(self, decoded: dict[str, Any]) -> dict[str, Any]:
        """
        Decode the payload from a JWS dictionary (payload, signature, header).

        This method is intended to be overridden by subclasses that need to
        decode the payload in a different way, e.g. decompress compressed
        payloads.

        :raises DecodeError: if the payload is not valid JSON or is not a
            JSON object
        """
        try:
            payload: dict[str, Any] = json.loads(decoded["payload"])
        except ValueError as e:
            raise DecodeError(f"Invalid payload string: {e}") from e
        if not isinstance(payload, dict):
            raise DecodeError("Invalid payload string: must be a json object")
        return payload

    def decode(
        self,
        jwt: str | bytes,
        key: AllowedPublicKeyTypes = "",
        algorithms: Sequence[str] | None = None,
        options: Options | None = None,
        # deprecated arg, remove in pyjwt3
        verify: bool | None = None,
        # could be used as passthrough to api_jws, consider removal in pyjwt3
        detached_payload: bytes | None = None,
        # passthrough arguments to _validate_claims
        # consider putting in options
        audience: str | Iterable[str] | None = None,
        subject: str | None = None,
        issuer: str | Container[str] | None = None,
        leeway: float | timedelta = 0,
        # kwargs
        **kwargs: Any,
    ) -> dict[str, Any]:
        """Verify the ``jwt`` token signature and return the token claims.

        :param jwt: the token to be decoded
        :type jwt: str or bytes
        :param key: the key suitable for the allowed algorithm
        :type key: str or bytes or PyJWK or :py:class:`jwt.algorithms.AllowedPublicKeys`

        :param algorithms: allowed algorithms, e.g. ``["ES256"]``
            If ``key`` is a :class:`PyJWK` object, allowed algorithms will default to the key algorithm.

            .. warning::

               Do **not** compute the ``algorithms`` parameter based on
               the ``alg`` from the token itself, or on any other data
               that an attacker may be able to influence, as that might
               expose you to various vulnerabilities (see `RFC 8725 §2.1
               <https://www.rfc-editor.org/rfc/rfc8725.html#section-2.1>`_). Instead,
               either hard-code a fixed value for ``algorithms``, or
               configure it in the same place you configure the
               ``key``. Make sure not to mix symmetric and asymmetric
               algorithms that interpret the ``key`` in different ways
               (e.g. HS\\* and RS\\*).
        :type algorithms: typing.Sequence[str] or None

        :param jwt.types.Options options: extended decoding and validation options
            Refer to :py:class:`jwt.types.Options` for more information.

        :param verify: deprecated and ignored; see :meth:`decode_complete`
        :type verify: bool or None
        :param detached_payload: passed through to :meth:`decode_complete`
        :type detached_payload: bytes or None
        :param audience: optional, the value for ``verify_aud`` check
        :type audience: str or typing.Iterable[str] or None
        :param subject: optional, the value for ``verify_sub`` check
        :type subject: str or None
        :param issuer: optional, the value for ``verify_iss`` check
        :type issuer: str or typing.Container[str] or None
        :param leeway: a time margin in seconds for the expiration check
        :type leeway: float or datetime.timedelta
        :rtype: dict[str, typing.Any]
        :returns: the JWT claims
        """
        if kwargs:
            warnings.warn(
                "passing additional kwargs to decode() is deprecated "
                "and will be removed in pyjwt version 3. "
                f"Unsupported kwargs: {tuple(kwargs.keys())}",
                RemovedInPyjwt3Warning,
                stacklevel=2,
            )
        # kwargs are deliberately not forwarded, so the warning above is
        # emitted once rather than again by decode_complete().
        decoded = self.decode_complete(
            jwt,
            key,
            algorithms,
            options,
            verify=verify,
            detached_payload=detached_payload,
            audience=audience,
            subject=subject,
            issuer=issuer,
            leeway=leeway,
        )
        return cast(dict[str, Any], decoded["payload"])

    def _validate_claims(
        self,
        payload: dict[str, Any],
        options: FullOptions,
        audience: Iterable[str] | str | None = None,
        issuer: Container[str] | str | None = None,
        subject: str | None = None,
        leeway: float | timedelta = 0,
    ) -> None:
        """Run every claim check enabled in ``options`` against ``payload``.

        Required claims are checked first, then ``iat``, ``nbf`` and ``exp``
        (each only when present in the payload), then ``iss``, ``aud``,
        ``sub`` and ``jti``. The first failing check raises.

        :param payload: decoded JWT claims
        :param options: fully merged options
        :param audience: expected audience(s) for the ``aud`` check
        :param issuer: expected issuer(s) for the ``iss`` check
        :param subject: expected subject for the ``sub`` check
        :param leeway: tolerance for the time-based checks, in seconds or
            as a ``timedelta``
        :raises TypeError: if ``audience`` is neither a string, an iterable
            nor ``None``
        """
        if isinstance(leeway, timedelta):
            leeway = leeway.total_seconds()

        if audience is not None and not isinstance(audience, (str, Iterable)):
            raise TypeError("audience must be a string, iterable or None")

        self._validate_required_claims(payload, options["require"])

        # One timestamp shared by all time-based checks of this token.
        now = datetime.now(tz=timezone.utc).timestamp()

        if "iat" in payload and options["verify_iat"]:
            self._validate_iat(payload, now, leeway)

        if "nbf" in payload and options["verify_nbf"]:
            self._validate_nbf(payload, now, leeway)

        if "exp" in payload and options["verify_exp"]:
            self._validate_exp(payload, now, leeway)

        if options["verify_iss"]:
            self._validate_iss(payload, issuer)

        if options["verify_aud"]:
            self._validate_aud(
                payload, audience, strict=options.get("strict_aud", False)
            )

        if options["verify_sub"]:
            self._validate_sub(payload, subject)

        if options["verify_jti"]:
            self._validate_jti(payload)

    def _validate_required_claims(
        self,
        payload: dict[str, Any],
        claims: Iterable[str],
    ) -> None:
        """Ensure every name in ``claims`` is present in ``payload``.

        A claim whose value is ``None`` counts as missing.

        :raises MissingRequiredClaimError: for the first missing claim
        """
        for claim in claims:
            if payload.get(claim) is None:
                raise MissingRequiredClaimError(claim)

    def _validate_sub(
        self, payload: dict[str, Any], subject: str | None = None
    ) -> None:
        """
        Checks whether "sub" if in the payload is valid or not.
        This is an Optional claim

        :param payload(dict): The payload which needs to be validated
        :param subject(str): The subject of the token
        :raises InvalidSubjectError: if "sub" is not a string, or differs
            from ``subject`` when one is given
        """

        if "sub" not in payload:
            return

        if not isinstance(payload["sub"], str):
            raise InvalidSubjectError("Subject must be a string")

        if subject is not None:
            if payload.get("sub") != subject:
                raise InvalidSubjectError("Invalid subject")

    def _validate_jti(self, payload: dict[str, Any]) -> None:
        """
        Checks whether "jti" if in the payload is valid or not
        This is an Optional claim

        :param payload(dict): The payload which needs to be validated
        :raises InvalidJTIError: if "jti" is present but not a string
        """

        if "jti" not in payload:
            return

        if not isinstance(payload.get("jti"), str):
            raise InvalidJTIError("JWT ID must be a string")

    def _validate_iat(
        self,
        payload: dict[str, Any],
        now: float,
        leeway: float,
    ) -> None:
        """Reject an ``iat`` claim that is malformed or lies in the future.

        :param now: current time as a POSIX timestamp
        :param leeway: tolerance in seconds
        :raises InvalidIssuedAtError: if ``iat`` cannot be converted to int
        :raises ImmatureSignatureError: if ``iat`` is later than
            ``now + leeway``
        """
        try:
            iat = int(payload["iat"])
        # TypeError: null / list / object; OverflowError: Infinity.
        except (TypeError, ValueError, OverflowError):
            raise InvalidIssuedAtError(
                "Issued At claim (iat) must be an integer."
            ) from None
        if iat > (now + leeway):
            raise ImmatureSignatureError("The token is not yet valid (iat)")

    def _validate_nbf(
        self,
        payload: dict[str, Any],
        now: float,
        leeway: float,
    ) -> None:
        """Reject an ``nbf`` claim that is malformed or lies in the future.

        :param now: current time as a POSIX timestamp
        :param leeway: tolerance in seconds
        :raises DecodeError: if ``nbf`` cannot be converted to int
        :raises ImmatureSignatureError: if ``nbf`` is later than
            ``now + leeway``
        """
        try:
            nbf = int(payload["nbf"])
        # TypeError: null / list / object; OverflowError: Infinity.
        except (TypeError, ValueError, OverflowError):
            raise DecodeError("Not Before claim (nbf) must be an integer.") from None

        if nbf > (now + leeway):
            raise ImmatureSignatureError("The token is not yet valid (nbf)")

    def _validate_exp(
        self,
        payload: dict[str, Any],
        now: float,
        leeway: float,
    ) -> None:
        """Reject an ``exp`` claim that is malformed or already reached.

        :param now: current time as a POSIX timestamp
        :param leeway: tolerance in seconds
        :raises DecodeError: if ``exp`` cannot be converted to int
        :raises ExpiredSignatureError: if ``exp`` is at or before
            ``now - leeway``
        """
        try:
            exp = int(payload["exp"])
        # TypeError: null / list / object; OverflowError: Infinity.
        except (TypeError, ValueError, OverflowError):
            raise DecodeError(
                "Expiration Time claim (exp) must be an integer."
            ) from None

        if exp <= (now - leeway):
            raise ExpiredSignatureError("Signature has expired")

    def _validate_aud(
        self,
        payload: dict[str, Any],
        audience: str | Iterable[str] | None,
        *,
        strict: bool = False,
    ) -> None:
        """Check the ``aud`` claim against the expected ``audience``.

        Non-strict mode passes when at least one expected audience appears
        in the token's audience claim (a string or a list of strings).
        Strict mode requires both sides to be single, equal strings.

        :param audience: expected audience(s); ``None`` means the token must
            not carry a non-empty ``aud`` claim
        :param strict: enable strict matching
        :raises InvalidAudienceError: on a mismatch or a malformed claim
        :raises MissingRequiredClaimError: if an audience is expected but
            the token has no (or an empty) ``aud`` claim
        """
        if audience is None:
            if "aud" not in payload or not payload["aud"]:
                return
            # Application did not specify an audience, but
            # the token has the 'aud' claim
            raise InvalidAudienceError("Invalid audience")

        if "aud" not in payload or not payload["aud"]:
            # Application specified an audience, but it could not be
            # verified since the token does not contain a claim.
            raise MissingRequiredClaimError("aud")

        audience_claims = payload["aud"]

        # In strict mode, we forbid list matching: the supplied audience
        # must be a string, and it must exactly match the audience claim.
        if strict:
            # Only a single audience is allowed in strict mode.
            if not isinstance(audience, str):
                raise InvalidAudienceError("Invalid audience (strict)")

            # Only a single audience claim is allowed in strict mode.
            if not isinstance(audience_claims, str):
                raise InvalidAudienceError("Invalid claim format in token (strict)")

            if audience != audience_claims:
                raise InvalidAudienceError("Audience doesn't match (strict)")

            return

        # Normalise a single-string claim to a list, then insist on a list
        # of strings.
        if isinstance(audience_claims, str):
            audience_claims = [audience_claims]
        if not isinstance(audience_claims, list):
            raise InvalidAudienceError("Invalid claim format in token")
        if any(not isinstance(c, str) for c in audience_claims):
            raise InvalidAudienceError("Invalid claim format in token")

        if isinstance(audience, str):
            audience = [audience]

        if all(aud not in audience_claims for aud in audience):
            raise InvalidAudienceError("Audience doesn't match")

    def _validate_iss(
        self, payload: dict[str, Any], issuer: Container[str] | str | None
    ) -> None:
        """Check the ``iss`` claim against the expected ``issuer``.

        Nothing is checked when ``issuer`` is ``None``.

        :param issuer: a single expected issuer, or a container of
            acceptable issuers
        :raises MissingRequiredClaimError: if an issuer is expected but the
            token has no ``iss`` claim
        :raises InvalidIssuerError: if ``iss`` is not a string, does not
            match, or ``issuer`` does not support membership tests
        """
        if issuer is None:
            return

        if "iss" not in payload:
            raise MissingRequiredClaimError("iss")

        iss = payload["iss"]
        if not isinstance(iss, str):
            raise InvalidIssuerError("Payload Issuer (iss) must be a string")

        if isinstance(issuer, str):
            if iss != issuer:
                raise InvalidIssuerError("Invalid issuer")
        else:
            try:
                if iss not in issuer:
                    raise InvalidIssuerError("Invalid issuer")
            except TypeError:
                raise InvalidIssuerError(
                    'Issuer param must be "str" or "Container[str]"'
                ) from None

576 

577 

# Module-level convenience API: one shared PyJWT instance built with the
# default options, whose bound methods are exposed as plain functions.
_jwt_global_obj = PyJWT()
encode = _jwt_global_obj.encode
decode_complete = _jwt_global_obj.decode_complete
decode = _jwt_global_obj.decode

581decode = _jwt_global_obj.decode