Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/flask/helpers.py: 29%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

117 statements  

1from __future__ import annotations 

2 

3import importlib.util 

4import os 

5import sys 

6import typing as t 

7from datetime import datetime 

8from functools import lru_cache 

9from functools import update_wrapper 

10 

11import werkzeug.utils 

12from werkzeug.exceptions import abort as _wz_abort 

13from werkzeug.utils import redirect as _wz_redirect 

14from werkzeug.wrappers import Response as BaseResponse 

15 

16from .globals import _cv_request 

17from .globals import current_app 

18from .globals import request 

19from .globals import request_ctx 

20from .globals import session 

21from .signals import message_flashed 

22 

23if t.TYPE_CHECKING: # pragma: no cover 

24 from .wrappers import Response 

25 

26 

27def get_debug_flag() -> bool: 

28 """Get whether debug mode should be enabled for the app, indicated by the 

29 :envvar:`FLASK_DEBUG` environment variable. The default is ``False``. 

30 """ 

31 val = os.environ.get("FLASK_DEBUG") 

32 return bool(val and val.lower() not in {"0", "false", "no"}) 

33 

34 

35def get_load_dotenv(default: bool = True) -> bool: 

36 """Get whether the user has disabled loading default dotenv files by 

37 setting :envvar:`FLASK_SKIP_DOTENV`. The default is ``True``, load 

38 the files. 

39 

40 :param default: What to return if the env var isn't set. 

41 """ 

42 val = os.environ.get("FLASK_SKIP_DOTENV") 

43 

44 if not val: 

45 return default 

46 

47 return val.lower() in ("0", "false", "no") 

48 

49 

50def stream_with_context( 

51 generator_or_function: t.Iterator[t.AnyStr] | t.Callable[..., t.Iterator[t.AnyStr]], 

52) -> t.Iterator[t.AnyStr]: 

53 """Request contexts disappear when the response is started on the server. 

54 This is done for efficiency reasons and to make it less likely to encounter 

55 memory leaks with badly written WSGI middlewares. The downside is that if 

56 you are using streamed responses, the generator cannot access request bound 

57 information any more. 

58 

59 This function however can help you keep the context around for longer:: 

60 

61 from flask import stream_with_context, request, Response 

62 

63 @app.route('/stream') 

64 def streamed_response(): 

65 @stream_with_context 

66 def generate(): 

67 yield 'Hello ' 

68 yield request.args['name'] 

69 yield '!' 

70 return Response(generate()) 

71 

72 Alternatively it can also be used around a specific generator:: 

73 

74 from flask import stream_with_context, request, Response 

75 

76 @app.route('/stream') 

77 def streamed_response(): 

78 def generate(): 

79 yield 'Hello ' 

80 yield request.args['name'] 

81 yield '!' 

82 return Response(stream_with_context(generate())) 

83 

84 .. versionadded:: 0.9 

85 """ 

86 try: 

87 gen = iter(generator_or_function) # type: ignore[arg-type] 

88 except TypeError: 

89 

90 def decorator(*args: t.Any, **kwargs: t.Any) -> t.Any: 

91 gen = generator_or_function(*args, **kwargs) # type: ignore[operator] 

92 return stream_with_context(gen) 

93 

94 return update_wrapper(decorator, generator_or_function) # type: ignore[arg-type] 

95 

96 def generator() -> t.Iterator[t.AnyStr | None]: 

97 ctx = _cv_request.get(None) 

98 if ctx is None: 

99 raise RuntimeError( 

100 "'stream_with_context' can only be used when a request" 

101 " context is active, such as in a view function." 

102 ) 

103 with ctx: 

104 # Dummy sentinel. Has to be inside the context block or we're 

105 # not actually keeping the context around. 

106 yield None 

107 

108 # The try/finally is here so that if someone passes a WSGI level 

109 # iterator in we're still running the cleanup logic. Generators 

110 # don't need that because they are closed on their destruction 

