Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/flask/helpers.py: 42%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

121 statements  

1from __future__ import annotations 

2 

3import importlib.util 

4import os 

5import sys 

6import typing as t 

7from datetime import datetime 

8from functools import cache 

9from functools import update_wrapper 

10 

11import werkzeug.utils 

12from werkzeug.exceptions import abort as _wz_abort 

13from werkzeug.utils import redirect as _wz_redirect 

14from werkzeug.wrappers import Response as BaseResponse 

15 

16from .globals import _cv_request 

17from .globals import current_app 

18from .globals import request 

19from .globals import request_ctx 

20from .globals import session 

21from .signals import message_flashed 

22 

23if t.TYPE_CHECKING: # pragma: no cover 

24 from .wrappers import Response 

25 

26 

27def get_debug_flag() -> bool: 

28 """Get whether debug mode should be enabled for the app, indicated by the 

29 :envvar:`FLASK_DEBUG` environment variable. The default is ``False``. 

30 """ 

31 val = os.environ.get("FLASK_DEBUG") 

32 return bool(val and val.lower() not in {"0", "false", "no"}) 

33 

34 

35def get_load_dotenv(default: bool = True) -> bool: 

36 """Get whether the user has disabled loading default dotenv files by 

37 setting :envvar:`FLASK_SKIP_DOTENV`. The default is ``True``, load 

38 the files. 

39 

40 :param default: What to return if the env var isn't set. 

41 """ 

42 val = os.environ.get("FLASK_SKIP_DOTENV") 

43 

44 if not val: 

45 return default 

46 

47 return val.lower() in ("0", "false", "no") 

48 

49 

50@t.overload 

51def stream_with_context( 

52 generator_or_function: t.Iterator[t.AnyStr], 

53) -> t.Iterator[t.AnyStr]: ... 

54 

55 

56@t.overload 

57def stream_with_context( 

58 generator_or_function: t.Callable[..., t.Iterator[t.AnyStr]], 

59) -> t.Callable[[t.Iterator[t.AnyStr]], t.Iterator[t.AnyStr]]: ... 

60 

61 

62def stream_with_context( 

63 generator_or_function: t.Iterator[t.AnyStr] | t.Callable[..., t.Iterator[t.AnyStr]], 

64) -> t.Iterator[t.AnyStr] | t.Callable[[t.Iterator[t.AnyStr]], t.Iterator[t.AnyStr]]: 

65 """Request contexts disappear when the response is started on the server. 

66 This is done for efficiency reasons and to make it less likely to encounter 

67 memory leaks with badly written WSGI middlewares. The downside is that if 

68 you are using streamed responses, the generator cannot access request bound 

69 information any more. 

70 

71 This function however can help you keep the context around for longer:: 

72 

73 from flask import stream_with_context, request, Response 

74 

75 @app.route('/stream') 

76 def streamed_response(): 

77 @stream_with_context 

78 def generate(): 

79 yield 'Hello ' 

80 yield request.args['name'] 

81 yield '!' 

82 return Response(generate()) 

83 

84 Alternatively it can also be used around a specific generator:: 

85 

86 from flask import stream_with_context, request, Response 

87 

88 @app.route('/stream') 

89 def streamed_response(): 

90 def generate(): 

91 yield 'Hello ' 

92 yield request.args['name'] 

93 yield '!' 

94 return Response(stream_with_context(generate())) 

95 

96 .. versionadded:: 0.9 

97 """ 

98 try: 

99 gen = iter(generator_or_function) # type: ignore[arg-type] 

100 except TypeError: 

101 

102 def decorator(*args: t.Any, **kwargs: t.Any) -> t.Any: 

103 gen = generator_or_function(*args, **kwargs) # type: ignore[operator] 

104 return stream_with_context(gen) 

105 

106 return update_wrapper(decorator, generator_or_function) # type: ignore[arg-type] 

107 

108 def generator() -> t.Iterator[t.AnyStr | None]: 

109 ctx = _cv_request.get(None) 

110 if ctx is None: 

