Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/flask/helpers.py: 33%

116 statements  

« prev     ^ index     » next       coverage.py v7.3.2, created at 2023-12-09 07:17 +0000

1from __future__ import annotations 

2 

3import importlib.util 

4import os 

5import sys 

6import typing as t 

7from datetime import datetime 

8from functools import lru_cache 

9from functools import update_wrapper 

10 

11import werkzeug.utils 

12from werkzeug.exceptions import abort as _wz_abort 

13from werkzeug.utils import redirect as _wz_redirect 

14 

15from .globals import _cv_request 

16from .globals import current_app 

17from .globals import request 

18from .globals import request_ctx 

19from .globals import session 

20from .signals import message_flashed 

21 

22if t.TYPE_CHECKING: # pragma: no cover 

23 from werkzeug.wrappers import Response as BaseResponse 

24 from .wrappers import Response 

25 

26 

def get_debug_flag() -> bool:
    """Report whether debug mode is requested through the environment.

    Reads the :envvar:`FLASK_DEBUG` environment variable. An unset or
    empty value, or one of ``0``, ``false`` or ``no`` (compared
    case-insensitively), gives ``False``. Any other value gives ``True``.
    """
    raw = os.environ.get("FLASK_DEBUG")

    # Unset and empty are both treated as "debug off".
    if not raw:
        return False

    return raw.lower() not in {"0", "false", "no"}

33 

34 

def get_load_dotenv(default: bool = True) -> bool:
    """Report whether the default dotenv files should be loaded.

    The user can opt out by setting :envvar:`FLASK_SKIP_DOTENV`. When the
    variable is set to a non-empty value, the files are loaded only if
    that value is ``0``, ``false`` or ``no`` (compared
    case-insensitively).

    :param default: What to return if the env var isn't set (or is
        empty).
    """
    raw = os.environ.get("FLASK_SKIP_DOTENV")

    if raw:
        # The variable means "skip", so a falsy-looking value means "load".
        return raw.lower() in ("0", "false", "no")

    return default

48 

49 

def stream_with_context(
    generator_or_function: (
        t.Iterator[t.AnyStr] | t.Callable[..., t.Iterator[t.AnyStr]]
    )
) -> t.Iterator[t.AnyStr]:
    """Keep the request context alive while a streamed response is
    being generated.

    Request contexts disappear when the response is started on the
    server. This is done for efficiency reasons and to make memory leaks
    with badly written WSGI middlewares less likely. The downside is
    that a generator backing a streamed response can no longer access
    request bound information.

    This function can be used as a decorator on the generator function::

        from flask import stream_with_context, request, Response

        @app.route('/stream')
        def streamed_response():
            @stream_with_context
            def generate():
                yield 'Hello '
                yield request.args['name']
                yield '!'
            return Response(generate())

    Or it can wrap a specific generator::

        from flask import stream_with_context, request, Response

        @app.route('/stream')
        def streamed_response():
            def generate():
                yield 'Hello '
                yield request.args['name']
                yield '!'
            return Response(stream_with_context(generate()))

    :param generator_or_function: Either an iterable to wrap, or a
        callable returning one. A value that ``iter()`` rejects is
        treated as the callable (decorator) form.
    :raises RuntimeError: If an iterable is wrapped while no request
        context is active.

    .. versionadded:: 0.9
    """
    try:
        source = iter(generator_or_function)  # type: ignore
    except TypeError:
        # Not iterable: this is the decorator form. Call the function
        # later and wrap whatever it returns.

        def decorator(*args: t.Any, **kwargs: t.Any) -> t.Any:
            produced = generator_or_function(*args, **kwargs)  # type: ignore
            return stream_with_context(produced)

        return update_wrapper(decorator, generator_or_function)  # type: ignore

    def generator() -> t.Generator:
        active_ctx = _cv_request.get(None)

        if active_ctx is None:
            raise RuntimeError(
                "'stream_with_context' can only be used when a request"
                " context is active, such as in a view function."
            )

        with active_ctx:
            # Placeholder item. It must be yielded from inside the
            # ``with`` block, otherwise the context would not be held
            # open across the pause.
            yield None

            # try/finally makes sure a plain WSGI-level iterator still
            # gets its cleanup run. Real generators are closed on
            # destruction automatically and would not need it.
            try:
                yield from source
            finally:
                if hasattr(source, "close"):
                    source.close()

    # Advance the wrapper once so it runs up to the placeholder yield.
    # At that point the context has been entered. The placeholder is
    # dropped here, and further iteration drives the real iterable.
    primed = generator()
    next(primed)
    return primed