111 # automatically. 

112 try: 

113 yield from gen 

114 finally: 

115 if hasattr(gen, "close"): 

116 gen.close() 

117 

118 # The trick is to start the generator. Then the code execution runs until 

119 # the first dummy None is yielded at which point the context was already 

120 # pushed. This item is discarded. Then when the iteration continues the 

121 # real generator is executed. 

122 wrapped_g = generator() 

123 next(wrapped_g) 

124 return wrapped_g # type: ignore[return-value] 

125 

126 

127def make_response(*args: t.Any) -> Response: 

128 """Sometimes it is necessary to set additional headers in a view. Because 

129 views do not have to return response objects but can return a value that 

130 is converted into a response object by Flask itself, it becomes tricky to 

131 add headers to it. This function can be called instead of using a return 

132 and you will get a response object which you can use to attach headers. 

133 

134 If view looked like this and you want to add a new header:: 

135 

136 def index(): 

137 return render_template('index.html', foo=42) 

138 

139 You can now do something like this:: 

140 

141 def index(): 

142 response = make_response(render_template('index.html', foo=42)) 

143 response.headers['X-Parachutes'] = 'parachutes are cool' 

144 return response 

145 

146 This function accepts the very same arguments you can return from a 

147 view function. This for example creates a response with a 404 error 

148 code:: 

149 

150 response = make_response(render_template('not_found.html'), 404) 

151 

152 The other use case of this function is to force the return value of a 

153 view function into a response which is helpful with view 

154 decorators:: 

155 

156 response = make_response(view_function()) 

157 response.headers['X-Parachutes'] = 'parachutes are cool' 

158 

159 Internally this function does the following things: 

160 

161 - if no arguments are passed, it creates a new response argument 

162 - if one argument is passed, :meth:`flask.Flask.make_response` 

163 is invoked with it. 

164 - if more than one argument is passed, the arguments are passed 

165 to the :meth:`flask.Flask.make_response` function as tuple. 

166 

167 .. versionadded:: 0.6 

168 """ 

169 if not args: 

170 return current_app.response_class() 

171 if len(args) == 1: 

172 args = args[0] 

173 return current_app.make_response(args) 

174 

175 

176def url_for( 

177 endpoint: str, 

178 *, 

179 _anchor: str | None = None, 

180 _method: str | None = None, 

181 _scheme: str | None = None, 

182 _external: bool | None = None, 

183 **values: t.Any, 

184) -> str: 

185 """Generate a URL to the given endpoint with the given values. 

186 

187 This requires an active request or application context, and calls 

188 :meth:`current_app.url_for() <flask.Flask.url_for>`. See that method 

189 for full documentation. 

190 

191 :param endpoint: The endpoint name associated with the URL to 

192 generate. If this starts with a ``.``, the current blueprint 

193 name (if any) will be used. 

194 :param _anchor: If given, append this as ``#anchor`` to the URL. 

195 :param _method: If given, generate the URL associated with this 

196 method for the endpoint. 

197 :param _scheme: If given, the URL will have this scheme if it is 

198 external. 

199 :param _external: If given, prefer the URL to be internal (False) or 

200 require it to be external (True). External URLs include the 

201 scheme and domain. When not in an active request, URLs are 

202 external by default. 

203 :param values: Values to use for the variable parts of the URL rule. 

204 Unknown keys are appended as query string arguments, like 

205 ``?a=b&c=d``. 

206 

207 .. versionchanged:: 2.2 

208 Calls ``current_app.url_for``, allowing an app to override the 

209 behavior. 

210 

211 .. versionchanged:: 0.10 

212 The ``_scheme`` parameter was added. 

213 

214 .. versionchanged:: 0.9 

215 The ``_anchor`` and ``_method`` parameters were added. 

216 

217 .. versionchanged:: 0.9 

218 Calls ``app.handle_url_build_error`` on build errors. 

219 """ 