111 raise RuntimeError( 

112 "'stream_with_context' can only be used when a request" 

113 " context is active, such as in a view function." 

114 ) 

115 with ctx: 

116 # Dummy sentinel. Has to be inside the context block or we're 

117 # not actually keeping the context around. 

118 yield None 

119 

120 # The try/finally is here so that if someone passes a WSGI level 

121 # iterator in we're still running the cleanup logic. Generators 

122 # don't need that because they are closed on their destruction 

123 # automatically. 

124 try: 

125 yield from gen 

126 finally: 

127 if hasattr(gen, "close"): 

128 gen.close() 

129 

130 # The trick is to start the generator. Then the code execution runs until 

131 # the first dummy None is yielded at which point the context was already 

132 # pushed. This item is discarded. Then when the iteration continues the 

133 # real generator is executed. 

134 wrapped_g = generator() 

135 next(wrapped_g) 

136 return wrapped_g # type: ignore[return-value] 

137 

138 

139def make_response(*args: t.Any) -> Response: 

140 """Sometimes it is necessary to set additional headers in a view. Because 

141 views do not have to return response objects but can return a value that 

142 is converted into a response object by Flask itself, it becomes tricky to 

143 add headers to it. This function can be called instead of using a return 

144 and you will get a response object which you can use to attach headers. 

145 

146 If view looked like this and you want to add a new header:: 

147 

148 def index(): 

149 return render_template('index.html', foo=42) 

150 

151 You can now do something like this:: 

152 

153 def index(): 

154 response = make_response(render_template('index.html', foo=42)) 

155 response.headers['X-Parachutes'] = 'parachutes are cool' 

156 return response 

157 

158 This function accepts the very same arguments you can return from a 

159 view function. This for example creates a response with a 404 error 

160 code:: 

161 

162 response = make_response(render_template('not_found.html'), 404) 

163 

164 The other use case of this function is to force the return value of a 

165 view function into a response which is helpful with view 

166 decorators:: 

167 

168 response = make_response(view_function()) 

169 response.headers['X-Parachutes'] = 'parachutes are cool' 

170 

171 Internally this function does the following things: 

172 

173 - if no arguments are passed, it creates a new response argument 

174 - if one argument is passed, :meth:`flask.Flask.make_response` 

175 is invoked with it. 

176 - if more than one argument is passed, the arguments are passed 

177 to the :meth:`flask.Flask.make_response` function as tuple. 

178 

179 .. versionadded:: 0.6 

180 """ 

181 if not args: 

182 return current_app.response_class() 

183 if len(args) == 1: 

184 args = args[0] 

185 return current_app.make_response(args) 

186 

187 

188def url_for( 

189 endpoint: str, 

190 *, 

191 _anchor: str | None = None, 

192 _method: str | None = None, 

193 _scheme: str | None = None, 

194 _external: bool | None = None, 

195 **values: t.Any, 

196) -> str: 

197 """Generate a URL to the given endpoint with the given values. 

198 

199 This requires an active request or application context, and calls 

200 :meth:`current_app.url_for() <flask.Flask.url_for>`. See that method 

201 for full documentation. 

202 

203 :param endpoint: The endpoint name associated with the URL to 

204 generate. If this starts with a ``.``, the current blueprint 

205 name (if any) will be used. 

206 :param _anchor: If given, append this as ``#anchor`` to the URL. 

207 :param _method: If given, generate the URL associated with this 

208 method for the endpoint. 

209 :param _scheme: If given, the URL will have this scheme if it is 

210 external. 

211 :param _external: If given, prefer the URL to be internal (False) or 

212 require it to be external (True). External URLs include the 

213 scheme and domain. When not in an active request, URLs are 

214 external by default. 

215 :param values: Values to use for the variable parts of the URL rule. 

216 Unknown keys are appended as query string arguments, like 

217 ``?a=b&c=d``. 

218 

219 .. versionchanged:: 2.2 

220 Calls ``current_app.url_for``, allowing an app to override the 

221 behavior. 

222 

223 .. versionchanged:: 0.10 

224 The ``_scheme`` parameter was added. 

225 

226 .. versionchanged:: 0.9 

227 The ``_anchor`` and ``_method`` parameters were added. 

228 

229 .. versionchanged:: 0.9 

230 Calls ``app.handle_url_build_error`` on build errors. 

231 """ 

