Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/werkzeug/utils.py: 27%
241 statements
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-09 07:17 +0000
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-09 07:17 +0000
1from __future__ import annotations
3import io
4import mimetypes
5import os
6import pkgutil
7import re
8import sys
9import typing as t
10import unicodedata
11from datetime import datetime
12from time import time
13from urllib.parse import quote
14from zlib import adler32
16from markupsafe import escape
18from ._internal import _DictAccessorProperty
19from ._internal import _missing
20from ._internal import _TAccessorValue
21from .datastructures import Headers
22from .exceptions import NotFound
23from .exceptions import RequestedRangeNotSatisfiable
24from .security import safe_join
25from .wsgi import wrap_file
27if t.TYPE_CHECKING:
28 from _typeshed.wsgi import WSGIEnvironment
29 from .wrappers.request import Request
30 from .wrappers.response import Response
32_T = t.TypeVar("_T")
34_entity_re = re.compile(r"&([^;]+);")
35_filename_ascii_strip_re = re.compile(r"[^A-Za-z0-9_.-]")
36_windows_device_files = {
37 "CON",
38 "PRN",
39 "AUX",
40 "NUL",
41 *(f"COM{i}" for i in range(10)),
42 *(f"LPT{i}" for i in range(10)),
43}
46class cached_property(property, t.Generic[_T]):
47 """A :func:`property` that is only evaluated once. Subsequent access
48 returns the cached value. Setting the property sets the cached
49 value. Deleting the property clears the cached value, accessing it
50 again will evaluate it again.
52 .. code-block:: python
54 class Example:
55 @cached_property
56 def value(self):
57 # calculate something important here
58 return 42
60 e = Example()
61 e.value # evaluates
62 e.value # uses cache
63 e.value = 16 # sets cache
64 del e.value # clears cache
66 If the class defines ``__slots__``, it must add ``_cache_{name}`` as
67 a slot. Alternatively, it can add ``__dict__``, but that's usually
68 not desirable.
70 .. versionchanged:: 2.1
71 Works with ``__slots__``.
73 .. versionchanged:: 2.0
74 ``del obj.name`` clears the cached value.
75 """
77 def __init__(
78 self,
79 fget: t.Callable[[t.Any], _T],
80 name: str | None = None,
81 doc: str | None = None,
82 ) -> None:
83 super().__init__(fget, doc=doc)
84 self.__name__ = name or fget.__name__
85 self.slot_name = f"_cache_{self.__name__}"
86 self.__module__ = fget.__module__
88 def __set__(self, obj: object, value: _T) -> None:
89 if hasattr(obj, "__dict__"):
90 obj.__dict__[self.__name__] = value
91 else:
92 setattr(obj, self.slot_name, value)
94 def __get__(self, obj: object, type: type = None) -> _T: # type: ignore
95 if obj is None:
96 return self # type: ignore
98 obj_dict = getattr(obj, "__dict__", None)
100 if obj_dict is not None:
101 value: _T = obj_dict.get(self.__name__, _missing)
102 else:
103 value = getattr(obj, self.slot_name, _missing) # type: ignore[arg-type]
105 if value is _missing:
106 value = self.fget(obj) # type: ignore
108 if obj_dict is not None:
109 obj.__dict__[self.__name__] = value
110 else:
111 setattr(obj, self.slot_name, value)
113 return value
115 def __delete__(self, obj: object) -> None:
116 if hasattr(obj, "__dict__"):
117 del obj.__dict__[self.__name__]
118 else:
119 setattr(obj, self.slot_name, _missing)
122class environ_property(_DictAccessorProperty[_TAccessorValue]):
123 """Maps request attributes to environment variables. This works not only
124 for the Werkzeug request object, but also any other class with an
125 environ attribute:
127 >>> class Test(object):
128 ... environ = {'key': 'value'}
129 ... test = environ_property('key')
130 >>> var = Test()
131 >>> var.test
132 'value'
134 If you pass it a second value it's used as default if the key does not
135 exist, the third one can be a converter that takes a value and converts
136 it. If it raises :exc:`ValueError` or :exc:`TypeError` the default value
137 is used. If no default value is provided `None` is used.
139 Per default the property is read only. You have to explicitly enable it
140 by passing ``read_only=False`` to the constructor.
141 """
143 read_only = True
145 def lookup(self, obj: Request) -> WSGIEnvironment:
146 return obj.environ
149class header_property(_DictAccessorProperty[_TAccessorValue]):
150 """Like `environ_property` but for headers."""
152 def lookup(self, obj: Request | Response) -> Headers:
153 return obj.headers
156# https://cgit.freedesktop.org/xdg/shared-mime-info/tree/freedesktop.org.xml.in
157# https://www.iana.org/assignments/media-types/media-types.xhtml
158# Types listed in the XDG mime info that have a charset in the IANA registration.
159_charset_mimetypes = {
160 "application/ecmascript",
161 "application/javascript",
162 "application/sql",
163 "application/xml",
164 "application/xml-dtd",
165 "application/xml-external-parsed-entity",
166}
169def get_content_type(mimetype: str, charset: str) -> str:
170 """Returns the full content type string with charset for a mimetype.
172 If the mimetype represents text, the charset parameter will be
173 appended, otherwise the mimetype is returned unchanged.
175 :param mimetype: The mimetype to be used as content type.
176 :param charset: The charset to be appended for text mimetypes.
177 :return: The content type.
179 .. versionchanged:: 0.15
180 Any type that ends with ``+xml`` gets a charset, not just those
181 that start with ``application/``. Known text types such as
182 ``application/javascript`` are also given charsets.
183 """
184 if (
185 mimetype.startswith("text/")
186 or mimetype in _charset_mimetypes
187 or mimetype.endswith("+xml")
188 ):
189 mimetype += f"; charset={charset}"
191 return mimetype
194def secure_filename(filename: str) -> str:
195 r"""Pass it a filename and it will return a secure version of it. This
196 filename can then safely be stored on a regular file system and passed
197 to :func:`os.path.join`. The filename returned is an ASCII only string
198 for maximum portability.
200 On windows systems the function also makes sure that the file is not
201 named after one of the special device files.
203 >>> secure_filename("My cool movie.mov")
204 'My_cool_movie.mov'
205 >>> secure_filename("../../../etc/passwd")
206 'etc_passwd'
207 >>> secure_filename('i contain cool \xfcml\xe4uts.txt')
208 'i_contain_cool_umlauts.txt'
210 The function might return an empty filename. It's your responsibility
211 to ensure that the filename is unique and that you abort or
212 generate a random filename if the function returned an empty one.
214 .. versionadded:: 0.5
216 :param filename: the filename to secure
217 """
218 filename = unicodedata.normalize("NFKD", filename)
219 filename = filename.encode("ascii", "ignore").decode("ascii")
221 for sep in os.sep, os.path.altsep:
222 if sep:
223 filename = filename.replace(sep, " ")
224 filename = str(_filename_ascii_strip_re.sub("", "_".join(filename.split()))).strip(
225 "._"
226 )
228 # on nt a couple of special files are present in each folder. We
229 # have to ensure that the target file is not such a filename. In
230 # this case we prepend an underline
231 if (
232 os.name == "nt"
233 and filename
234 and filename.split(".")[0].upper() in _windows_device_files
235 ):
236 filename = f"_{filename}"
238 return filename
241def redirect(
242 location: str, code: int = 302, Response: type[Response] | None = None
243) -> Response:
244 """Returns a response object (a WSGI application) that, if called,
245 redirects the client to the target location. Supported codes are
246 301, 302, 303, 305, 307, and 308. 300 is not supported because
247 it's not a real redirect and 304 because it's the answer for a
248 request with a request with defined If-Modified-Since headers.
250 .. versionadded:: 0.6
251 The location can now be a unicode string that is encoded using
252 the :func:`iri_to_uri` function.
254 .. versionadded:: 0.10
255 The class used for the Response object can now be passed in.
257 :param location: the location the response should redirect to.
258 :param code: the redirect status code. defaults to 302.
259 :param class Response: a Response class to use when instantiating a
260 response. The default is :class:`werkzeug.wrappers.Response` if
261 unspecified.
262 """
263 if Response is None:
264 from .wrappers import Response
266 html_location = escape(location)
267 response = Response( # type: ignore[misc]
268 "<!doctype html>\n"
269 "<html lang=en>\n"
270 "<title>Redirecting...</title>\n"
271 "<h1>Redirecting...</h1>\n"
272 "<p>You should be redirected automatically to the target URL: "
273 f'<a href="{html_location}">{html_location}</a>. If not, click the link.\n',
274 code,
275 mimetype="text/html",
276 )
277 response.headers["Location"] = location
278 return response
281def append_slash_redirect(environ: WSGIEnvironment, code: int = 308) -> Response:
282 """Redirect to the current URL with a slash appended.
284 If the current URL is ``/user/42``, the redirect URL will be
285 ``42/``. When joined to the current URL during response
286 processing or by the browser, this will produce ``/user/42/``.
288 The behavior is undefined if the path ends with a slash already. If
289 called unconditionally on a URL, it may produce a redirect loop.
291 :param environ: Use the path and query from this WSGI environment
292 to produce the redirect URL.
293 :param code: the status code for the redirect.
295 .. versionchanged:: 2.1
296 Produce a relative URL that only modifies the last segment.
297 Relevant when the current path has multiple segments.
299 .. versionchanged:: 2.1
300 The default status code is 308 instead of 301. This preserves
301 the request method and body.
302 """
303 tail = environ["PATH_INFO"].rpartition("/")[2]
305 if not tail:
306 new_path = "./"
307 else:
308 new_path = f"{tail}/"
310 query_string = environ.get("QUERY_STRING")
312 if query_string:
313 new_path = f"{new_path}?{query_string}"
315 return redirect(new_path, code)
318def send_file(
319 path_or_file: os.PathLike | str | t.IO[bytes],
320 environ: WSGIEnvironment,
321 mimetype: str | None = None,
322 as_attachment: bool = False,
323 download_name: str | None = None,
324 conditional: bool = True,
325 etag: bool | str = True,
326 last_modified: datetime | int | float | None = None,
327 max_age: None | (int | t.Callable[[str | None], int | None]) = None,
328 use_x_sendfile: bool = False,
329 response_class: type[Response] | None = None,
330 _root_path: os.PathLike | str | None = None,
331) -> Response:
332 """Send the contents of a file to the client.
334 The first argument can be a file path or a file-like object. Paths
335 are preferred in most cases because Werkzeug can manage the file and
336 get extra information from the path. Passing a file-like object
337 requires that the file is opened in binary mode, and is mostly
338 useful when building a file in memory with :class:`io.BytesIO`.
340 Never pass file paths provided by a user. The path is assumed to be
341 trusted, so a user could craft a path to access a file you didn't
342 intend. Use :func:`send_from_directory` to safely serve user-provided paths.
344 If the WSGI server sets a ``file_wrapper`` in ``environ``, it is
345 used, otherwise Werkzeug's built-in wrapper is used. Alternatively,
346 if the HTTP server supports ``X-Sendfile``, ``use_x_sendfile=True``
347 will tell the server to send the given path, which is much more
348 efficient than reading it in Python.
350 :param path_or_file: The path to the file to send, relative to the
351 current working directory if a relative path is given.
352 Alternatively, a file-like object opened in binary mode. Make
353 sure the file pointer is seeked to the start of the data.
354 :param environ: The WSGI environ for the current request.
355 :param mimetype: The MIME type to send for the file. If not
356 provided, it will try to detect it from the file name.
357 :param as_attachment: Indicate to a browser that it should offer to
358 save the file instead of displaying it.
359 :param download_name: The default name browsers will use when saving
360 the file. Defaults to the passed file name.
361 :param conditional: Enable conditional and range responses based on
362 request headers. Requires passing a file path and ``environ``.
363 :param etag: Calculate an ETag for the file, which requires passing
364 a file path. Can also be a string to use instead.
365 :param last_modified: The last modified time to send for the file,
366 in seconds. If not provided, it will try to detect it from the
367 file path.
368 :param max_age: How long the client should cache the file, in
369 seconds. If set, ``Cache-Control`` will be ``public``, otherwise
370 it will be ``no-cache`` to prefer conditional caching.
371 :param use_x_sendfile: Set the ``X-Sendfile`` header to let the
372 server to efficiently send the file. Requires support from the
373 HTTP server. Requires passing a file path.
374 :param response_class: Build the response using this class. Defaults
375 to :class:`~werkzeug.wrappers.Response`.
376 :param _root_path: Do not use. For internal use only. Use
377 :func:`send_from_directory` to safely send files under a path.
379 .. versionchanged:: 2.0.2
380 ``send_file`` only sets a detected ``Content-Encoding`` if
381 ``as_attachment`` is disabled.
383 .. versionadded:: 2.0
384 Adapted from Flask's implementation.
386 .. versionchanged:: 2.0
387 ``download_name`` replaces Flask's ``attachment_filename``
388 parameter. If ``as_attachment=False``, it is passed with
389 ``Content-Disposition: inline`` instead.
391 .. versionchanged:: 2.0
392 ``max_age`` replaces Flask's ``cache_timeout`` parameter.
393 ``conditional`` is enabled and ``max_age`` is not set by
394 default.
396 .. versionchanged:: 2.0
397 ``etag`` replaces Flask's ``add_etags`` parameter. It can be a
398 string to use instead of generating one.
400 .. versionchanged:: 2.0
401 If an encoding is returned when guessing ``mimetype`` from
402 ``download_name``, set the ``Content-Encoding`` header.
403 """
404 if response_class is None:
405 from .wrappers import Response
407 response_class = Response
409 path: str | None = None
410 file: t.IO[bytes] | None = None
411 size: int | None = None
412 mtime: float | None = None
413 headers = Headers()
415 if isinstance(path_or_file, (os.PathLike, str)) or hasattr(
416 path_or_file, "__fspath__"
417 ):
418 path_or_file = t.cast(t.Union[os.PathLike, str], path_or_file)
420 # Flask will pass app.root_path, allowing its send_file wrapper
421 # to not have to deal with paths.
422 if _root_path is not None:
423 path = os.path.join(_root_path, path_or_file)
424 else:
425 path = os.path.abspath(path_or_file)
427 stat = os.stat(path)
428 size = stat.st_size
429 mtime = stat.st_mtime
430 else:
431 file = path_or_file
433 if download_name is None and path is not None:
434 download_name = os.path.basename(path)
436 if mimetype is None:
437 if download_name is None:
438 raise TypeError(
439 "Unable to detect the MIME type because a file name is"
440 " not available. Either set 'download_name', pass a"
441 " path instead of a file, or set 'mimetype'."
442 )
444 mimetype, encoding = mimetypes.guess_type(download_name)
446 if mimetype is None:
447 mimetype = "application/octet-stream"
449 # Don't send encoding for attachments, it causes browsers to
450 # save decompress tar.gz files.
451 if encoding is not None and not as_attachment:
452 headers.set("Content-Encoding", encoding)
454 if download_name is not None:
455 try:
456 download_name.encode("ascii")
457 except UnicodeEncodeError:
458 simple = unicodedata.normalize("NFKD", download_name)
459 simple = simple.encode("ascii", "ignore").decode("ascii")
460 # safe = RFC 5987 attr-char
461 quoted = quote(download_name, safe="!#$&+-.^_`|~")
462 names = {"filename": simple, "filename*": f"UTF-8''{quoted}"}
463 else:
464 names = {"filename": download_name}
466 value = "attachment" if as_attachment else "inline"
467 headers.set("Content-Disposition", value, **names)
468 elif as_attachment:
469 raise TypeError(
470 "No name provided for attachment. Either set"
471 " 'download_name' or pass a path instead of a file."
472 )
474 if use_x_sendfile and path is not None:
475 headers["X-Sendfile"] = path
476 data = None
477 else:
478 if file is None:
479 file = open(path, "rb") # type: ignore
480 elif isinstance(file, io.BytesIO):
481 size = file.getbuffer().nbytes
482 elif isinstance(file, io.TextIOBase):
483 raise ValueError("Files must be opened in binary mode or use BytesIO.")
485 data = wrap_file(environ, file)
487 rv = response_class(
488 data, mimetype=mimetype, headers=headers, direct_passthrough=True
489 )
491 if size is not None:
492 rv.content_length = size
494 if last_modified is not None:
495 rv.last_modified = last_modified # type: ignore
496 elif mtime is not None:
497 rv.last_modified = mtime # type: ignore
499 rv.cache_control.no_cache = True
501 # Flask will pass app.get_send_file_max_age, allowing its send_file
502 # wrapper to not have to deal with paths.
503 if callable(max_age):
504 max_age = max_age(path)
506 if max_age is not None:
507 if max_age > 0:
508 rv.cache_control.no_cache = None
509 rv.cache_control.public = True
511 rv.cache_control.max_age = max_age
512 rv.expires = int(time() + max_age) # type: ignore
514 if isinstance(etag, str):
515 rv.set_etag(etag)
516 elif etag and path is not None:
517 check = adler32(path.encode("utf-8")) & 0xFFFFFFFF
518 rv.set_etag(f"{mtime}-{size}-{check}")
520 if conditional:
521 try:
522 rv = rv.make_conditional(environ, accept_ranges=True, complete_length=size)
523 except RequestedRangeNotSatisfiable:
524 if file is not None:
525 file.close()
527 raise
529 # Some x-sendfile implementations incorrectly ignore the 304
530 # status code and send the file anyway.
531 if rv.status_code == 304:
532 rv.headers.pop("x-sendfile", None)
534 return rv
537def send_from_directory(
538 directory: os.PathLike | str,
539 path: os.PathLike | str,
540 environ: WSGIEnvironment,
541 **kwargs: t.Any,
542) -> Response:
543 """Send a file from within a directory using :func:`send_file`.
545 This is a secure way to serve files from a folder, such as static
546 files or uploads. Uses :func:`~werkzeug.security.safe_join` to
547 ensure the path coming from the client is not maliciously crafted to
548 point outside the specified directory.
550 If the final path does not point to an existing regular file,
551 returns a 404 :exc:`~werkzeug.exceptions.NotFound` error.
553 :param directory: The directory that ``path`` must be located under. This *must not*
554 be a value provided by the client, otherwise it becomes insecure.
555 :param path: The path to the file to send, relative to ``directory``. This is the
556 part of the path provided by the client, which is checked for security.
557 :param environ: The WSGI environ for the current request.
558 :param kwargs: Arguments to pass to :func:`send_file`.
560 .. versionadded:: 2.0
561 Adapted from Flask's implementation.
562 """
563 path = safe_join(os.fspath(directory), os.fspath(path))
565 if path is None:
566 raise NotFound()
568 # Flask will pass app.root_path, allowing its send_from_directory
569 # wrapper to not have to deal with paths.
570 if "_root_path" in kwargs:
571 path = os.path.join(kwargs["_root_path"], path)
573 if not os.path.isfile(path):
574 raise NotFound()
576 return send_file(path, environ, **kwargs)
579def import_string(import_name: str, silent: bool = False) -> t.Any:
580 """Imports an object based on a string. This is useful if you want to
581 use import paths as endpoints or something similar. An import path can
582 be specified either in dotted notation (``xml.sax.saxutils.escape``)
583 or with a colon as object delimiter (``xml.sax.saxutils:escape``).
585 If `silent` is True the return value will be `None` if the import fails.
587 :param import_name: the dotted name for the object to import.
588 :param silent: if set to `True` import errors are ignored and
589 `None` is returned instead.
590 :return: imported object
591 """
592 import_name = import_name.replace(":", ".")
593 try:
594 try:
595 __import__(import_name)
596 except ImportError:
597 if "." not in import_name:
598 raise
599 else:
600 return sys.modules[import_name]
602 module_name, obj_name = import_name.rsplit(".", 1)
603 module = __import__(module_name, globals(), locals(), [obj_name])
604 try:
605 return getattr(module, obj_name)
606 except AttributeError as e:
607 raise ImportError(e) from None
609 except ImportError as e:
610 if not silent:
611 raise ImportStringError(import_name, e).with_traceback(
612 sys.exc_info()[2]
613 ) from None
615 return None
618def find_modules(
619 import_path: str, include_packages: bool = False, recursive: bool = False
620) -> t.Iterator[str]:
621 """Finds all the modules below a package. This can be useful to
622 automatically import all views / controllers so that their metaclasses /
623 function decorators have a chance to register themselves on the
624 application.
626 Packages are not returned unless `include_packages` is `True`. This can
627 also recursively list modules but in that case it will import all the
628 packages to get the correct load path of that module.
630 :param import_path: the dotted name for the package to find child modules.
631 :param include_packages: set to `True` if packages should be returned, too.
632 :param recursive: set to `True` if recursion should happen.
633 :return: generator
634 """
635 module = import_string(import_path)
636 path = getattr(module, "__path__", None)
637 if path is None:
638 raise ValueError(f"{import_path!r} is not a package")
639 basename = f"{module.__name__}."
640 for _importer, modname, ispkg in pkgutil.iter_modules(path):
641 modname = basename + modname
642 if ispkg:
643 if include_packages:
644 yield modname
645 if recursive:
646 yield from find_modules(modname, include_packages, True)
647 else:
648 yield modname
651class ImportStringError(ImportError):
652 """Provides information about a failed :func:`import_string` attempt."""
654 #: String in dotted notation that failed to be imported.
655 import_name: str
656 #: Wrapped exception.
657 exception: BaseException
659 def __init__(self, import_name: str, exception: BaseException) -> None:
660 self.import_name = import_name
661 self.exception = exception
662 msg = import_name
663 name = ""
664 tracked = []
665 for part in import_name.replace(":", ".").split("."):
666 name = f"{name}.{part}" if name else part
667 imported = import_string(name, silent=True)
668 if imported:
669 tracked.append((name, getattr(imported, "__file__", None)))
670 else:
671 track = [f"- {n!r} found in {i!r}." for n, i in tracked]
672 track.append(f"- {name!r} not found.")
673 track_str = "\n".join(track)
674 msg = (
675 f"import_string() failed for {import_name!r}. Possible reasons"
676 f" are:\n\n"
677 "- missing __init__.py in a package;\n"
678 "- package or module path not included in sys.path;\n"
679 "- duplicated package or module name taking precedence in"
680 " sys.path;\n"
681 "- missing module, class, function or variable;\n\n"
682 f"Debugged import:\n\n{track_str}\n\n"
683 f"Original exception:\n\n{type(exception).__name__}: {exception}"
684 )
685 break
687 super().__init__(msg)
689 def __repr__(self) -> str:
690 return f"<{type(self).__name__}({self.import_name!r}, {self.exception!r})>"