220 return current_app.url_for( 

221 endpoint, 

222 _anchor=_anchor, 

223 _method=_method, 

224 _scheme=_scheme, 

225 _external=_external, 

226 **values, 

227 ) 

228 

229 

230def redirect( 

231 location: str, code: int = 302, Response: type[BaseResponse] | None = None 

232) -> BaseResponse: 

233 """Create a redirect response object. 

234 

235 If :data:`~flask.current_app` is available, it will use its 

236 :meth:`~flask.Flask.redirect` method, otherwise it will use 

237 :func:`werkzeug.utils.redirect`. 

238 

239 :param location: The URL to redirect to. 

240 :param code: The status code for the redirect. 

241 :param Response: The response class to use. Not used when 

242 ``current_app`` is active, which uses ``app.response_class``. 

243 

244 .. versionadded:: 2.2 

245 Calls ``current_app.redirect`` if available instead of always 

246 using Werkzeug's default ``redirect``. 

247 """ 

248 if current_app: 

249 return current_app.redirect(location, code=code) 

250 

251 return _wz_redirect(location, code=code, Response=Response) 

252 

253 

254def abort(code: int | BaseResponse, *args: t.Any, **kwargs: t.Any) -> t.NoReturn: 

255 """Raise an :exc:`~werkzeug.exceptions.HTTPException` for the given 

256 status code. 

257 

258 If :data:`~flask.current_app` is available, it will call its 

259 :attr:`~flask.Flask.aborter` object, otherwise it will use 

260 :func:`werkzeug.exceptions.abort`. 

261 

262 :param code: The status code for the exception, which must be 

263 registered in ``app.aborter``. 

264 :param args: Passed to the exception. 

265 :param kwargs: Passed to the exception. 

266 

267 .. versionadded:: 2.2 

268 Calls ``current_app.aborter`` if available instead of always 

269 using Werkzeug's default ``abort``. 

270 """ 

271 if current_app: 

272 current_app.aborter(code, *args, **kwargs) 

273 

274 _wz_abort(code, *args, **kwargs) 

275 

276 

277def get_template_attribute(template_name: str, attribute: str) -> t.Any: 

278 """Loads a macro (or variable) a template exports. This can be used to 

279 invoke a macro from within Python code. If you for example have a 

280 template named :file:`_cider.html` with the following contents: 

281 

282 .. sourcecode:: html+jinja 

283 

284 {% macro hello(name) %}Hello {{ name }}!{% endmacro %} 

285 

286 You can access this from Python code like this:: 

287 

288 hello = get_template_attribute('_cider.html', 'hello') 

289 return hello('World') 

290 

291 .. versionadded:: 0.2 

292 

293 :param template_name: the name of the template 

294 :param attribute: the name of the variable of macro to access 

295 """ 

296 return getattr(current_app.jinja_env.get_template(template_name).module, attribute) 

297 

298 

299def flash(message: str, category: str = "message") -> None: 

300 """Flashes a message to the next request. In order to remove the 

301 flashed message from the session and to display it to the user, 

302 the template has to call :func:`get_flashed_messages`. 

303 

304 .. versionchanged:: 0.3 

305 `category` parameter added. 

306 

307 :param message: the message to be flashed. 

308 :param category: the category for the message. The following values 

309 are recommended: ``'message'`` for any kind of message, 

310 ``'error'`` for errors, ``'info'`` for information 

311 messages and ``'warning'`` for warnings. However any 

312 kind of string can be used as category. 

313 """ 

314 # Original implementation: 

315 # 

316 # session.setdefault('_flashes', []).append((category, message)) 

317 # 

318 # This assumed that changes made to mutable structures in the session are 

319 # always in sync with the session object, which is not true for session 

320 # implementations that use external storage for keeping their keys/values. 

321 flashes = session.get("_flashes", []) 

322 flashes.append((category, message)) 

323 session["_flashes"] = flashes 

324 app = current_app._get_current_object() # type: ignore 