232 return current_app.url_for( 

233 endpoint, 

234 _anchor=_anchor, 

235 _method=_method, 

236 _scheme=_scheme, 

237 _external=_external, 

238 **values, 

239 ) 

240 

241 

242def redirect( 

243 location: str, code: int = 302, Response: type[BaseResponse] | None = None 

244) -> BaseResponse: 

245 """Create a redirect response object. 

246 

247 If :data:`~flask.current_app` is available, it will use its 

248 :meth:`~flask.Flask.redirect` method, otherwise it will use 

249 :func:`werkzeug.utils.redirect`. 

250 

251 :param location: The URL to redirect to. 

252 :param code: The status code for the redirect. 

253 :param Response: The response class to use. Not used when 

254 ``current_app`` is active, which uses ``app.response_class``. 

255 

256 .. versionadded:: 2.2 

257 Calls ``current_app.redirect`` if available instead of always 

258 using Werkzeug's default ``redirect``. 

259 """ 

260 if current_app: 

261 return current_app.redirect(location, code=code) 

262 

263 return _wz_redirect(location, code=code, Response=Response) 

264 

265 

266def abort(code: int | BaseResponse, *args: t.Any, **kwargs: t.Any) -> t.NoReturn: 

267 """Raise an :exc:`~werkzeug.exceptions.HTTPException` for the given 

268 status code. 

269 

270 If :data:`~flask.current_app` is available, it will call its 

271 :attr:`~flask.Flask.aborter` object, otherwise it will use 

272 :func:`werkzeug.exceptions.abort`. 

273 

274 :param code: The status code for the exception, which must be 

275 registered in ``app.aborter``. 

276 :param args: Passed to the exception. 

277 :param kwargs: Passed to the exception. 

278 

279 .. versionadded:: 2.2 

280 Calls ``current_app.aborter`` if available instead of always 

281 using Werkzeug's default ``abort``. 

282 """ 

283 if current_app: 

284 current_app.aborter(code, *args, **kwargs) 

285 

286 _wz_abort(code, *args, **kwargs) 

287 

288 

289def get_template_attribute(template_name: str, attribute: str) -> t.Any: 

290 """Loads a macro (or variable) a template exports. This can be used to 

291 invoke a macro from within Python code. If you for example have a 

292 template named :file:`_cider.html` with the following contents: 

293 

294 .. sourcecode:: html+jinja 

295 

296 {% macro hello(name) %}Hello {{ name }}!{% endmacro %} 

297 

298 You can access this from Python code like this:: 

299 

300 hello = get_template_attribute('_cider.html', 'hello') 

301 return hello('World') 

302 

303 .. versionadded:: 0.2 

304 

305 :param template_name: the name of the template 

306 :param attribute: the name of the variable of macro to access 

307 """ 

308 return getattr(current_app.jinja_env.get_template(template_name).module, attribute) 

309 

310 

311def flash(message: str, category: str = "message") -> None: 

312 """Flashes a message to the next request. In order to remove the 

313 flashed message from the session and to display it to the user, 

314 the template has to call :func:`get_flashed_messages`. 

315 

316 .. versionchanged:: 0.3 

317 `category` parameter added. 

318 

319 :param message: the message to be flashed. 

320 :param category: the category for the message. The following values 

321 are recommended: ``'message'`` for any kind of message, 

322 ``'error'`` for errors, ``'info'`` for information 

323 messages and ``'warning'`` for warnings. However any 

324 kind of string can be used as category. 

325 """ 

326 # Original implementation: 

327 # 

328 # session.setdefault('_flashes', []).append((category, message)) 

329 # 

330 # This assumed that changes made to mutable structures in the session are 

331 # always in sync with the session object, which is not true for session 