127 

128 

def make_response(*args: t.Any) -> Response:
    """Build a response object from view-style return values.

    Views do not have to return response objects; they can return a
    value that Flask converts into one. That makes it awkward to set
    extra headers. Call this function instead of returning directly to
    get a response object that headers can be attached to.

    If a view looked like this and you want to add a new header::

        def index():
            return render_template('index.html', foo=42)

    You can now do something like this::

        def index():
            response = make_response(render_template('index.html', foo=42))
            response.headers['X-Parachutes'] = 'parachutes are cool'
            return response

    This function accepts the very same arguments you can return from a
    view function. This for example creates a response with a 404 error
    code::

        response = make_response(render_template('not_found.html'), 404)

    The other use case is forcing the return value of a view function
    into a response, which is helpful with view decorators::

        response = make_response(view_function())
        response.headers['X-Parachutes'] = 'parachutes are cool'

    Internally this function does the following things:

    -   if no arguments are passed, it creates a new, empty response
        from ``current_app.response_class``
    -   if one argument is passed, :meth:`flask.Flask.make_response`
        is invoked with it.
    -   if more than one argument is passed, the arguments are passed
        to the :meth:`flask.Flask.make_response` function as tuple.

    .. versionadded:: 0.6
    """
    if not args:
        return current_app.response_class()

    # A single positional value is passed through unwrapped; several
    # values are handed over together as one tuple.
    rv = args[0] if len(args) == 1 else args
    return current_app.make_response(rv)  # type: ignore

176 

177 

def url_for(
    endpoint: str,
    *,
    _anchor: str | None = None,
    _method: str | None = None,
    _scheme: str | None = None,
    _external: bool | None = None,
    **values: t.Any,
) -> str:
    """Generate a URL to the given endpoint with the given values.

    This is a thin wrapper that forwards every argument to
    :meth:`current_app.url_for() <flask.Flask.url_for>`, so it requires
    an active request or application context. See that method for full
    documentation.

    :param endpoint: The endpoint name associated with the URL to
        generate. If this starts with a ``.``, the current blueprint
        name (if any) will be used.
    :param _anchor: If given, append this as ``#anchor`` to the URL.
    :param _method: If given, generate the URL associated with this
        method for the endpoint.
    :param _scheme: If given, the URL will have this scheme if it is
        external.
    :param _external: If given, prefer the URL to be internal (False) or
        require it to be external (True). External URLs include the
        scheme and domain. When not in an active request, URLs are
        external by default.
    :param values: Values to use for the variable parts of the URL rule.
        Unknown keys are appended as query string arguments, like
        ``?a=b&c=d``.

    .. versionchanged:: 2.2
        Calls ``current_app.url_for``, allowing an app to override the
        behavior.

    .. versionchanged:: 0.10
        The ``_scheme`` parameter was added.

    .. versionchanged:: 0.9
        The ``_anchor`` and ``_method`` parameters were added.

    .. versionchanged:: 0.9
        Calls ``app.handle_url_build_error`` on build errors.
    """
    # Collect the underscore-prefixed options first, then forward them
    # together with the caller's URL values.
    reserved = {
        "_anchor": _anchor,
        "_method": _method,
        "_scheme": _scheme,
        "_external": _external,
    }
    return current_app.url_for(endpoint, **reserved, **values)

