Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/flask/helpers.py: 43%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

122 statements  

1from __future__ import annotations 

2 

3import importlib.util 

4import os 

5import sys 

6import typing as t 

7from datetime import datetime 

8from functools import cache 

9from functools import update_wrapper 

10 

11import werkzeug.utils 

12from werkzeug.exceptions import abort as _wz_abort 

13from werkzeug.utils import redirect as _wz_redirect 

14from werkzeug.wrappers import Response as BaseResponse 

15 

16from .globals import _cv_app 

17from .globals import _cv_request 

18from .globals import current_app 

19from .globals import request 

20from .globals import request_ctx 

21from .globals import session 

22from .signals import message_flashed 

23 

24if t.TYPE_CHECKING: # pragma: no cover 

25 from .wrappers import Response 

26 

27 

def get_debug_flag() -> bool:
    """Report whether debug mode is requested through the environment.

    Reads the :envvar:`FLASK_DEBUG` environment variable. An unset or
    empty value, or one of ``0``, ``false`` or ``no`` (compared
    case-insensitively), means debug mode is off. Any other value turns
    it on.

    :return: ``True`` if debug mode should be enabled, otherwise ``False``.
    """
    raw = os.environ.get("FLASK_DEBUG")

    # Unset and empty both count as "not enabled".
    if not raw:
        return False

    return raw.lower() not in {"0", "false", "no"}

34 

35 

def get_load_dotenv(default: bool = True) -> bool:
    """Report whether the default dotenv files should be loaded.

    The user can opt out by setting :envvar:`FLASK_SKIP_DOTENV`. When
    the variable is set to ``0``, ``false`` or ``no`` (compared
    case-insensitively) the files are still loaded; any other non-empty
    value skips them.

    :param default: What to return if the env var isn't set or is empty.
    :return: ``True`` if dotenv files should be loaded.
    """
    raw = os.environ.get("FLASK_SKIP_DOTENV")

    if raw:
        # The variable means "skip", so a falsy-looking value means "load".
        return raw.lower() in ("0", "false", "no")

    return default

49 

50 

# Typing overload: called with an already-created iterator, returns an iterator.
@t.overload
def stream_with_context(
    generator_or_function: t.Iterator[t.AnyStr],
) -> t.Iterator[t.AnyStr]: ...


# Typing overload: used as a decorator on a generator function, returns a
# wrapped callable.
@t.overload
def stream_with_context(
    generator_or_function: t.Callable[..., t.Iterator[t.AnyStr]],
) -> t.Callable[[t.Iterator[t.AnyStr]], t.Iterator[t.AnyStr]]: ...