325 message_flashed.send( 

326 app, 

327 _async_wrapper=app.ensure_sync, 

328 message=message, 

329 category=category, 

330 ) 

331 

332 

333def get_flashed_messages( 

334 with_categories: bool = False, category_filter: t.Iterable[str] = () 

335) -> list[str] | list[tuple[str, str]]: 

336 """Pulls all flashed messages from the session and returns them. 

337 Further calls in the same request to the function will return 

338 the same messages. By default just the messages are returned, 

339 but when `with_categories` is set to ``True``, the return value will 

340 be a list of tuples in the form ``(category, message)`` instead. 

341 

342 Filter the flashed messages to one or more categories by providing those 

343 categories in `category_filter`. This allows rendering categories in 

344 separate html blocks. The `with_categories` and `category_filter` 

345 arguments are distinct: 

346 

347 * `with_categories` controls whether categories are returned with message 

348 text (``True`` gives a tuple, where ``False`` gives just the message text). 

349 * `category_filter` filters the messages down to only those matching the 

350 provided categories. 

351 

352 See :doc:`/patterns/flashing` for examples. 

353 

354 .. versionchanged:: 0.3 

355 `with_categories` parameter added. 

356 

357 .. versionchanged:: 0.9 

358 `category_filter` parameter added. 

359 

360 :param with_categories: set to ``True`` to also receive categories. 

361 :param category_filter: filter of categories to limit return values. Only 

362 categories in the list will be returned. 

363 """ 

364 flashes = request_ctx.flashes 

365 if flashes is None: 

366 flashes = session.pop("_flashes") if "_flashes" in session else [] 

367 request_ctx.flashes = flashes 

368 if category_filter: 

369 flashes = list(filter(lambda f: f[0] in category_filter, flashes)) 

370 if not with_categories: 

371 return [x[1] for x in flashes] 

372 return flashes 

373 

374 

375def _prepare_send_file_kwargs(**kwargs: t.Any) -> dict[str, t.Any]: 

376 if kwargs.get("max_age") is None: 

377 kwargs["max_age"] = current_app.get_send_file_max_age 

378 

379 kwargs.update( 

380 environ=request.environ, 

381 use_x_sendfile=current_app.config["USE_X_SENDFILE"], 

382 response_class=current_app.response_class, 

383 _root_path=current_app.root_path, # type: ignore 

384 ) 

385 return kwargs 

386 

387 

388def send_file( 

389 path_or_file: os.PathLike[t.AnyStr] | str | t.BinaryIO, 

390 mimetype: str | None = None, 

391 as_attachment: bool = False, 

392 download_name: str | None = None, 

393 conditional: bool = True, 

394 etag: bool | str = True, 

395 last_modified: datetime | int | float | None = None, 

396 max_age: None | (int | t.Callable[[str | None], int | None]) = None, 

397) -> Response: 