230 

231 

def redirect(
    location: str, code: int = 302, Response: type[BaseResponse] | None = None
) -> BaseResponse:
    """Create a redirect response object.

    When :data:`~flask.current_app` is available, its
    :meth:`~flask.Flask.redirect` method builds the response. Otherwise
    the call falls back to :func:`werkzeug.utils.redirect`.

    :param location: The URL to redirect to.
    :param code: The status code for the redirect.
    :param Response: The response class to use. Not used when
        ``current_app`` is active, which uses ``app.response_class``.

    .. versionadded:: 2.2
        Calls ``current_app.redirect`` if available instead of always
        using Werkzeug's default ``redirect``.
    """
    if not current_app:
        # No application is bound: use Werkzeug's implementation, which
        # is the only path that honours the ``Response`` argument.
        return _wz_redirect(location, code=code, Response=Response)

    return current_app.redirect(location, code=code)

254 

255 

def abort(code: int | BaseResponse, *args: t.Any, **kwargs: t.Any) -> t.NoReturn:
    """Raise an :exc:`~werkzeug.exceptions.HTTPException` for the given
    status code.

    When :data:`~flask.current_app` is available, its
    :attr:`~flask.Flask.aborter` object is called. Otherwise
    :func:`werkzeug.exceptions.abort` is used.

    :param code: The status code for the exception, which must be
        registered in ``app.aborter``.
    :param args: Passed to the exception.
    :param kwargs: Passed to the exception.

    .. versionadded:: 2.2
        Calls ``current_app.aborter`` if available instead of always
        using Werkzeug's default ``abort``.
    """
    if current_app:
        # The application's aborter is expected to raise here.
        current_app.aborter(code, *args, **kwargs)

    # Reached when no application is bound, or if the application's
    # aborter returned instead of raising.
    _wz_abort(code, *args, **kwargs)

277 

278 

def get_template_attribute(template_name: str, attribute: str) -> t.Any:
    """Load a macro (or variable) that a template exports.

    This makes it possible to invoke a macro from Python code. For
    example, given a template named :file:`_cider.html` with the
    following contents:

    .. sourcecode:: html+jinja

       {% macro hello(name) %}Hello {{ name }}!{% endmacro %}

    the macro can be reached from Python like this::

        hello = get_template_attribute('_cider.html', 'hello')
        return hello('World')

    .. versionadded:: 0.2

    :param template_name: the name of the template
    :param attribute: the name of the variable or macro to access
    """
    # Look the template up through the current app's Jinja environment
    # and read the attribute from the template's ``module`` object.
    template = current_app.jinja_env.get_template(template_name)
    return getattr(template.module, attribute)

299 

300 

def flash(message: str, category: str = "message") -> None:
    """Flash a message to the next request.

    To remove the flashed message from the session and display it to
    the user, the template has to call :func:`get_flashed_messages`.

    .. versionchanged:: 0.3
       `category` parameter added.

    :param message: the message to be flashed.
    :param category: the category for the message. The following values
                     are recommended: ``'message'`` for any kind of message,
                     ``'error'`` for errors, ``'info'`` for information
                     messages and ``'warning'`` for warnings. However any
                     kind of string can be used as category.
    """
    # The list is read, changed, and then assigned back explicitly
    # instead of using
    #
    #     session.setdefault('_flashes', []).append((category, message))
    #
    # because that one-liner relies on in-place changes to a mutable
    # value being seen by the session object. That does not hold for
    # session implementations that keep their keys/values in external
    # storage.
    pending = session.get("_flashes", [])
    pending.append((category, message))
    session["_flashes"] = pending

    # Send the signal with the real application object, not the proxy.
    app = current_app._get_current_object()  # type: ignore
    message_flashed.send(
        app,
        _async_wrapper=app.ensure_sync,
        message=message,
        category=category,
    )