def stream_with_context(
    generator_or_function: t.Iterator[t.AnyStr] | t.Callable[..., t.Iterator[t.AnyStr]],
) -> t.Iterator[t.AnyStr] | t.Callable[[t.Iterator[t.AnyStr]], t.Iterator[t.AnyStr]]:
    """Wrap a response generator function so that it runs inside the current
    request context. This keeps :data:`request`, :data:`session`, and :data:`g`
    available, even though at the point the generator runs the request context
    will typically have ended.

    Use it as a decorator on a generator function:

    .. code-block:: python

        from flask import stream_with_context, request, Response

        @app.get("/stream")
        def streamed_response():
            @stream_with_context
            def generate():
                yield "Hello "
                yield request.args["name"]
                yield "!"

            return Response(generate())

    Or use it as a wrapper around a created generator:

    .. code-block:: python

        from flask import stream_with_context, request, Response

        @app.get("/stream")
        def streamed_response():
            def generate():
                yield "Hello "
                yield request.args["name"]
                yield "!"

            return Response(stream_with_context(generate()))

    :param generator_or_function: Either an iterable to wrap directly, or
        a callable that returns one. Anything ``iter()`` rejects with
        :exc:`TypeError` is treated as the callable form.
    :return: For an iterable, a generator that yields from it with the
        captured contexts pushed. For a callable, a wrapper with the
        callable's metadata that applies this function to its result.
    :raises RuntimeError: If no request context is active when the
        iterable is wrapped.

    .. versionadded:: 0.9
    """
    try:
        gen = iter(generator_or_function)  # type: ignore[arg-type]
    except TypeError:
        # Not iterable, so this is the decorator form: defer wrapping until
        # the decorated function is called and has produced its iterator.

        def decorator(*args: t.Any, **kwargs: t.Any) -> t.Any:
            gen = generator_or_function(*args, **kwargs)  # type: ignore[operator]
            return stream_with_context(gen)

        return update_wrapper(decorator, generator_or_function)  # type: ignore[arg-type]

    def generator() -> t.Iterator[t.AnyStr]:
        # Capture the active request context; fail if there is none.
        if (req_ctx := _cv_request.get(None)) is None:
            raise RuntimeError(
                "'stream_with_context' can only be used when a request"
                " context is active, such as in a view function."
            )

        app_ctx = _cv_app.get()
        # Setup code below will run the generator to this point, so that the
        # current contexts are recorded. The contexts must be pushed after,
        # otherwise their ContextVar will record the wrong event loop during
        # async view functions.
        yield None  # type: ignore[misc]

        # Push the app context first, so that the request context does not
        # automatically create and push a different app context.
        with app_ctx, req_ctx:
            try:
                yield from gen
            finally:
                # Clean up in case the user wrapped a WSGI iterator.
                if hasattr(gen, "close"):
                    gen.close()

    # Execute the generator to the sentinel value. This ensures the context is
    # preserved in the generator's state. Further iteration will push the
    # context and yield from the original iterator.
    wrapped_g = generator()
    # Consumes the ``None`` sentinel so callers never see it; this is also
    # where the RuntimeError surfaces when no request context is active.
    next(wrapped_g)
    return wrapped_g

144 

145 

def make_response(*args: t.Any) -> Response:
    """Build a response object from the values a view could return.

    Views do not have to return response objects; Flask converts their
    return value itself. That makes it awkward to set additional headers
    from inside a view. Call this function instead of returning the
    value directly to get a response object you can modify.

    If a view looked like this and you want to add a new header::

        def index():
            return render_template('index.html', foo=42)

    You can now do something like this::

        def index():
            response = make_response(render_template('index.html', foo=42))
            response.headers['X-Parachutes'] = 'parachutes are cool'
            return response

    This function accepts the very same arguments you can return from a
    view function. For example, this creates a response with a 404
    status code::

        response = make_response(render_template('not_found.html'), 404)

    It is also useful in view decorators, to force the return value of a
    view function into a response::

        response = make_response(view_function())
        response.headers['X-Parachutes'] = 'parachutes are cool'

    Internally this function does the following things:

    -   if no arguments are passed, it creates a new, empty response
        object from ``current_app.response_class``
    -   if one argument is passed, :meth:`flask.Flask.make_response`
        is invoked with it.
    -   if more than one argument is passed, the arguments are passed
        to the :meth:`flask.Flask.make_response` function as a tuple.

    .. versionadded:: 0.6
    """
    if len(args) == 0:
        return current_app.response_class()

    # A lone argument is unwrapped; several stay together as a tuple.
    rv = args[0] if len(args) == 1 else args
    return current_app.make_response(rv)

193 

194 

