Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/flask_jwt_extended/utils.py: 50%

68 statements  

« prev     ^ index     » next       coverage.py v7.3.2, created at 2023-12-09 07:17 +0000

1from typing import Any 

2from typing import Optional 

3 

4import jwt 

5from flask import g 

6from flask import Response 

7from werkzeug.local import LocalProxy 

8 

9from flask_jwt_extended.config import config 

10from flask_jwt_extended.internal_utils import get_jwt_manager 

11from flask_jwt_extended.typing import ExpiresDelta 

12from flask_jwt_extended.typing import Fresh 

13 

# Proxy to access the current user. The proxy is handed a callable, so the
# lookup of ``get_current_user`` (defined further down in this module) is
# deferred until the proxy is actually used.
def _resolve_current_user() -> Any:
    return get_current_user()


current_user: Any = LocalProxy(_resolve_current_user)

16 

17 

def get_jwt() -> dict:
    """
    Fetch the payload (claims) of the JWT that is accessing the endpoint.

    The payload is read from the ``_jwt_extended_jwt`` entry on ``flask.g``.
    If no JWT is present due to ``jwt_required(optional=True)``, the stored
    value is expected to be an empty dictionary, which is returned as-is.

    :return:
        The payload (claims) of the JWT in the current request

    :raises RuntimeError:
        If no payload has been stored for this request, i.e. neither
        ``@jwt_required()`` nor ``verify_jwt_in_request()`` has been called.
    """
    payload = g.get("_jwt_extended_jwt")
    if payload is not None:
        return payload

    raise RuntimeError(
        "You must call `@jwt_required()` or `verify_jwt_in_request()` "
        "before using this method"
    )

34 

35 

def get_jwt_header() -> dict:
    """
    Fetch the header of the JWT that is accessing the endpoint.

    The header is read from the ``_jwt_extended_jwt_header`` entry on
    ``flask.g``. If no JWT is present due to ``jwt_required(optional=True)``,
    the stored value is expected to be an empty dictionary, which is returned
    as-is.

    :return:
        The headers of the JWT in the current request

    :raises RuntimeError:
        If no header has been stored for this request, i.e. neither
        ``@jwt_required()`` nor ``verify_jwt_in_request()`` has been called.
    """
    header = g.get("_jwt_extended_jwt_header")
    if header is not None:
        return header

    raise RuntimeError(
        "You must call `@jwt_required()` or `verify_jwt_in_request()` "
        "before using this method"
    )

52 

53 

def get_jwt_identity() -> Any:
    """
    Look up the identity of the JWT that is accessing the endpoint.

    The identity is the value stored in the JWT payload under the claim named
    by ``config.identity_claim_key``. If that claim is missing (for example
    when no JWT is present due to ``jwt_required(optional=True)``), ``None``
    is returned.

    :return:
        The identity of the JWT in the current request

    :raises RuntimeError:
        Propagated from :func:`get_jwt` when called outside of a verified
        request.
    """
    claims = get_jwt()
    identity_key = config.identity_claim_key
    return claims.get(identity_key)

64 

65 

def get_jwt_request_location() -> Optional[str]:
    """
    Report the "location" at which the JWT accessing the endpoint was found,
    e.g., "cookies", "query-string", "headers", or "json".

    The value is read from the ``_jwt_extended_jwt_location`` entry on
    ``flask.g``. Unlike :func:`get_jwt`, a missing entry does not raise:
    ``None`` is returned instead (for example when no JWT is present due to
    ``jwt_required(optional=True)``).

    :return:
        The location of the JWT in the current request; e.g., "cookies",
        "query-string", "headers", or "json"
    """
    location: Optional[str] = g.get("_jwt_extended_jwt_location")
    return location

78 

79 