333 

334 

def get_flashed_messages(
    with_categories: bool = False, category_filter: t.Iterable[str] = ()
) -> list[str] | list[tuple[str, str]]:
    """Pull all flashed messages from the session and return them.

    Further calls in the same request to the function will return
    the same messages. By default just the messages are returned,
    but when `with_categories` is set to ``True``, the return value will
    be a list of tuples in the form ``(category, message)`` instead.

    Filter the flashed messages to one or more categories by providing those
    categories in `category_filter`. This allows rendering categories in
    separate html blocks. The `with_categories` and `category_filter`
    arguments are distinct:

    * `with_categories` controls whether categories are returned with message
      text (``True`` gives a tuple, where ``False`` gives just the message text).
    * `category_filter` filters the messages down to only those matching the
      provided categories.

    See :doc:`/patterns/flashing` for examples.

    .. versionchanged:: 0.3
       `with_categories` parameter added.

    .. versionchanged:: 0.9
        `category_filter` parameter added.

    :param with_categories: set to ``True`` to also receive categories.
    :param category_filter: filter of categories to limit return values.  Only
                            categories in the list will be returned.
    """
    flashes = request_ctx.flashes

    if flashes is None:
        # First call in this request: take the messages out of the
        # session and keep them on the request context for later calls.
        if "_flashes" in session:
            flashes = session.pop("_flashes")
        else:
            flashes = []

        request_ctx.flashes = flashes

    if category_filter:
        flashes = [entry for entry in flashes if entry[0] in category_filter]

    if with_categories:
        return flashes

    return [entry[1] for entry in flashes]

375 

376 

def _prepare_send_file_kwargs(**kwargs: t.Any) -> dict[str, t.Any]:
    """Add the Flask-specific arguments needed by Werkzeug's send-file
    helpers to ``kwargs`` and return the resulting dict.

    ``max_age`` defaults to the app's ``get_send_file_max_age`` callable
    when it is missing or ``None``. ``environ``, ``use_x_sendfile``,
    ``response_class`` and ``_root_path`` are always set from the current
    request and application, replacing any values passed in.
    """
    if kwargs.get("max_age") is None:
        # The bound method itself is stored, not its result.
        kwargs["max_age"] = current_app.get_send_file_max_age

    kwargs["environ"] = request.environ
    kwargs["use_x_sendfile"] = current_app.config["USE_X_SENDFILE"]
    kwargs["response_class"] = current_app.response_class
    kwargs["_root_path"] = current_app.root_path  # type: ignore
    return kwargs

388 

389 