def url_for(
    endpoint: str,
    *,
    _anchor: str | None = None,
    _method: str | None = None,
    _scheme: str | None = None,
    _external: bool | None = None,
    **values: t.Any,
) -> str:
    """Build a URL for an endpoint using the given values.

    An active request or application context is required. The work is
    delegated to :meth:`current_app.url_for() <flask.Flask.url_for>`;
    see that method for the full documentation.

    :param endpoint: The endpoint name associated with the URL to
        generate. If this starts with a ``.``, the current blueprint
        name (if any) will be used.
    :param _anchor: If given, append this as ``#anchor`` to the URL.
    :param _method: If given, generate the URL associated with this
        method for the endpoint.
    :param _scheme: If given, the URL will have this scheme if it is
        external.
    :param _external: If given, prefer the URL to be internal (False) or
        require it to be external (True). External URLs include the
        scheme and domain. When not in an active request, URLs are
        external by default.
    :param values: Values to use for the variable parts of the URL rule.
        Unknown keys are appended as query string arguments, like
        ``?a=b&c=d``.

    .. versionchanged:: 2.2
        Calls ``current_app.url_for``, allowing an app to override the
        behavior.

    .. versionchanged:: 0.10
        The ``_scheme`` parameter was added.

    .. versionchanged:: 0.9
        The ``_anchor`` and ``_method`` parameters were added.

    .. versionchanged:: 0.9
        Calls ``app.handle_url_build_error`` on build errors.
    """
    # The underscore-prefixed names are keyword-only parameters, so they can
    # never also appear in ``values``.
    build_options = {
        "_anchor": _anchor,
        "_method": _method,
        "_scheme": _scheme,
        "_external": _external,
    }
    return current_app.url_for(endpoint, **build_options, **values)

247 

248 

def redirect(
    location: str, code: int = 302, Response: type[BaseResponse] | None = None
) -> BaseResponse:
    """Build a redirect response.

    When :data:`~flask.current_app` is available its
    :meth:`~flask.Flask.redirect` method produces the response.
    Otherwise :func:`werkzeug.utils.redirect` is used.

    :param location: The URL to redirect to.
    :param code: The status code for the redirect.
    :param Response: The response class to use. Not used when
        ``current_app`` is active, which uses ``app.response_class``.

    .. versionadded:: 2.2
        Calls ``current_app.redirect`` if available instead of always
        using Werkzeug's default ``redirect``.
    """
    if not current_app:
        # No application available: use Werkzeug's implementation directly.
        return _wz_redirect(location, code=code, Response=Response)

    return current_app.redirect(location, code=code)

271 

272 

def abort(code: int | BaseResponse, *args: t.Any, **kwargs: t.Any) -> t.NoReturn:
    """Raise an :exc:`~werkzeug.exceptions.HTTPException` for a status code.

    When :data:`~flask.current_app` is available its
    :attr:`~flask.Flask.aborter` object is called. Otherwise
    :func:`werkzeug.exceptions.abort` is used.

    :param code: The status code for the exception, which must be
        registered in ``app.aborter``.
    :param args: Passed to the exception.
    :param kwargs: Passed to the exception.

    .. versionadded:: 2.2
        Calls ``current_app.aborter`` if available instead of always
        using Werkzeug's default ``abort``.
    """
    have_app = bool(current_app)

    if have_app:
        current_app.aborter(code, *args, **kwargs)

    # Reached when there is no app, and also if the aborter call above
    # returns instead of raising.
    _wz_abort(code, *args, **kwargs)

294 

295 

def get_template_attribute(template_name: str, attribute: str) -> t.Any:
    """Load a macro (or variable) that a template exports.

    This makes it possible to invoke a macro from Python code. For
    example, given a template named :file:`_cider.html` with the
    following contents:

    .. sourcecode:: html+jinja

       {% macro hello(name) %}Hello {{ name }}!{% endmacro %}

    You can access this from Python code like this::

        hello = get_template_attribute('_cider.html', 'hello')
        return hello('World')

    .. versionadded:: 0.2

    :param template_name: the name of the template
    :param attribute: the name of the variable or macro to access
    """
    template = current_app.jinja_env.get_template(template_name)
    exported = template.module
    return getattr(exported, attribute)

316 

317 