332 # implementations that use external storage for keeping their keys/values. 

333 flashes = session.get("_flashes", []) 

334 flashes.append((category, message)) 

335 session["_flashes"] = flashes 

336 app = current_app._get_current_object() # type: ignore 

337 message_flashed.send( 

338 app, 

339 _async_wrapper=app.ensure_sync, 

340 message=message, 

341 category=category, 

342 ) 

343 

344 

345def get_flashed_messages( 

346 with_categories: bool = False, category_filter: t.Iterable[str] = () 

347) -> list[str] | list[tuple[str, str]]: 

348 """Pulls all flashed messages from the session and returns them. 

349 Further calls in the same request to the function will return 

350 the same messages. By default just the messages are returned, 

351 but when `with_categories` is set to ``True``, the return value will 

352 be a list of tuples in the form ``(category, message)`` instead. 

353 

354 Filter the flashed messages to one or more categories by providing those 

355 categories in `category_filter`. This allows rendering categories in 

356 separate html blocks. The `with_categories` and `category_filter` 

357 arguments are distinct: 

358 

359 * `with_categories` controls whether categories are returned with message 

360 text (``True`` gives a tuple, where ``False`` gives just the message text). 

361 * `category_filter` filters the messages down to only those matching the 

362 provided categories. 

363 

364 See :doc:`/patterns/flashing` for examples. 

365 

366 .. versionchanged:: 0.3 

367 `with_categories` parameter added. 

368 

369 .. versionchanged:: 0.9 

370 `category_filter` parameter added. 

371 

372 :param with_categories: set to ``True`` to also receive categories. 

373 :param category_filter: filter of categories to limit return values. Only 

374 categories in the list will be returned. 

375 """ 

376 flashes = request_ctx.flashes 

377 if flashes is None: 

378 flashes = session.pop("_flashes") if "_flashes" in session else [] 

379 request_ctx.flashes = flashes 

380 if category_filter: 

381 flashes = list(filter(lambda f: f[0] in category_filter, flashes)) 

382 if not with_categories: 

383 return [x[1] for x in flashes] 

384 return flashes 

385 

386 

387def _prepare_send_file_kwargs(**kwargs: t.Any) -> dict[str, t.Any]: 

388 if kwargs.get("max_age") is None: 

389 kwargs["max_age"] = current_app.get_send_file_max_age 

390 

391 kwargs.update( 

392 environ=request.environ, 

393 use_x_sendfile=current_app.config["USE_X_SENDFILE"], 

394 response_class=current_app.response_class, 

395 _root_path=current_app.root_path, # type: ignore 

396 ) 

397 return kwargs 

398 

399 

400def send_file( 

401 path_or_file: os.PathLike[t.AnyStr] | str | t.BinaryIO, 

402 mimetype: str | None = None, 

403 as_attachment: bool = False, 

404 download_name: str | None = None, 

405 conditional: bool = True, 

406 etag: bool | str = True, 

407 last_modified: datetime | int | float | None = None, 

408 max_age: None | (int | t.Callable[[str | None], int | None]) = None, 

409) -> Response: 