def send_file(
    path_or_file: os.PathLike | str | t.BinaryIO,
    mimetype: str | None = None,
    as_attachment: bool = False,
    download_name: str | None = None,
    conditional: bool = True,
    etag: bool | str = True,
    last_modified: datetime | int | float | None = None,
    max_age: None | (int | t.Callable[[str | None], int | None]) = None,
) -> Response:
    """Send the contents of a file to the client.

    This wraps :func:`werkzeug.utils.send_file`, adding the
    Flask-specific arguments (request environ, ``USE_X_SENDFILE``
    setting, response class, app root path, default ``max_age``).

    The first argument can be a file path or a file-like object. Paths
    are preferred in most cases because Werkzeug can manage the file and
    get extra information from the path. Passing a file-like object
    requires that the file is opened in binary mode, and is mostly
    useful when building a file in memory with :class:`io.BytesIO`.

    Never pass file paths provided by a user. The path is assumed to be
    trusted, so a user could craft a path to access a file you didn't
    intend. Use :func:`send_from_directory` to safely serve
    user-requested paths from within a directory.

    If the WSGI server sets a ``file_wrapper`` in ``environ``, it is
    used, otherwise Werkzeug's built-in wrapper is used. Alternatively,
    if the HTTP server supports ``X-Sendfile``, configuring Flask with
    ``USE_X_SENDFILE = True`` will tell the server to send the given
    path, which is much more efficient than reading it in Python.

    :param path_or_file: The path to the file to send, relative to the
        current working directory if a relative path is given.
        Alternatively, a file-like object opened in binary mode. Make
        sure the file pointer is seeked to the start of the data.
    :param mimetype: The MIME type to send for the file. If not
        provided, it will try to detect it from the file name.
    :param as_attachment: Indicate to a browser that it should offer to
        save the file instead of displaying it.
    :param download_name: The default name browsers will use when saving
        the file. Defaults to the passed file name.
    :param conditional: Enable conditional and range responses based on
        request headers. Requires passing a file path and ``environ``.
    :param etag: Calculate an ETag for the file, which requires passing
        a file path. Can also be a string to use instead.
    :param last_modified: The last modified time to send for the file,
        in seconds. If not provided, it will try to detect it from the
        file path.
    :param max_age: How long the client should cache the file, in
        seconds. If set, ``Cache-Control`` will be ``public``, otherwise
        it will be ``no-cache`` to prefer conditional caching.

    .. versionchanged:: 2.0
        ``download_name`` replaces the ``attachment_filename``
        parameter. If ``as_attachment=False``, it is passed with
        ``Content-Disposition: inline`` instead.

    .. versionchanged:: 2.0
        ``max_age`` replaces the ``cache_timeout`` parameter.
        ``conditional`` is enabled and ``max_age`` is not set by
        default.

    .. versionchanged:: 2.0
        ``etag`` replaces the ``add_etags`` parameter. It can be a
        string to use instead of generating one.

    .. versionchanged:: 2.0
        Passing a file-like object that inherits from
        :class:`~io.TextIOBase` will raise a :exc:`ValueError` rather
        than sending an empty file.

    .. versionadded:: 2.0
        Moved the implementation to Werkzeug. This is now a wrapper to
        pass some Flask-specific arguments.

    .. versionchanged:: 1.1
        ``filename`` may be a :class:`~os.PathLike` object.

    .. versionchanged:: 1.1
        Passing a :class:`~io.BytesIO` object supports range requests.

    .. versionchanged:: 1.0.3
        Filenames are encoded with ASCII instead of Latin-1 for broader
        compatibility with WSGI servers.

    .. versionchanged:: 1.0
        UTF-8 filenames as specified in :rfc:`2231` are supported.

    .. versionchanged:: 0.12
        The filename is no longer automatically inferred from file
        objects. If you want to use automatic MIME and etag support,
        pass a filename via ``filename_or_fp`` or
        ``attachment_filename``.

    .. versionchanged:: 0.12
        ``attachment_filename`` is preferred over ``filename`` for MIME
        detection.

    .. versionchanged:: 0.9
        ``cache_timeout`` defaults to
        :meth:`Flask.get_send_file_max_age`.

    .. versionchanged:: 0.7
        MIME guessing and etag support for file-like objects was
        removed because it was unreliable. Pass a filename if you are
        able to, otherwise attach an etag yourself.

    .. versionchanged:: 0.5
        The ``add_etags``, ``cache_timeout`` and ``conditional``
        parameters were added. The default behavior is to add etags.

    .. versionadded:: 0.2
    """
    # Add the Flask-specific arguments to the caller's options, then
    # hand the complete set to Werkzeug.
    send_kwargs = _prepare_send_file_kwargs(
        path_or_file=path_or_file,
        environ=request.environ,
        mimetype=mimetype,
        as_attachment=as_attachment,
        download_name=download_name,
        conditional=conditional,
        etag=etag,
        last_modified=last_modified,
        max_age=max_age,
    )
    return werkzeug.utils.send_file(**send_kwargs)  # type: ignore[return-value]

514 

515 