def flash(message: str, category: str = "message") -> None:
    """Flash a message to the next request.

    To remove the flashed message from the session and display it to
    the user, the template has to call :func:`get_flashed_messages`.

    .. versionchanged:: 0.3
       `category` parameter added.

    :param message: the message to be flashed.
    :param category: the category for the message. The following values
                     are recommended: ``'message'`` for any kind of message,
                     ``'error'`` for errors, ``'info'`` for information
                     messages and ``'warning'`` for warnings. However any
                     kind of string can be used as category.
    """
    # The list is read, extended and then assigned back explicitly rather
    # than using ``session.setdefault('_flashes', []).append(...)``.
    # In-place changes to mutable values stored in the session are not
    # guaranteed to stay in sync with the session object when the session
    # implementation keeps its keys/values in external storage.
    pending = session.get("_flashes", [])
    pending.append((category, message))
    session["_flashes"] = pending

    # Signals are sent with the real application object, not the proxy.
    app = current_app._get_current_object()  # type: ignore
    message_flashed.send(
        app,
        _async_wrapper=app.ensure_sync,
        message=message,
        category=category,
    )

350 

351 

def get_flashed_messages(
    with_categories: bool = False, category_filter: t.Iterable[str] = ()
) -> list[str] | list[tuple[str, str]]:
    """Pull all flashed messages from the session and return them.

    Further calls to this function within the same request return the
    same messages. By default only the message texts are returned. When
    `with_categories` is ``True``, the return value is a list of
    ``(category, message)`` tuples instead.

    Pass one or more categories in `category_filter` to limit the
    result to those categories. This allows rendering categories in
    separate html blocks. The two arguments are independent:

    * `with_categories` controls whether categories are returned with message
      text (``True`` gives a tuple, where ``False`` gives just the message text).
    * `category_filter` filters the messages down to only those matching the
      provided categories.

    See :doc:`/patterns/flashing` for examples.

    .. versionchanged:: 0.3
       `with_categories` parameter added.

    .. versionchanged:: 0.9
        `category_filter` parameter added.

    :param with_categories: set to ``True`` to also receive categories.
    :param category_filter: filter of categories to limit return values.  Only
                            categories in the list will be returned.
    """
    messages = request_ctx.flashes

    if messages is None:
        # First call in this request: move the messages out of the session
        # and remember them on the request context for later calls.
        if "_flashes" in session:
            messages = session.pop("_flashes")
        else:
            messages = []

        request_ctx.flashes = messages

    if category_filter:
        messages = [entry for entry in messages if entry[0] in category_filter]

    if with_categories:
        return messages

    return [entry[1] for entry in messages]

392 

393 

def _prepare_send_file_kwargs(**kwargs: t.Any) -> dict[str, t.Any]:
    """Add the Flask-specific arguments to a set of ``send_file`` kwargs.

    A missing or ``None`` ``max_age`` is replaced with the application's
    ``get_send_file_max_age`` method. ``environ``, ``use_x_sendfile``,
    ``response_class`` and ``_root_path`` are always set from the current
    request and application, overriding any values passed in.

    :param kwargs: Keyword arguments collected by the caller.
    :return: The same keyword arguments with the Flask values filled in.
    """
    if kwargs.get("max_age") is None:
        # The bound method itself is stored here, not its result.
        kwargs["max_age"] = current_app.get_send_file_max_age

    flask_options = {
        "environ": request.environ,
        "use_x_sendfile": current_app.config["USE_X_SENDFILE"],
        "response_class": current_app.response_class,
        "_root_path": current_app.root_path,
    }
    kwargs.update(flask_options)
    return kwargs

405 

406 