def get_current_user() -> Any:
    """
    Return the user object for the JWT that is accessing the endpoint.

    This is only usable if :meth:`~flask_jwt_extended.JWTManager.user_lookup_loader`
    is configured. If the user loader callback is not being used, this will
    raise an error.

    The user is taken from the ``"loaded_user"`` key of the
    ``_jwt_extended_jwt_user`` entry on ``flask.g``. If no JWT is present due
    to ``jwt_required(optional=True)``, that value is expected to be ``None``.

    :return:
        The current user object for the JWT in the current request

    :raises RuntimeError:
        If called outside of a verified request (propagated from
        :func:`get_jwt`), or if no user entry has been stored for this request.
    """
    # Called only for its side effect: it raises when no JWT data has been
    # stored for this request.
    get_jwt()

    user_entry = g.get("_jwt_extended_jwt_user")
    if user_entry is not None:
        return user_entry["loaded_user"]

    raise RuntimeError(
        "You must provide a `@jwt.user_lookup_loader` callback to use "
        "this method"
    )

102 

103 

def decode_token(
    encoded_token: str, csrf_value: Optional[str] = None, allow_expired: bool = False
) -> dict:
    """
    Decode an encoded JWT and return its payload as a python dict. This does
    all the checks to ensure that the decoded token is valid before returning
    it.

    This will not fire the user loader callbacks, save the token for access
    in protected endpoints, check if a token is revoked, etc. This is purely
    used to ensure that a JWT is valid.

    :param encoded_token:
        The encoded JWT to decode.

    :param csrf_value:
        Expected CSRF double submit value (optional).

    :param allow_expired:
        If ``True``, do not raise an error if the JWT is expired. Defaults to
        ``False``

    :return:
        Dictionary containing the payload of the decoded JWT.
    """
    # All arguments are forwarded positionally, unchanged, to the manager.
    return get_jwt_manager()._decode_jwt_from_config(
        encoded_token, csrf_value, allow_expired
    )

129 

130 

def create_access_token(
    identity: Any,
    fresh: Fresh = False,
    expires_delta: Optional[ExpiresDelta] = None,
    additional_claims=None,
    additional_headers=None,
):
    """
    Create a new access token.

    :param identity:
        The identity of this token. It can be any data that is json serializable.
        You can use :meth:`~flask_jwt_extended.JWTManager.user_identity_loader`
        to define a callback function to convert any object passed in into a json
        serializable format.

    :param fresh:
        If this token should be marked as fresh, and can thus access endpoints
        protected with ``@jwt_required(fresh=True)``. Defaults to ``False``.

        This value can also be a ``datetime.timedelta``, which indicates
        how long this token will be considered fresh.

    :param expires_delta:
        A ``datetime.timedelta`` for how long this token should last before it
        expires. Set to False to disable expiration. If this is None, it will use
        the ``JWT_ACCESS_TOKEN_EXPIRES`` config value (see :ref:`Configuration Options`)

    :param additional_claims:
        Optional. A hash of claims to include in the access token. These claims are
        merged into the default claims (exp, iat, etc) and claims returned from the
        :meth:`~flask_jwt_extended.JWTManager.additional_claims_loader` callback.
        On conflict, these claims take precedence.

    :param additional_headers:
        Optional. A hash of headers to include in the access token. These headers
        are merged into the default headers (alg, typ) and headers returned from
        the :meth:`~flask_jwt_extended.JWTManager.additional_headers_loader`
        callback. On conflict, these headers take precedence.

    :return:
        An encoded access token
    """
    jwt_manager = get_jwt_manager()
    # ``additional_claims`` / ``additional_headers`` are passed to the manager
    # under its shorter ``claims`` / ``headers`` keyword names.
    return jwt_manager._encode_jwt_from_config(
        claims=additional_claims,
        expires_delta=expires_delta,
        fresh=fresh,
        headers=additional_headers,
        identity=identity,
        token_type="access",
    )

183 

184 