def send_from_directory(
    directory: os.PathLike | str,
    path: os.PathLike | str,
    **kwargs: t.Any,
) -> Response:
    """Send a file from within a directory using :func:`send_file`.

    .. code-block:: python

        @app.route("/uploads/<path:name>")
        def download_file(name):
            return send_from_directory(
                app.config['UPLOAD_FOLDER'], name, as_attachment=True
            )

    This is a secure way to serve files from a folder, such as static
    files or uploads. Uses :func:`~werkzeug.security.safe_join` to
    ensure the path coming from the client is not maliciously crafted to
    point outside the specified directory.

    If the final path does not point to an existing regular file,
    raises a 404 :exc:`~werkzeug.exceptions.NotFound` error.

    :param directory: The directory that ``path`` must be located under,
        relative to the current application's root path.
    :param path: The path to the file to send, relative to
        ``directory``.
    :param kwargs: Arguments to pass to :func:`send_file`.

    .. versionchanged:: 2.0
        ``path`` replaces the ``filename`` parameter.

    .. versionadded:: 2.0
        Moved the implementation to Werkzeug. This is now a wrapper to
        pass some Flask-specific arguments.

    .. versionadded:: 0.5
    """
    # Add the Flask-specific arguments before delegating to Werkzeug.
    send_kwargs = _prepare_send_file_kwargs(**kwargs)
    return werkzeug.utils.send_from_directory(  # type: ignore[return-value]
        directory, path, **send_kwargs
    )

557 

558 

def get_root_path(import_name: str) -> str:
    """Find the root path of a package, or the path that contains a
    module. If it cannot be found, returns the current working
    directory.

    Not to be confused with the value returned by :func:`find_package`.

    :param import_name: The dotted import name to locate.
    :raises RuntimeError: If the module had to be imported as a fallback
        and still exposes no ``__file__``.

    :meta private:
    """
    # Cheapest case: the module is already imported and knows its file.
    loaded_file = getattr(sys.modules.get(import_name), "__file__", None)

    if loaded_file is not None:
        return os.path.dirname(os.path.abspath(loaded_file))

    # Otherwise ask the import machinery for a spec and use its loader.
    try:
        spec = importlib.util.find_spec(import_name)
    except (ImportError, ValueError):
        spec = None

    loader = None if spec is None else spec.loader

    # No loader: either nothing was found, or this is an unloaded main
    # module or a main module without a path (interactive sessions).
    # Use the current working directory.
    if loader is None:
        return os.getcwd()

    if hasattr(loader, "get_filename"):
        filepath = loader.get_filename(import_name)
    else:
        # The loader cannot report a file name, so import the module
        # and read ``__file__`` from it.
        __import__(import_name)
        imported = sys.modules[import_name]
        filepath = getattr(imported, "__file__", None)

        # Still no file path, for example a namespace package or an
        # import hook without file information. The root path cannot be
        # worked out here, so the caller has to provide it.
        if filepath is None:
            raise RuntimeError(
                "No root path can be found for the provided module"
                f" {import_name!r}. This can happen because the module"
                " came from an import hook that does not provide file"
                " name information or because it's a namespace package."
                " In this case the root path needs to be explicitly"
                " provided."
            )

    # filepath is import_name.py for a module, or __init__.py for a package.
    return os.path.dirname(os.path.abspath(filepath))

614 

615 

@lru_cache(maxsize=None)
def _split_blueprint_path(name: str) -> list[str]:
    """Return ``name`` followed by each of its dotted parent prefixes,
    longest first.

    For example ``"a.b.c"`` gives ``["a.b.c", "a.b", "a"]``. Results are
    memoized per ``name``, so the returned list is shared between calls
    and should not be mutated.
    """
    parent, dot, _ = name.rpartition(".")

    # ``dot`` is empty when the name contains no separator at all.
    if not dot:
        return [name]

    return [name, *_split_blueprint_path(parent)]