def send_file(
    path_or_file: os.PathLike[t.AnyStr] | str | t.IO[bytes],
    mimetype: str | None = None,
    as_attachment: bool = False,
    download_name: str | None = None,
    conditional: bool = True,
    etag: bool | str = True,
    last_modified: datetime | int | float | None = None,
    max_age: None | (int | t.Callable[[str | None], int | None]) = None,
) -> Response:
    """Send the contents of a file to the client.

    The first argument is either a file path or a file-like object. A
    path is preferred in most cases, because Werkzeug can then manage
    the file and get extra information from the path. A file-like
    object must be opened in binary mode, and is mostly useful when
    building a file in memory with :class:`io.BytesIO`.

    Never pass file paths provided by a user. The path is assumed to be
    trusted, so a user could craft a path to access a file you didn't
    intend. Use :func:`send_from_directory` to safely serve
    user-requested paths from within a directory.

    If the WSGI server sets a ``file_wrapper`` in ``environ``, it is
    used, otherwise Werkzeug's built-in wrapper is used. Alternatively,
    if the HTTP server supports ``X-Sendfile``, configuring Flask with
    ``USE_X_SENDFILE = True`` will tell the server to send the given
    path, which is much more efficient than reading it in Python.

    :param path_or_file: The path to the file to send, relative to the
        current working directory if a relative path is given.
        Alternatively, a file-like object opened in binary mode. Make
        sure the file pointer is seeked to the start of the data.
    :param mimetype: The MIME type to send for the file. If not
        provided, it will try to detect it from the file name.
    :param as_attachment: Indicate to a browser that it should offer to
        save the file instead of displaying it.
    :param download_name: The default name browsers will use when saving
        the file. Defaults to the passed file name.
    :param conditional: Enable conditional and range responses based on
        request headers. Requires passing a file path and ``environ``.
    :param etag: Calculate an ETag for the file, which requires passing
        a file path. Can also be a string to use instead.
    :param last_modified: The last modified time to send for the file,
        in seconds. If not provided, it will try to detect it from the
        file path.
    :param max_age: How long the client should cache the file, in
        seconds. If set, ``Cache-Control`` will be ``public``, otherwise
        it will be ``no-cache`` to prefer conditional caching.

    .. versionchanged:: 2.0
        ``download_name`` replaces the ``attachment_filename``
        parameter. If ``as_attachment=False``, it is passed with
        ``Content-Disposition: inline`` instead.

    .. versionchanged:: 2.0
        ``max_age`` replaces the ``cache_timeout`` parameter.
        ``conditional`` is enabled and ``max_age`` is not set by
        default.

    .. versionchanged:: 2.0
        ``etag`` replaces the ``add_etags`` parameter. It can be a
        string to use instead of generating one.

    .. versionchanged:: 2.0
        Passing a file-like object that inherits from
        :class:`~io.TextIOBase` will raise a :exc:`ValueError` rather
        than sending an empty file.

    .. versionadded:: 2.0
        Moved the implementation to Werkzeug. This is now a wrapper to
        pass some Flask-specific arguments.

    .. versionchanged:: 1.1
        ``filename`` may be a :class:`~os.PathLike` object.

    .. versionchanged:: 1.1
        Passing a :class:`~io.BytesIO` object supports range requests.

    .. versionchanged:: 1.0.3
        Filenames are encoded with ASCII instead of Latin-1 for broader
        compatibility with WSGI servers.

    .. versionchanged:: 1.0
        UTF-8 filenames as specified in :rfc:`2231` are supported.

    .. versionchanged:: 0.12
        The filename is no longer automatically inferred from file
        objects. If you want to use automatic MIME and etag support,
        pass a filename via ``filename_or_fp`` or
        ``attachment_filename``.

    .. versionchanged:: 0.12
        ``attachment_filename`` is preferred over ``filename`` for MIME
        detection.

    .. versionchanged:: 0.9
        ``cache_timeout`` defaults to
        :meth:`Flask.get_send_file_max_age`.

    .. versionchanged:: 0.7
        MIME guessing and etag support for file-like objects was
        removed because it was unreliable. Pass a filename if you are
        able to, otherwise attach an etag yourself.

    .. versionchanged:: 0.5
        The ``add_etags``, ``cache_timeout`` and ``conditional``
        parameters were added. The default behavior is to add etags.

    .. versionadded:: 0.2
    """
    # Collect the caller's arguments, then let the helper add the
    # Flask-specific ones (response class, X-Sendfile setting, root path).
    options = _prepare_send_file_kwargs(
        path_or_file=path_or_file,
        environ=request.environ,
        mimetype=mimetype,
        as_attachment=as_attachment,
        download_name=download_name,
        conditional=conditional,
        etag=etag,
        last_modified=last_modified,
        max_age=max_age,
    )
    return werkzeug.utils.send_file(**options)  # type: ignore[return-value]