def create_refresh_token(
    identity: Any,
    expires_delta: Optional[ExpiresDelta] = None,
    additional_claims=None,
    additional_headers=None,
):
    """
    Create a new refresh token.

    :param identity:
        The identity of this token. It can be any data that is json serializable.
        You can use :meth:`~flask_jwt_extended.JWTManager.user_identity_loader`
        to define a callback function to convert any object passed in into a json
        serializable format.

    :param expires_delta:
        A ``datetime.timedelta`` for how long this token should last before it expires.
        Set to False to disable expiration. If this is None, it will use the
        ``JWT_REFRESH_TOKEN_EXPIRES`` config value (see :ref:`Configuration Options`)

    :param additional_claims:
        Optional. A hash of claims to include in the refresh token. These claims are
        merged into the default claims (exp, iat, etc) and claims returned from the
        :meth:`~flask_jwt_extended.JWTManager.additional_claims_loader` callback.
        On conflict, these claims take precedence.

    :param additional_headers:
        Optional. A hash of headers to include in the refresh token. These headers
        are merged into the default headers (alg, typ) and headers returned from the
        :meth:`~flask_jwt_extended.JWTManager.additional_headers_loader` callback.
        On conflict, these headers take precedence.

    :return:
        An encoded refresh token
    """
    jwt_manager = get_jwt_manager()
    # ``fresh`` is always passed as ``False`` for refresh tokens;
    # ``additional_claims`` / ``additional_headers`` are passed to the manager
    # under its shorter ``claims`` / ``headers`` keyword names.
    return jwt_manager._encode_jwt_from_config(
        claims=additional_claims,
        expires_delta=expires_delta,
        fresh=False,
        headers=additional_headers,
        identity=identity,
        token_type="refresh",
    )

229 

230 

def get_unverified_jwt_headers(encoded_token: str) -> dict:
    """
    Read the headers of an encoded JWT without verifying its signature.

    This delegates to ``jwt.get_unverified_header``; nothing returned here
    has been authenticated.

    :param encoded_token:
        The encoded JWT to get the Header from.

    :return:
        JWT header parameters as python dict()
    """
    unverified_header = jwt.get_unverified_header(encoded_token)
    return unverified_header

242 

243 

def get_jti(encoded_token: str) -> Optional[str]:
    """
    Extract the JTI (unique identifier) of an encoded JWT.

    The token is first decoded with :func:`decode_token` using its default
    options, so any error raised while decoding propagates to the caller.

    :param encoded_token:
        The encoded JWT to get the JTI from.

    :return:
        The JTI (unique identifier) of a JWT, if it is present; otherwise
        ``None``.
    """
    payload = decode_token(encoded_token)
    return payload.get("jti")

255 

256 

def get_csrf_token(encoded_token: str) -> str:
    """
    Extract the CSRF double submit token from an encoded JWT.

    The token is first decoded with :func:`decode_token` using its default
    options, then the ``"csrf"`` claim is read from the payload.

    :param encoded_token:
        The encoded JWT

    :return:
        The CSRF double submit token (string)

    :raises KeyError:
        If the decoded payload has no ``"csrf"`` claim.
    """
    return decode_token(encoded_token)["csrf"]

269 

270 

