Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/jwt/api_jwt.py: 24%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

148 statements  

1from __future__ import annotations 

2 

3import json 

4import warnings 

5from calendar import timegm 

6from collections.abc import Iterable 

7from datetime import datetime, timedelta, timezone 

8from typing import TYPE_CHECKING, Any, List 

9 

10from . import api_jws 

11from .exceptions import ( 

12 DecodeError, 

13 ExpiredSignatureError, 

14 ImmatureSignatureError, 

15 InvalidAudienceError, 

16 InvalidIssuedAtError, 

17 InvalidIssuerError, 

18 MissingRequiredClaimError, 

19) 

20from .warnings import RemovedInPyjwt3Warning 

21 

22if TYPE_CHECKING: 

23 from .algorithms import AllowedPrivateKeys, AllowedPublicKeys 

24 from .api_jwk import PyJWK 

25 

26 

class PyJWT:
    """Encode, decode and validate JSON Web Tokens.

    Signing and signature verification are delegated to ``api_jws``. This
    class serializes the JSON payload and validates the ``exp``, ``nbf``,
    ``iat``, ``aud`` and ``iss`` claims, plus any claim names listed under
    the ``require`` option.
    """

    def __init__(self, options: dict[str, Any] | None = None) -> None:
        """Store instance-wide options.

        :param options: overrides layered on top of the defaults returned by
            ``_get_default_options()``; ``None`` keeps the defaults as-is.
        """
        if options is None:
            options = {}
        self.options: dict[str, Any] = {**self._get_default_options(), **options}

    @staticmethod
    def _get_default_options() -> dict[str, bool | list[str]]:
        """Return a fresh dict of default options (everything verified,
        no additional required claims)."""
        return {
            "verify_signature": True,
            "verify_exp": True,
            "verify_nbf": True,
            "verify_iat": True,
            "verify_aud": True,
            "verify_iss": True,
            "require": [],
        }

    def encode(
        self,
        payload: dict[str, Any],
        key: AllowedPrivateKeys | str | bytes,
        algorithm: str | None = "HS256",
        headers: dict[str, Any] | None = None,
        json_encoder: type[json.JSONEncoder] | None = None,
        sort_headers: bool = True,
    ) -> str:
        """Serialize ``payload`` and sign it through ``api_jws.encode``.

        ``datetime`` values found under ``exp``, ``iat`` or ``nbf`` are
        converted to integer timestamps before serialization. The caller's
        dict is not modified.

        :raises TypeError: if ``payload`` is not a ``dict``.
        """
        # Check that we get a dict
        if not isinstance(payload, dict):
            raise TypeError(
                "Expecting a dict object, as JWT only supports "
                "JSON objects as payloads."
            )

        # Work on a shallow copy so the time-claim conversion below does not
        # leak into the caller's dict.
        payload = payload.copy()
        for time_claim in ("exp", "iat", "nbf"):
            # Convert datetime to an intDate value in known time-format claims
            if isinstance(payload.get(time_claim), datetime):
                payload[time_claim] = timegm(payload[time_claim].utctimetuple())

        json_payload = self._encode_payload(
            payload,
            headers=headers,
            json_encoder=json_encoder,
        )

        return api_jws.encode(
            json_payload,
            key,
            algorithm,
            headers,
            json_encoder,
            sort_headers=sort_headers,
        )

    def _encode_payload(
        self,
        payload: dict[str, Any],
        headers: dict[str, Any] | None = None,
        json_encoder: type[json.JSONEncoder] | None = None,
    ) -> bytes:
        """
        Encode a given payload to the bytes to be signed.

        This method is intended to be overridden by subclasses that need to
        encode the payload in a different way, e.g. compress the payload.
        ``headers`` is not used by this default implementation.
        """
        return json.dumps(
            payload,
            separators=(",", ":"),
            cls=json_encoder,
        ).encode("utf-8")

    def decode_complete(
        self,
        jwt: str | bytes,
        key: AllowedPublicKeys | PyJWK | str | bytes = "",
        algorithms: list[str] | None = None,
        options: dict[str, Any] | None = None,
        # deprecated arg, remove in pyjwt3
        verify: bool | None = None,
        # could be used as passthrough to api_jws, consider removal in pyjwt3
        detached_payload: bytes | None = None,
        # passthrough arguments to _validate_claims
        # consider putting in options
        audience: str | Iterable[str] | None = None,
        issuer: str | List[str] | None = None,
        leeway: float | timedelta = 0,
        # kwargs
        **kwargs: Any,
    ) -> dict[str, Any]:
        """Decode ``jwt`` and validate its claims.

        Returns the dict produced by ``api_jws.decode_complete`` with its
        ``"payload"`` entry replaced by the parsed JSON object.

        Per-call ``options`` take precedence over the instance options. When
        ``verify_signature`` is false, the ``verify_exp``/``nbf``/``iat``/
        ``aud``/``iss`` checks default to off unless set explicitly in
        ``options``.

        :raises DecodeError: if the signature is to be verified but no
            ``algorithms`` were supplied, or the payload is not a JSON object.
        """
        if kwargs:
            warnings.warn(
                "passing additional kwargs to decode_complete() is deprecated "
                "and will be removed in pyjwt version 3. "
                f"Unsupported kwargs: {tuple(kwargs.keys())}",
                RemovedInPyjwt3Warning,
                # Attribute the warning to the calling code, not this module.
                stacklevel=2,
            )
        options = dict(options or {})  # shallow-copy or initialize an empty dict
        options.setdefault("verify_signature", True)

        # If the user has set the legacy `verify` argument, and it doesn't match
        # what the relevant `options` entry for the argument is, inform the user
        # that they're likely making a mistake.
        if verify is not None and verify != options["verify_signature"]:
            warnings.warn(
                "The `verify` argument to `decode` does nothing in PyJWT 2.0 and newer. "
                "The equivalent is setting `verify_signature` to False in the `options` dictionary. "
                "This invocation has a mismatch between the kwarg and the option entry.",
                category=DeprecationWarning,
                stacklevel=2,
            )

        if not options["verify_signature"]:
            # Without a verified signature the claim checks are opt-in only.
            options.setdefault("verify_exp", False)
            options.setdefault("verify_nbf", False)
            options.setdefault("verify_iat", False)
            options.setdefault("verify_aud", False)
            options.setdefault("verify_iss", False)

        if options["verify_signature"] and not algorithms:
            raise DecodeError(
                'It is required that you pass in a value for the "algorithms" argument when calling decode().'
            )

        decoded = api_jws.decode_complete(
            jwt,
            key=key,
            algorithms=algorithms,
            options=options,
            detached_payload=detached_payload,
        )

        payload = self._decode_payload(decoded)

        # Per-call options win over the instance-level ones.
        merged_options = {**self.options, **options}
        self._validate_claims(
            payload, merged_options, audience=audience, issuer=issuer, leeway=leeway
        )

        decoded["payload"] = payload
        return decoded

    def _decode_payload(self, decoded: dict[str, Any]) -> Any:
        """
        Decode the payload from a JWS dictionary (payload, signature, header).

        This method is intended to be overridden by subclasses that need to
        decode the payload in a different way, e.g. decompress compressed
        payloads.

        :raises DecodeError: if the payload is not valid JSON or is not a
            JSON object.
        """
        try:
            payload = json.loads(decoded["payload"])
        except ValueError as e:
            # Keep the parser error as the explicit cause for debugging.
            raise DecodeError(f"Invalid payload string: {e}") from e
        if not isinstance(payload, dict):
            raise DecodeError("Invalid payload string: must be a json object")
        return payload

    def decode(
        self,
        jwt: str | bytes,
        key: AllowedPublicKeys | PyJWK | str | bytes = "",
        algorithms: list[str] | None = None,
        options: dict[str, Any] | None = None,
        # deprecated arg, remove in pyjwt3
        verify: bool | None = None,
        # could be used as passthrough to api_jws, consider removal in pyjwt3
        detached_payload: bytes | None = None,
        # passthrough arguments to _validate_claims
        # consider putting in options
        audience: str | Iterable[str] | None = None,
        issuer: str | List[str] | None = None,
        leeway: float | timedelta = 0,
        # kwargs
        **kwargs: Any,
    ) -> Any:
        """Decode ``jwt`` via ``decode_complete`` and return only the payload.

        Unsupported ``kwargs`` trigger a deprecation warning here and are not
        forwarded to ``decode_complete``.
        """
        if kwargs:
            warnings.warn(
                "passing additional kwargs to decode() is deprecated "
                "and will be removed in pyjwt version 3. "
                f"Unsupported kwargs: {tuple(kwargs.keys())}",
                RemovedInPyjwt3Warning,
                # Attribute the warning to the calling code, not this module.
                stacklevel=2,
            )
        decoded = self.decode_complete(
            jwt,
            key,
            algorithms,
            options,
            verify=verify,
            detached_payload=detached_payload,
            audience=audience,
            issuer=issuer,
            leeway=leeway,
        )
        return decoded["payload"]

    def _validate_claims(
        self,
        payload: dict[str, Any],
        options: dict[str, Any],
        audience=None,
        issuer=None,
        leeway: float | timedelta = 0,
    ) -> None:
        """Run every claim check enabled in ``options`` against ``payload``.

        ``leeway`` is a tolerance in seconds (a ``timedelta`` is converted
        with ``total_seconds()``) applied to the ``iat``, ``nbf`` and ``exp``
        comparisons.

        :raises TypeError: if ``audience`` is neither ``None``, a string nor
            an iterable.
        """
        if isinstance(leeway, timedelta):
            leeway = leeway.total_seconds()

        if audience is not None and not isinstance(audience, (str, Iterable)):
            raise TypeError("audience must be a string, iterable or None")

        self._validate_required_claims(payload, options)

        now = datetime.now(tz=timezone.utc).timestamp()

        # Time-based claims are only checked when present in the token.
        if "iat" in payload and options["verify_iat"]:
            self._validate_iat(payload, now, leeway)

        if "nbf" in payload and options["verify_nbf"]:
            self._validate_nbf(payload, now, leeway)

        if "exp" in payload and options["verify_exp"]:
            self._validate_exp(payload, now, leeway)

        if options["verify_iss"]:
            self._validate_iss(payload, issuer)

        if options["verify_aud"]:
            self._validate_aud(
                payload, audience, strict=options.get("strict_aud", False)
            )

    def _validate_required_claims(
        self,
        payload: dict[str, Any],
        options: dict[str, Any],
    ) -> None:
        """Raise ``MissingRequiredClaimError`` for the first claim named in
        ``options["require"]`` that is absent from ``payload`` or ``None``."""
        for claim in options["require"]:
            if payload.get(claim) is None:
                raise MissingRequiredClaimError(claim)

    def _validate_iat(
        self,
        payload: dict[str, Any],
        now: float,
        leeway: float,
    ) -> None:
        """Check that ``iat`` is integer-convertible and not in the future.

        :raises InvalidIssuedAtError: if ``iat`` cannot be converted to int.
        :raises ImmatureSignatureError: if ``iat`` is later than
            ``now + leeway``.
        """
        try:
            iat = int(payload["iat"])
        except (TypeError, ValueError, OverflowError):
            # TypeError: null/list/object values; OverflowError: Infinity.
            # Both can come straight from token JSON, so report them as an
            # invalid claim instead of leaking the conversion error.
            raise InvalidIssuedAtError(
                "Issued At claim (iat) must be an integer."
            ) from None
        if iat > (now + leeway):
            raise ImmatureSignatureError("The token is not yet valid (iat)")

    def _validate_nbf(
        self,
        payload: dict[str, Any],
        now: float,
        leeway: float,
    ) -> None:
        """Check that ``nbf`` is integer-convertible and already reached.

        :raises DecodeError: if ``nbf`` cannot be converted to int.
        :raises ImmatureSignatureError: if ``nbf`` is later than
            ``now + leeway``.
        """
        try:
            nbf = int(payload["nbf"])
        except (TypeError, ValueError, OverflowError):
            # See _validate_iat: non-numeric JSON values must not escape as
            # raw TypeError/OverflowError.
            raise DecodeError("Not Before claim (nbf) must be an integer.") from None

        if nbf > (now + leeway):
            raise ImmatureSignatureError("The token is not yet valid (nbf)")

    def _validate_exp(
        self,
        payload: dict[str, Any],
        now: float,
        leeway: float,
    ) -> None:
        """Check that ``exp`` is integer-convertible and still in the future.

        :raises DecodeError: if ``exp`` cannot be converted to int.
        :raises ExpiredSignatureError: if ``exp`` is at or before
            ``now - leeway``.
        """
        try:
            exp = int(payload["exp"])
        except (TypeError, ValueError, OverflowError):
            # See _validate_iat: non-numeric JSON values must not escape as
            # raw TypeError/OverflowError.
            raise DecodeError(
                "Expiration Time claim (exp) must be an integer."
            ) from None

        if exp <= (now - leeway):
            raise ExpiredSignatureError("Signature has expired")

    def _validate_aud(
        self,
        payload: dict[str, Any],
        audience: str | Iterable[str] | None,
        *,
        strict: bool = False,
    ) -> None:
        """Check the ``aud`` claim against the expected ``audience``.

        Non-strict mode accepts the token when at least one expected audience
        appears in the claim (a string or a list of strings). Strict mode
        requires both sides to be single strings that compare equal.

        :raises InvalidAudienceError: on a mismatch, a malformed claim, or
            when the token carries ``aud`` but no audience was expected.
        :raises MissingRequiredClaimError: if an audience is expected but the
            token has no (or an empty) ``aud`` claim.
        """
        if audience is None:
            if "aud" not in payload or not payload["aud"]:
                return
            # Application did not specify an audience, but
            # the token has the 'aud' claim
            raise InvalidAudienceError("Invalid audience")

        if "aud" not in payload or not payload["aud"]:
            # Application specified an audience, but it could not be
            # verified since the token does not contain a claim.
            raise MissingRequiredClaimError("aud")

        audience_claims = payload["aud"]

        # In strict mode, we forbid list matching: the supplied audience
        # must be a string, and it must exactly match the audience claim.
        if strict:
            # Only a single audience is allowed in strict mode.
            if not isinstance(audience, str):
                raise InvalidAudienceError("Invalid audience (strict)")

            # Only a single audience claim is allowed in strict mode.
            if not isinstance(audience_claims, str):
                raise InvalidAudienceError("Invalid claim format in token (strict)")

            if audience != audience_claims:
                raise InvalidAudienceError("Audience doesn't match (strict)")

            return

        # Normalize the claim to a list of strings before matching.
        if isinstance(audience_claims, str):
            audience_claims = [audience_claims]
        if not isinstance(audience_claims, list):
            raise InvalidAudienceError("Invalid claim format in token")
        if any(not isinstance(c, str) for c in audience_claims):
            raise InvalidAudienceError("Invalid claim format in token")

        if isinstance(audience, str):
            audience = [audience]

        # Reject only when none of the expected audiences is in the claim.
        if all(aud not in audience_claims for aud in audience):
            raise InvalidAudienceError("Audience doesn't match")

    def _validate_iss(self, payload: dict[str, Any], issuer: Any) -> None:
        """Check the ``iss`` claim against ``issuer``.

        No check is performed when ``issuer`` is ``None``. A list ``issuer``
        is treated as a set of acceptable values; anything else must compare
        equal to the claim.

        :raises MissingRequiredClaimError: if an issuer is expected but the
            token has no ``iss`` claim.
        :raises InvalidIssuerError: if the claim does not match.
        """
        if issuer is None:
            return

        if "iss" not in payload:
            raise MissingRequiredClaimError("iss")

        if isinstance(issuer, list):
            if payload["iss"] not in issuer:
                raise InvalidIssuerError("Invalid issuer")
        else:
            if payload["iss"] != issuer:
                raise InvalidIssuerError("Invalid issuer")

373 

# Shared module-level instance that backs the function-style API below.
_jwt_global_obj = PyJWT()

# Expose the shared instance's bound methods as module-level callables.
encode, decode_complete, decode = (
    _jwt_global_obj.encode,
    _jwt_global_obj.decode_complete,
    _jwt_global_obj.decode,
)