531 

532 

def send_from_directory(
    directory: os.PathLike[str] | str,
    path: os.PathLike[str] | str,
    **kwargs: t.Any,
) -> Response:
    """Send a file from within a directory using :func:`send_file`.

    .. code-block:: python

        @app.route("/uploads/<path:name>")
        def download_file(name):
            return send_from_directory(
                app.config['UPLOAD_FOLDER'], name, as_attachment=True
            )

    This is a secure way to serve files from a folder, such as static
    files or uploads. :func:`~werkzeug.security.safe_join` is used to
    ensure the path coming from the client is not maliciously crafted to
    point outside the specified directory.

    If the final path does not point to an existing regular file,
    a 404 :exc:`~werkzeug.exceptions.NotFound` error is raised.

    :param directory: The directory that ``path`` must be located under,
        relative to the current application's root path. This *must not*
        be a value provided by the client, otherwise it becomes insecure.
    :param path: The path to the file to send, relative to
        ``directory``.
    :param kwargs: Arguments to pass to :func:`send_file`.

    .. versionchanged:: 2.0
        ``path`` replaces the ``filename`` parameter.

    .. versionadded:: 2.0
        Moved the implementation to Werkzeug. This is now a wrapper to
        pass some Flask-specific arguments.

    .. versionadded:: 0.5
    """
    options = _prepare_send_file_kwargs(**kwargs)
    return werkzeug.utils.send_from_directory(  # type: ignore[return-value]
        directory, path, **options
    )

575 

576 

def get_root_path(import_name: str) -> str:
    """Find the root path of a package, or the path that contains a
    module. If it cannot be found, returns the current working
    directory.

    Not to be confused with the value returned by :func:`find_package`.

    :param import_name: Dotted import name of the module or package.
    :return: Absolute path of the directory containing the module file
        (or the package's ``__init__`` file).
    :raises RuntimeError: If a loader was found but no file name could
        be determined for the module.

    :meta private:
    """
    # Fast path: the module is already imported and knows its file.
    loaded = sys.modules.get(import_name)
    known_file = getattr(loaded, "__file__", None)

    if known_file is not None:
        return os.path.dirname(os.path.abspath(known_file))

    # Otherwise ask the import system for a loader.
    try:
        spec = importlib.util.find_spec(import_name)
    except (ImportError, ValueError):
        spec = None

    loader = None if spec is None else spec.loader

    # Loader does not exist or we're referring to an unloaded main
    # module or a main module without path (interactive sessions), go
    # with the current working directory.
    if loader is None:
        return os.getcwd()

    if hasattr(loader, "get_filename"):
        filepath = loader.get_filename(import_name)  # pyright: ignore
    else:
        # The loader can't report a file name, so import the module and
        # read the attribute from it.
        __import__(import_name)
        filepath = getattr(sys.modules[import_name], "__file__", None)

        # Still no file path: the root path cannot be derived, so ask
        # the caller to provide it explicitly.
        if filepath is None:
            raise RuntimeError(
                "No root path can be found for the provided module"
                f" {import_name!r}. This can happen because the module"
                " came from an import hook that does not provide file"
                " name information or because it's a namespace package."
                " In this case the root path needs to be explicitly"
                " provided."
            )

    # filepath is import_name.py for a module, or __init__.py for a package.
    return os.path.dirname(os.path.abspath(filepath))  # type: ignore[no-any-return]

632 

633 

634@cache 

635def _split_blueprint_path(name: str) -> list[str]: 

636 out: list[str] = [name] 

637 

638 if "." in name: 

639 out.extend(_split_blueprint_path(name.rpartition(".")[0])) 

640 

641 return out