410 """Send the contents of a file to the client. 

411 

412 The first argument can be a file path or a file-like object. Paths 

413 are preferred in most cases because Werkzeug can manage the file and 

414 get extra information from the path. Passing a file-like object 

415 requires that the file is opened in binary mode, and is mostly 

416 useful when building a file in memory with :class:`io.BytesIO`. 

417 

418 Never pass file paths provided by a user. The path is assumed to be 

419 trusted, so a user could craft a path to access a file you didn't 

420 intend. Use :func:`send_from_directory` to safely serve 

421 user-requested paths from within a directory. 

422 

423 If the WSGI server sets a ``file_wrapper`` in ``environ``, it is 

424 used, otherwise Werkzeug's built-in wrapper is used. Alternatively, 

425 if the HTTP server supports ``X-Sendfile``, configuring Flask with 

426 ``USE_X_SENDFILE = True`` will tell the server to send the given 

427 path, which is much more efficient than reading it in Python. 

428 

429 :param path_or_file: The path to the file to send, relative to the 

430 current working directory if a relative path is given. 

431 Alternatively, a file-like object opened in binary mode. Make 

432 sure the file pointer is seeked to the start of the data. 

433 :param mimetype: The MIME type to send for the file. If not 

434 provided, it will try to detect it from the file name. 

435 :param as_attachment: Indicate to a browser that it should offer to 

436 save the file instead of displaying it. 

437 :param download_name: The default name browsers will use when saving 

438 the file. Defaults to the passed file name. 

439 :param conditional: Enable conditional and range responses based on 

440 request headers. Requires passing a file path and ``environ``. 

441 :param etag: Calculate an ETag for the file, which requires passing 

442 a file path. Can also be a string to use instead. 

443 :param last_modified: The last modified time to send for the file, 

444 in seconds. If not provided, it will try to detect it from the 

445 file path. 

446 :param max_age: How long the client should cache the file, in 

447 seconds. If set, ``Cache-Control`` will be ``public``, otherwise 

448 it will be ``no-cache`` to prefer conditional caching. 

449 

450 .. versionchanged:: 2.0 

451 ``download_name`` replaces the ``attachment_filename`` 

452 parameter. If ``as_attachment=False``, it is passed with 

453 ``Content-Disposition: inline`` instead. 

454 

455 .. versionchanged:: 2.0 

456 ``max_age`` replaces the ``cache_timeout`` parameter. 

457 ``conditional`` is enabled and ``max_age`` is not set by 

458 default. 

459 

460 .. versionchanged:: 2.0 

461 ``etag`` replaces the ``add_etags`` parameter. It can be a 

462 string to use instead of generating one. 

463 

464 .. versionchanged:: 2.0 

465 Passing a file-like object that inherits from 

466 :class:`~io.TextIOBase` will raise a :exc:`ValueError` rather 

467 than sending an empty file. 

468 

469 .. versionadded:: 2.0 

470 Moved the implementation to Werkzeug. This is now a wrapper to 

471 pass some Flask-specific arguments. 

472 

473 .. versionchanged:: 1.1 

474 ``filename`` may be a :class:`~os.PathLike` object. 

475 

476 .. versionchanged:: 1.1 

477 Passing a :class:`~io.BytesIO` object supports range requests. 

478 

479 .. versionchanged:: 1.0.3 

480 Filenames are encoded with ASCII instead of Latin-1 for broader 

481 compatibility with WSGI servers. 

482 

483 .. versionchanged:: 1.0 

484 UTF-8 filenames as specified in :rfc:`2231` are supported. 

485 

486 .. versionchanged:: 0.12 

487 The filename is no longer automatically inferred from file 

488 objects. If you want to use automatic MIME and etag support, 

489 pass a filename via ``filename_or_fp`` or 

490 ``attachment_filename``. 

491 

492 .. versionchanged:: 0.12 

493 ``attachment_filename`` is preferred over ``filename`` for MIME 

494 detection. 

495 

496 .. versionchanged:: 0.9 

497 ``cache_timeout`` defaults to 

498 :meth:`Flask.get_send_file_max_age`. 

499 

500 .. versionchanged:: 0.7 

501 MIME guessing and etag support for file-like objects was 

502 removed because it was unreliable. Pass a filename if you are 

503 able to, otherwise attach an etag yourself. 

504 

505 .. versionchanged:: 0.5 

506 The ``add_etags``, ``cache_timeout`` and ``conditional`` 

507 parameters were added. The default behavior is to add etags. 

508 

509 .. versionadded:: 0.2 

510 """ 

511 return werkzeug.utils.send_file( # type: ignore[return-value] 

512 **_prepare_send_file_kwargs( 

513 path_or_file=path_or_file, 

514 environ=request.environ, 

515 mimetype=mimetype, 

516 as_attachment=as_attachment, 

517 download_name=download_name, 

518 conditional=conditional, 

519 etag=etag, 

520 last_modified=last_modified, 

521 max_age=max_age, 

522 ) 

523 ) 