def set_access_cookies(
    response: Response,
    encoded_access_token: str,
    max_age: Optional[int] = None,
    domain: Optional[str] = None,
) -> None:
    """
    Modify a Flask Response to set a cookie containing the access JWT.
    Also sets the corresponding CSRF cookie when both ``config.csrf_protect``
    and ``config.csrf_in_cookies`` are enabled (``JWT_CSRF_IN_COOKIES``, see
    :ref:`Configuration Options`).

    :param response:
        A Flask Response object.

    :param encoded_access_token:
        The encoded access token to set in the cookies.

    :param max_age:
        The max age of the cookie. Values should be the number of seconds (as
        an integer). If this is ``None`` -- or any other falsy value such as
        ``0`` -- ``config.cookie_max_age`` is used instead (the
        ``JWT_SESSION_COOKIE`` option, see :ref:`Configuration Options`).
        Otherwise this is used as the cookie's ``max-age`` and that option is
        ignored.

    :param domain:
        The domain of the cookie. If this is ``None`` -- or any other falsy
        value such as ``""`` -- ``config.cookie_domain`` is used instead (the
        ``JWT_COOKIE_DOMAIN`` option, see :ref:`Configuration Options`).
        Otherwise this is used as the cookie's ``domain`` and that option is
        ignored.
    """
    # The JWT cookie itself is HTTP-only.
    response.set_cookie(
        config.access_cookie_name,
        value=encoded_access_token,
        max_age=max_age or config.cookie_max_age,
        secure=config.cookie_secure,
        httponly=True,
        domain=domain or config.cookie_domain,
        path=config.access_cookie_path,
        samesite=config.cookie_samesite,
    )

    if config.csrf_protect and config.csrf_in_cookies:
        # The CSRF cookie is set with httponly=False, unlike the JWT cookie.
        response.set_cookie(
            config.access_csrf_cookie_name,
            value=get_csrf_token(encoded_access_token),
            max_age=max_age or config.cookie_max_age,
            secure=config.cookie_secure,
            httponly=False,
            domain=domain or config.cookie_domain,
            path=config.access_csrf_cookie_path,
            samesite=config.cookie_samesite,
        )

319 

320 

def set_refresh_cookies(
    response: Response,
    encoded_refresh_token: str,
    max_age: Optional[int] = None,
    domain: Optional[str] = None,
) -> None:
    """
    Modify a Flask Response to set a cookie containing the refresh JWT.
    Also sets the corresponding CSRF cookie when both ``config.csrf_protect``
    and ``config.csrf_in_cookies`` are enabled (``JWT_CSRF_IN_COOKIES``, see
    :ref:`Configuration Options`).

    :param response:
        A Flask Response object.

    :param encoded_refresh_token:
        The encoded refresh token to set in the cookies.

    :param max_age:
        The max age of the cookie. Values should be the number of seconds (as
        an integer). If this is ``None`` -- or any other falsy value such as
        ``0`` -- ``config.cookie_max_age`` is used instead (the
        ``JWT_SESSION_COOKIE`` option, see :ref:`Configuration Options`).

    :param domain:
        The domain of the cookie. If this is ``None`` -- or any other falsy
        value such as ``""`` -- ``config.cookie_domain`` is used instead (the
        ``JWT_COOKIE_DOMAIN`` option, see :ref:`Configuration Options`).
    """
    # The JWT cookie itself is HTTP-only.
    jwt_cookie_name = config.refresh_cookie_name
    jwt_cookie_options = {
        "value": encoded_refresh_token,
        "max_age": max_age or config.cookie_max_age,
        "secure": config.cookie_secure,
        "httponly": True,
        "domain": domain or config.cookie_domain,
        "path": config.refresh_cookie_path,
        "samesite": config.cookie_samesite,
    }
    response.set_cookie(jwt_cookie_name, **jwt_cookie_options)

    if not (config.csrf_protect and config.csrf_in_cookies):
        return

    # The CSRF cookie is set with httponly=False, unlike the JWT cookie.
    csrf_cookie_name = config.refresh_csrf_cookie_name
    csrf_cookie_options = {
        "value": get_csrf_token(encoded_refresh_token),
        "max_age": max_age or config.cookie_max_age,
        "secure": config.cookie_secure,
        "httponly": False,
        "domain": domain or config.cookie_domain,
        "path": config.refresh_csrf_cookie_path,
        "samesite": config.cookie_samesite,
    }
    response.set_cookie(csrf_cookie_name, **csrf_cookie_options)

372 

373 