398 """Send the contents of a file to the client. 

399 

400 The first argument can be a file path or a file-like object. Paths 

401 are preferred in most cases because Werkzeug can manage the file and 

402 get extra information from the path. Passing a file-like object 

403 requires that the file is opened in binary mode, and is mostly 

404 useful when building a file in memory with :class:`io.BytesIO`. 

405 

406 Never pass file paths provided by a user. The path is assumed to be 

407 trusted, so a user could craft a path to access a file you didn't 

408 intend. Use :func:`send_from_directory` to safely serve 

409 user-requested paths from within a directory. 

410 

411 If the WSGI server sets a ``file_wrapper`` in ``environ``, it is 

412 used, otherwise Werkzeug's built-in wrapper is used. Alternatively, 

413 if the HTTP server supports ``X-Sendfile``, configuring Flask with 

414 ``USE_X_SENDFILE = True`` will tell the server to send the given 

415 path, which is much more efficient than reading it in Python. 

416 

417 :param path_or_file: The path to the file to send, relative to the 

418 current working directory if a relative path is given. 

419 Alternatively, a file-like object opened in binary mode. Make 

420 sure the file pointer is seeked to the start of the data. 

421 :param mimetype: The MIME type to send for the file. If not 

422 provided, it will try to detect it from the file name. 

423 :param as_attachment: Indicate to a browser that it should offer to 

424 save the file instead of displaying it. 

425 :param download_name: The default name browsers will use when saving 

426 the file. Defaults to the passed file name. 

427 :param conditional: Enable conditional and range responses based on 

428 request headers. Requires passing a file path and ``environ``. 

429 :param etag: Calculate an ETag for the file, which requires passing 

430 a file path. Can also be a string to use instead. 

431 :param last_modified: The last modified time to send for the file, 

432 in seconds. If not provided, it will try to detect it from the 

433 file path. 

434 :param max_age: How long the client should cache the file, in 

435 seconds. If set, ``Cache-Control`` will be ``public``, otherwise 

436 it will be ``no-cache`` to prefer conditional caching. 

437 

438 .. versionchanged:: 2.0 

439 ``download_name`` replaces the ``attachment_filename`` 

440 parameter. If ``as_attachment=False``, it is passed with 

441 ``Content-Disposition: inline`` instead. 

442 

443 .. versionchanged:: 2.0 

444 ``max_age`` replaces the ``cache_timeout`` parameter. 

445 ``conditional`` is enabled and ``max_age`` is not set by 

446 default. 

447 

448 .. versionchanged:: 2.0 

449 ``etag`` replaces the ``add_etags`` parameter. It can be a 

450 string to use instead of generating one. 

451 

452 .. versionchanged:: 2.0 

453 Passing a file-like object that inherits from 

454 :class:`~io.TextIOBase` will raise a :exc:`ValueError` rather 

455 than sending an empty file. 

456 

457 .. versionadded:: 2.0 

458 Moved the implementation to Werkzeug. This is now a wrapper to 

459 pass some Flask-specific arguments. 

460 

461 .. versionchanged:: 1.1 

462 ``filename`` may be a :class:`~os.PathLike` object. 

463 

464 .. versionchanged:: 1.1 

465 Passing a :class:`~io.BytesIO` object supports range requests. 

466 

467 .. versionchanged:: 1.0.3 

468 Filenames are encoded with ASCII instead of Latin-1 for broader 

469 compatibility with WSGI servers. 

470 

471 .. versionchanged:: 1.0 

472 UTF-8 filenames as specified in :rfc:`2231` are supported. 

473 

474 .. versionchanged:: 0.12 

475 The filename is no longer automatically inferred from file 

476 objects. If you want to use automatic MIME and etag support, 

477 pass a filename via ``filename_or_fp`` or 

478 ``attachment_filename``. 

479 

480 .. versionchanged:: 0.12 

481 ``attachment_filename`` is preferred over ``filename`` for MIME 

482 detection. 

483 

484 .. versionchanged:: 0.9 

485 ``cache_timeout`` defaults to 

486 :meth:`Flask.get_send_file_max_age`. 

487 

488 .. versionchanged:: 0.7 

489 MIME guessing and etag support for file-like objects was 

490 removed because it was unreliable. Pass a filename if you are 

491 able to, otherwise attach an etag yourself. 

492 

493 .. versionchanged:: 0.5 

494 The ``add_etags``, ``cache_timeout`` and ``conditional`` 

495 parameters were added. The default behavior is to add etags. 

496 

497 .. versionadded:: 0.2 

498 """ 

499 return werkzeug.utils.send_file( # type: ignore[return-value] 

500 **_prepare_send_file_kwargs( 

501 path_or_file=path_or_file, 

502 environ=request.environ, 

503 mimetype=mimetype, 

504 as_attachment=as_attachment, 

505 download_name=download_name, 

506 conditional=conditional, 

507 etag=etag, 

508 last_modified=last_modified, 

509 max_age=max_age, 

510 ) 

511 ) 