524 

525 

526def send_from_directory( 

527 directory: os.PathLike[str] | str, 

528 path: os.PathLike[str] | str, 

529 **kwargs: t.Any, 

530) -> Response: 

531 """Send a file from within a directory using :func:`send_file`. 

532 

533 .. code-block:: python 

534 

535 @app.route("/uploads/<path:name>") 

536 def download_file(name): 

537 return send_from_directory( 

538 app.config['UPLOAD_FOLDER'], name, as_attachment=True 

539 ) 

540 

541 This is a secure way to serve files from a folder, such as static 

542 files or uploads. Uses :func:`~werkzeug.security.safe_join` to 

543 ensure the path coming from the client is not maliciously crafted to 

544 point outside the specified directory. 

545 

546 If the final path does not point to an existing regular file, 

547 raises a 404 :exc:`~werkzeug.exceptions.NotFound` error. 

548 

549 :param directory: The directory that ``path`` must be located under, 

550 relative to the current application's root path. This *must not* 

551 be a value provided by the client, otherwise it becomes insecure. 

552 :param path: The path to the file to send, relative to 

553 ``directory``. 

554 :param kwargs: Arguments to pass to :func:`send_file`. 

555 

556 .. versionchanged:: 2.0 

557 ``path`` replaces the ``filename`` parameter. 

558 

559 .. versionadded:: 2.0 

560 Moved the implementation to Werkzeug. This is now a wrapper to 

561 pass some Flask-specific arguments. 

562 

563 .. versionadded:: 0.5 

564 """ 

565 return werkzeug.utils.send_from_directory( # type: ignore[return-value] 

566 directory, path, **_prepare_send_file_kwargs(**kwargs) 

567 ) 

568 

569 

570def get_root_path(import_name: str) -> str: 

571 """Find the root path of a package, or the path that contains a 

572 module. If it cannot be found, returns the current working 

573 directory. 

574 

575 Not to be confused with the value returned by :func:`find_package`. 

576 

577 :meta private: 

578 """ 

579 # Module already imported and has a file attribute. Use that first. 

580 mod = sys.modules.get(import_name) 

581 

582 if mod is not None and hasattr(mod, "__file__") and mod.__file__ is not None: 

583 return os.path.dirname(os.path.abspath(mod.__file__)) 

584 

585 # Next attempt: check the loader. 

586 try: 

587 spec = importlib.util.find_spec(import_name) 

588 

589 if spec is None: 

590 raise ValueError 

591 except (ImportError, ValueError): 

592 loader = None 

593 else: 

594 loader = spec.loader 

595 

596 # Loader does not exist or we're referring to an unloaded main 

597 # module or a main module without path (interactive sessions), go 

598 # with the current working directory. 

599 if loader is None: 

600 return os.getcwd() 

601 

602 if hasattr(loader, "get_filename"): 

603 filepath = loader.get_filename(import_name) # pyright: ignore 

604 else: 

605 # Fall back to imports. 

606 __import__(import_name) 

607 mod = sys.modules[import_name] 

608 filepath = getattr(mod, "__file__", None) 

609 

610 # If we don't have a file path it might be because it is a 

611 # namespace package. In this case pick the root path from the 

612 # first module that is contained in the package. 

613 if filepath is None: 

614 raise RuntimeError( 

615 "No root path can be found for the provided module" 

616 f" {import_name!r}. This can happen because the module" 

617 " came from an import hook that does not provide file" 

618 " name information or because it's a namespace package." 

619 " In this case the root path needs to be explicitly" 

620 " provided." 

621 ) 

622 

623 # filepath is import_name.py for a module, or __init__.py for a package. 

624 return os.path.dirname(os.path.abspath(filepath)) # type: ignore[no-any-return] 

625 

626 

627@cache 

628def _split_blueprint_path(name: str) -> list[str]: 

629 out: list[str] = [name] 

630 

631 if "." in name: 

632 out.extend(_split_blueprint_path(name.rpartition(".")[0])) 

633 

634 return out