def unset_jwt_cookies(response: Response, domain: Optional[str] = None) -> None:
    """
    Modify a Flask Response to delete the cookies containing access or refresh
    JWTs. Also deletes the corresponding CSRF cookies if applicable.

    This simply runs :func:`unset_access_cookies` followed by
    :func:`unset_refresh_cookies` on the same response.

    :param response:
        A Flask Response object

    :param domain:
        The domain of the cookies; forwarded unchanged to both helpers.
    """
    for unset_cookies in (unset_access_cookies, unset_refresh_cookies):
        unset_cookies(response, domain)

384 

385 

def unset_access_cookies(response: Response, domain: Optional[str] = None) -> None:
    """
    Modify a Flask Response to delete the cookie containing an access JWT.
    Also deletes the corresponding CSRF cookie when both
    ``config.csrf_protect`` and ``config.csrf_in_cookies`` are enabled.

    Each cookie is overwritten with an empty value and ``expires=0``.

    :param response:
        A Flask Response object

    :param domain:
        The domain of the cookie. If this is ``None`` -- or any other falsy
        value such as ``""`` -- ``config.cookie_domain`` is used instead (the
        ``JWT_COOKIE_DOMAIN`` option, see :ref:`Configuration Options`).
    """
    jwt_cookie_name = config.access_cookie_name
    jwt_cookie_options = {
        "value": "",
        "expires": 0,
        "secure": config.cookie_secure,
        "httponly": True,
        "domain": domain or config.cookie_domain,
        "path": config.access_cookie_path,
        "samesite": config.cookie_samesite,
    }
    response.set_cookie(jwt_cookie_name, **jwt_cookie_options)

    if not (config.csrf_protect and config.csrf_in_cookies):
        return

    csrf_cookie_name = config.access_csrf_cookie_name
    csrf_cookie_options = {
        "value": "",
        "expires": 0,
        "secure": config.cookie_secure,
        "httponly": False,
        "domain": domain or config.cookie_domain,
        "path": config.access_csrf_cookie_path,
        "samesite": config.cookie_samesite,
    }
    response.set_cookie(csrf_cookie_name, **csrf_cookie_options)

422 

423 

def unset_refresh_cookies(response: Response, domain: Optional[str] = None) -> None:
    """
    Modify a Flask Response to delete the cookie containing a refresh JWT.
    Also deletes the corresponding CSRF cookie when both
    ``config.csrf_protect`` and ``config.csrf_in_cookies`` are enabled.

    Each cookie is overwritten with an empty value and ``expires=0``.

    :param response:
        A Flask Response object

    :param domain:
        The domain of the cookie. If this is ``None`` -- or any other falsy
        value such as ``""`` -- ``config.cookie_domain`` is used instead (the
        ``JWT_COOKIE_DOMAIN`` option, see :ref:`Configuration Options`).
    """
    jwt_cookie_name = config.refresh_cookie_name
    jwt_cookie_options = {
        "value": "",
        "expires": 0,
        "secure": config.cookie_secure,
        "httponly": True,
        "domain": domain or config.cookie_domain,
        "path": config.refresh_cookie_path,
        "samesite": config.cookie_samesite,
    }
    response.set_cookie(jwt_cookie_name, **jwt_cookie_options)

    if not (config.csrf_protect and config.csrf_in_cookies):
        return

    csrf_cookie_name = config.refresh_csrf_cookie_name
    csrf_cookie_options = {
        "value": "",
        "expires": 0,
        "secure": config.cookie_secure,
        "httponly": False,
        "domain": domain or config.cookie_domain,
        "path": config.refresh_csrf_cookie_path,
        "samesite": config.cookie_samesite,
    }
    response.set_cookie(csrf_cookie_name, **csrf_cookie_options)

460 

461 

def current_user_context_processor() -> Any:
    """
    Build a dictionary exposing the current user under the ``"current_user"``
    key.

    NOTE(review): the name suggests this is meant to be registered as a
    template context processor -- confirm against the registration site.

    :return:
        ``{"current_user": <result of get_current_user()>}``

    :raises RuntimeError:
        Propagated from :func:`get_current_user`.
    """
    context = {"current_user": get_current_user()}
    return context