512 

513 

514def send_from_directory( 

515 directory: os.PathLike[str] | str, 

516 path: os.PathLike[str] | str, 

517 **kwargs: t.Any, 

518) -> Response: 

519 """Send a file from within a directory using :func:`send_file`. 

520 

521 .. code-block:: python 

522 

523 @app.route("/uploads/<path:name>") 

524 def download_file(name): 

525 return send_from_directory( 

526 app.config['UPLOAD_FOLDER'], name, as_attachment=True 

527 ) 

528 

529 This is a secure way to serve files from a folder, such as static 

530 files or uploads. Uses :func:`~werkzeug.security.safe_join` to 

531 ensure the path coming from the client is not maliciously crafted to 

532 point outside the specified directory. 

533 

534 If the final path does not point to an existing regular file, 

535 raises a 404 :exc:`~werkzeug.exceptions.NotFound` error. 

536 

537 :param directory: The directory that ``path`` must be located under, 

538 relative to the current application's root path. 

539 :param path: The path to the file to send, relative to 

540 ``directory``. 

541 :param kwargs: Arguments to pass to :func:`send_file`. 

542 

543 .. versionchanged:: 2.0 

544 ``path`` replaces the ``filename`` parameter. 

545 

546 .. versionadded:: 2.0 

547 Moved the implementation to Werkzeug. This is now a wrapper to 

548 pass some Flask-specific arguments. 

549 

550 .. versionadded:: 0.5 

551 """ 

552 return werkzeug.utils.send_from_directory( # type: ignore[return-value] 

553 directory, path, **_prepare_send_file_kwargs(**kwargs) 

554 ) 

555 

556 

557def get_root_path(import_name: str) -> str: 

558 """Find the root path of a package, or the path that contains a 

559 module. If it cannot be found, returns the current working 

560 directory. 

561 

562 Not to be confused with the value returned by :func:`find_package`. 

563 

564 :meta private: 

565 """ 

566 # Module already imported and has a file attribute. Use that first. 

567 mod = sys.modules.get(import_name) 

568 

569 if mod is not None and hasattr(mod, "__file__") and mod.__file__ is not None: 

570 return os.path.dirname(os.path.abspath(mod.__file__)) 

571 

572 # Next attempt: check the loader. 

573 try: 

574 spec = importlib.util.find_spec(import_name) 

575 

576 if spec is None: 

577 raise ValueError 

578 except (ImportError, ValueError): 

579 loader = None 

580 else: 

581 loader = spec.loader 

582 

583 # Loader does not exist or we're referring to an unloaded main 

584 # module or a main module without path (interactive sessions), go 

585 # with the current working directory. 

586 if loader is None: 

587 return os.getcwd() 

588 

589 if hasattr(loader, "get_filename"): 

590 filepath = loader.get_filename(import_name) 

591 else: 

592 # Fall back to imports. 

593 __import__(import_name) 

594 mod = sys.modules[import_name] 

595 filepath = getattr(mod, "__file__", None) 

596 

597 # If we don't have a file path it might be because it is a 

598 # namespace package. In this case pick the root path from the 

599 # first module that is contained in the package. 

600 if filepath is None: 

601 raise RuntimeError( 

602 "No root path can be found for the provided module" 

603 f" {import_name!r}. This can happen because the module" 

604 " came from an import hook that does not provide file" 

605 " name information or because it's a namespace package." 

606 " In this case the root path needs to be explicitly" 

607 " provided." 

608 ) 

609 

610 # filepath is import_name.py for a module, or __init__.py for a package. 

611 return os.path.dirname(os.path.abspath(filepath)) # type: ignore[no-any-return] 

612 

613 

614@lru_cache(maxsize=None) 

615def _split_blueprint_path(name: str) -> list[str]: 

616 out: list[str] = [name] 

617 

618 if "." in name: 

619 out.extend(_split_blueprint_path(name.rpartition(".")[0])) 

620 

621 return out